129 lines
3.2 KiB
Go
129 lines
3.2 KiB
Go
package cmd
|
|
|
|
import (
|
|
"fmt"
|
|
"git.hpds.cc/Component/network/log"
|
|
"github.com/spf13/cobra"
|
|
"hpds_mq/config"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
|
|
"git.hpds.cc/pavement/hpds_node"
|
|
discover "hpds_mq/internal/discover/consul"
|
|
)
|
|
|
|
var (
|
|
consulConfigs chan *discover.ConsulConfig
|
|
ConfigFileFlag string = "./config/config.yml"
|
|
ConsulAddress string = "http://localhost:8500"
|
|
NodeName string = "main-node"
|
|
Mode string = "dev"
|
|
)
|
|
|
|
// must aborts the process when err is non-nil: the error is written to
// standard error on its own line and the process exits with status 1.
// A nil err is a no-op.
//
// Because it calls os.Exit, deferred functions of the caller do not run.
func must(err error) {
	if err == nil {
		return
	}
	// Fprintln (not Fprint) so the message ends with a newline and does not
	// run into the shell prompt or any later output.
	fmt.Fprintln(os.Stderr, err)
	os.Exit(1)
}
|
|
|
|
func NewStartCmd() *cobra.Command {
|
|
cmd := &cobra.Command{
|
|
Use: "start",
|
|
Short: "Start hpds_mq broker",
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
var (
|
|
cfg *config.MessageQueueConfig
|
|
err error
|
|
)
|
|
must(err)
|
|
configFileFlag, err := cmd.Flags().GetString("c")
|
|
if err != nil {
|
|
fmt.Println("get local config err: ", err)
|
|
return
|
|
}
|
|
ConsulAddress, err = cmd.Flags().GetString("r")
|
|
if err != nil {
|
|
fmt.Println("get remote config err: ", err)
|
|
return
|
|
}
|
|
NodeName, err = cmd.Flags().GetString("n")
|
|
if err != nil {
|
|
fmt.Println("get remote path config err: ", err)
|
|
return
|
|
}
|
|
Mode, err = cmd.Flags().GetString("m")
|
|
if err != nil {
|
|
fmt.Println("get remote path config err: ", err)
|
|
return
|
|
}
|
|
if len(configFileFlag) > 1 {
|
|
cfg, err = config.ParseConfigByFile(configFileFlag)
|
|
must(err)
|
|
err = config.UpdateRemoteConfig(cfg)
|
|
must(err)
|
|
ConfigFileFlag = configFileFlag
|
|
} else {
|
|
//获取consul注册中心的配置文件
|
|
cfg, err = config.GetRemoteConfig(ConsulAddress, fmt.Sprintf("hpds-pavement/hpds_mq/%s/%s", Mode, NodeName))
|
|
must(err)
|
|
err = config.UpdateLocalConfig(cfg, ConfigFileFlag)
|
|
}
|
|
|
|
// 退出channel
|
|
exitChannel := make(chan os.Signal)
|
|
defer close(exitChannel)
|
|
|
|
// 退出信号监听
|
|
go func(c chan os.Signal) {
|
|
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
|
|
}(exitChannel)
|
|
mq, err := hpds_node.NewMq(configFileFlag)
|
|
mq.InitOptions(hpds_node.WithAuth("token", cfg.Token))
|
|
defer mq.Close()
|
|
|
|
if len(cfg.CascadeNode) > 0 {
|
|
for i := 0; i < len(cfg.CascadeNode); i++ {
|
|
mq.AddDownstreamMq(hpds_node.NewDownstreamMq(
|
|
cfg.CascadeNode[i].Name,
|
|
hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", cfg.CascadeNode[i].Host, cfg.CascadeNode[i].Port)),
|
|
hpds_node.WithCredential(fmt.Sprintf("token:%s", cfg.CascadeNode[i].Token)),
|
|
))
|
|
}
|
|
}
|
|
// start mq service
|
|
log.Printf("Server has started!, pid: %d", os.Getpid())
|
|
go func() {
|
|
err = mq.ListenAndServe()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}()
|
|
select {}
|
|
},
|
|
}
|
|
cmd.Flags().StringVar(&ConfigFileFlag, "c", "./config/config.yaml", "The configuration file path")
|
|
cmd.Flags().StringVar(&ConsulAddress, "r", "http://consul.hpds.cc", "The configuration remote consul address")
|
|
cmd.Flags().StringVar(&NodeName, "n", "main-node", "The configuration name")
|
|
cmd.Flags().StringVar(&Mode, "m", "dev", "run mode : dev | test | releases")
|
|
return cmd
|
|
}
|
|
|
|
/*
func Run() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// exit channel
	exitChannel := make(chan os.Signal)
	defer close(exitChannel)

	// listen for exit signals
	go func(c chan os.Signal) {
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	}(exitChannel)

}
*/
|