// Package cmd provides the cobra commands of the hpds_mq broker.
package cmd

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"git.hpds.cc/Component/network/log"
	"git.hpds.cc/pavement/hpds_node"
	"github.com/spf13/cobra"
	"hpds_mq/config"
)

const (
	// AppLogPrefix is the colored tag prepended to application-level log lines.
	AppLogPrefix = "\033[33m[hpds-mq:app]\033[0m "
)

// Settings of the start command. NewStartCmd binds each of them to a command
// line flag, so the flag defaults declared there replace these initial values
// as soon as the command is constructed.
var (
	// ConfigFileFlag is the path of the local configuration file (flag "c").
	ConfigFileFlag = "./config/config.yml"
	// ConsulAddress is the address of the remote consul registry (flag "r").
	ConsulAddress = "http://localhost:8500"
	// NodeName is the configuration name of this node (flag "n").
	NodeName = "main-node"
	// Mode is the run mode: dev | test | releases (flag "m").
	Mode = "dev"
)

// must prints err to stderr and terminates the process with exit code 1 when
// err is non-nil; it does nothing otherwise.
func must(err error) {
	if err != nil {
		_, _ = fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

// loadConfig returns the broker configuration.
//
// When configFile is longer than one character the configuration is parsed
// from that file and pushed to the remote store. Otherwise it is fetched from
// the consul registry at ConsulAddress under
// "hpds-pavement/hpds_mq/<Mode>/<NodeName>" and written to ConfigFileFlag.
// A parse, fetch or remote-update failure terminates the process via must.
func loadConfig(configFile string) *config.MessageQueueConfig {
	if len(configFile) > 1 {
		cfg, err := config.ParseConfigByFile(configFile)
		must(err)
		must(config.UpdateRemoteConfig(cfg))
		ConfigFileFlag = configFile
		return cfg
	}

	// Fetch the configuration from the consul registry.
	cfg, err := config.GetRemoteConfig(ConsulAddress, fmt.Sprintf("hpds-pavement/hpds_mq/%s/%s", Mode, NodeName))
	must(err)
	// Persisting the local copy stays best-effort, but the failure is now
	// reported instead of being silently overwritten by the next error value.
	if err := config.UpdateLocalConfig(cfg, ConfigFileFlag); err != nil {
		log.Errorf("%s update local config %q error %v", AppLogPrefix, ConfigFileFlag, err)
	}
	return cfg
}

// NewStartCmd builds the "start" command, which loads the configuration,
// starts the hpds_mq broker and blocks until the broker stops or the process
// receives SIGINT or SIGTERM.
//
// Flags:
//
//	c  local configuration file path
//	r  remote consul address
//	n  configuration (node) name
//	m  run mode: dev | test | releases
func NewStartCmd() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "start",
		Short: "Start hpds_mq broker",
		Run: func(cmd *cobra.Command, args []string) {
			configFileFlag, err := cmd.Flags().GetString("c")
			if err != nil {
				fmt.Println("get local config err: ", err)
				return
			}
			ConsulAddress, err = cmd.Flags().GetString("r")
			if err != nil {
				fmt.Println("get remote config err: ", err)
				return
			}
			NodeName, err = cmd.Flags().GetString("n")
			if err != nil {
				fmt.Println("get node name err: ", err)
				return
			}
			Mode, err = cmd.Flags().GetString("m")
			if err != nil {
				fmt.Println("get run mode err: ", err)
				return
			}

			cfg := loadConfig(configFileFlag)

			// Register for termination signals. The channel is buffered
			// because signal.Notify never blocks when sending: a signal that
			// arrives while nobody is receiving would otherwise be dropped.
			exitChannel := make(chan os.Signal, 1)
			signal.Notify(exitChannel, syscall.SIGINT, syscall.SIGTERM)
			defer signal.Stop(exitChannel)

			mq, err := hpds_node.NewMq(configFileFlag)
			must(err)
			mq.InitOptions(hpds_node.WithAuth("token", cfg.Token))
			defer func() {
				_ = mq.Close()
			}()

			addCascadeNodes(mq, cfg)

			// Start the mq service.
			log.Infof("%s Server has started!, pid: %d", cfg.Name, os.Getpid())
			serveErr := make(chan error, 1)
			go func() {
				serveErr <- mq.ListenAndServe()
			}()

			// Block until the broker stops or a termination signal arrives;
			// returning lets the deferred Close release the broker.
			select {
			case sig := <-exitChannel:
				log.Infof("%s %s received signal %s, shutting down", AppLogPrefix, cfg.Name, sig.String())
			case err := <-serveErr:
				if err != nil {
					log.Errorf("%s %s error %s", AppLogPrefix, cfg.Name, err.Error())
					// os.Exit skips deferred calls, so close explicitly first.
					_ = mq.Close()
					os.Exit(-1)
				}
			}
		},
	}

	cmd.Flags().StringVar(&ConfigFileFlag, "c", "./config/config.yaml", "The configuration file path")
	cmd.Flags().StringVar(&ConsulAddress, "r", "http://consul.hpds.cc", "The configuration remote consul address")
	cmd.Flags().StringVar(&NodeName, "n", "main-node", "The configuration name")
	cmd.Flags().StringVar(&Mode, "m", "dev", "run mode : dev | test | releases")
	return cmd
}

// addCascadeNodes registers every cascade node listed in cfg as a downstream
// mq of mq, addressed as "host:port" and authenticated with the node's token.
func addCascadeNodes(mq hpds_node.MessageQueue, cfg *config.MessageQueueConfig) {
	for _, node := range cfg.CascadeNode {
		// NOTE(review): the result of AddDownstreamMq is discarded exactly as
		// before; confirm whether a failed registration should be reported.
		_ = mq.AddDownstreamMq(hpds_node.NewDownstreamMq(
			node.Name,
			hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)),
			hpds_node.WithCredential(fmt.Sprintf("token:%s", node.Token)),
		))
	}
}

/*
func Run() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// exit channel
	exitChannel := make(chan os.Signal)
	defer close(exitChannel)

	// listen for exit signals
	go func(c chan os.Signal) {
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	}(exitChannel)
}
*/