// Package cmd provides the cobra commands used to launch this service.
package cmd

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"git.hpds.cc/Component/logging"
	"git.hpds.cc/Component/network/frame"
	"git.hpds.cc/Component/network/log"
	"git.hpds.cc/pavement/hpds_node"
	"github.com/spf13/cobra"
	"go.uber.org/zap"

	"stream_capture_db/config"
	"stream_capture_db/model"
)

// replyTag is the tag value handler returns with every response.
// NOTE(review): the meaning of 0x12 is defined by the receiving side — confirm.
const replyTag = 0x12

var (
	// ConfigFileFlag holds the configuration file path bound to the "c" flag
	// of the start command; its initial value is the flag's default.
	ConfigFileFlag = "./config/config.yaml"
)

// must writes err to stderr and terminates the process with exit code 1 when
// err is non-nil. It does nothing for a nil error.
func must(err error) {
	if err == nil {
		return
	}
	// Fprintln so the message ends with a newline and does not run into the
	// shell prompt.
	_, _ = fmt.Fprintln(os.Stderr, err)
	os.Exit(1)
}

// NewStartCmd builds the "start" cobra command, which loads the configuration,
// connects the stream function to its node and blocks until the process is
// asked to stop.
func NewStartCmd() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "start",
		Short: "Start hpds_web application",
		Run:   runStart,
	}
	cmd.Flags().StringVar(&ConfigFileFlag, "c", ConfigFileFlag, "The configuration file path")
	return cmd
}

// runStart is the Run callback of the start command. It exits the process with
// a non-zero status when the configuration cannot be parsed or the stream
// function cannot be set up, and returns once SIGINT/SIGTERM is received or
// the context is done.
func runStart(cmd *cobra.Command, _ []string) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	configFile, err := cmd.Flags().GetString("c")
	if err != nil {
		fmt.Println("get local config err: ", err)
		return
	}
	cfg, err := config.ParseConfigByFile(configFile)
	must(err)

	logger := LoadLoggerConfig(cfg.Logging)

	// Connect to the database and let the model package log through the same
	// logger instance instead of building a second one over the same files.
	model.New(cfg.Db.DriveName, cfg.Db.Conn)
	model.Logger = logger

	// Listen for termination signals. The channel must be buffered because
	// signal.Notify never blocks when delivering, and it is registered
	// synchronously so a signal arriving during start-up is not lost.
	exitChannel := make(chan os.Signal, 1)
	signal.Notify(exitChannel, syscall.SIGINT, syscall.SIGTERM)
	// Unregister rather than close: closing a channel that is still registered
	// would panic if a signal were delivered afterwards.
	defer signal.Stop(exitChannel)

	addr := fmt.Sprintf("%s:%d", cfg.Node.Host, cfg.Node.Port)
	sf := hpds_node.NewStreamFunction(
		cfg.Functions.Name,
		hpds_node.WithMqAddr(addr),
		hpds_node.WithObserveDataTags(frame.Tag(cfg.Functions.DataTag)),
		hpds_node.WithCredential(cfg.Node.Token),
	)
	if err := sf.SetHandler(handler); err != nil {
		must(fmt.Errorf("set stream function handler: %w", err))
	}
	// Start receiving; without a live connection the service would idle
	// forever, so a failure here is fatal.
	if err := sf.Connect(); err != nil {
		must(fmt.Errorf("connect to node %s: %w", addr, err))
	}

	select {
	case <-ctx.Done():
		logger.With(
			zap.String("web", "exit"),
		).Error(ctx.Err().Error())
	case sig := <-exitChannel:
		logger.With(
			zap.String("web", "服务退出"),
		).Info(sig.String())
	}
}

// LoadLoggerConfig creates a logger from the given log options. Every field of
// opt that is read here is forwarded to the matching logging option; the level
// is always set to logging.LogLevel["debug"] regardless of opt.
func LoadLoggerConfig(opt config.LogOptions) *logging.Logger {
	return logging.NewLogger(
		logging.SetPath(opt.Path),
		logging.SetPrefix(opt.Prefix),
		logging.SetDevelopment(opt.Development),
		logging.SetDebugFileSuffix(opt.DebugFileSuffix),
		logging.SetWarnFileSuffix(opt.WarnFileSuffix),
		logging.SetErrorFileSuffix(opt.ErrorFileSuffix),
		logging.SetInfoFileSuffix(opt.InfoFileSuffix),
		logging.SetMaxAge(opt.MaxAge),
		logging.SetMaxBackups(opt.MaxBackups),
		logging.SetMaxSize(opt.MaxSize),
		logging.SetLevel(logging.LogLevel["debug"]),
	)
}

// handler processes one received payload. It first tries to decode data as a
// model.Node and inserts it; if that fails it tries model.NodeState instead.
// When neither decoding succeeds the error is logged and its text is returned
// as the response body. The returned tag is always replyTag.
//
// NOTE(review): the NodeState fallback is only reached when decoding into
// model.Node returns an error — confirm that the two payload shapes really
// cannot both decode as a Node.
func handler(data []byte) (frame.Tag, []byte) {
	node := new(model.Node)
	if err := json.Unmarshal(data, node); err == nil {
		log.Infof("接收到数据 %s", string(data))
		node.Insert()
		return replyTag, nil
	}

	stat := new(model.NodeState)
	if err := json.Unmarshal(data, stat); err != nil {
		model.Logger.With(zap.String("The received data", string(data))).Error("数据无法解析", zap.Error(err))
		return replyTag, []byte(err.Error())
	}
	stat.Insert()
	return replyTag, nil
}