hpds_mq/cmd/server.go

135 lines
3.3 KiB
Go

package cmd
import (
"fmt"
"git.hpds.cc/Component/network/log"
"github.com/spf13/cobra"
"hpds_mq/config"
"os"
"os/signal"
"syscall"
"git.hpds.cc/pavement/hpds_node"
)
// AppLogPrefix is the tag placed in front of application-level log lines.
// The ANSI escape sequences around it colour the tag yellow and then reset
// the terminal attributes.
const AppLogPrefix = "\033[33m[hpds-mq:app]\033[0m "
// Settings of the start command. NewStartCmd binds each of these variables
// to a command-line flag.
var (
	// ConfigFileFlag is the path of the local configuration file (flag "c").
	ConfigFileFlag = "./config/config.yml"
	// ConsulAddress is the address of the consul instance that holds the
	// remote configuration (flag "r").
	ConsulAddress = "http://localhost:8500"
	// NodeName is the name of this node; it is the last segment of the
	// remote configuration key (flag "n").
	NodeName = "main-node"
	// Mode is the run mode; it is also a segment of the remote
	// configuration key (flag "m").
	Mode = "dev"
)
// must aborts the process when err is non-nil: the error is written to
// standard error and the process exits with status 1. A nil err is a no-op.
func must(err error) {
	if err == nil {
		return
	}
	// The write result is deliberately ignored: the process is about to exit
	// and there is nowhere left to report a failed write.
	_, _ = fmt.Fprint(os.Stderr, err)
	os.Exit(1)
}
// NewStartCmd builds the "start" sub-command. When run, the command loads the
// broker configuration, creates the message-queue node, registers the
// configured cascade (downstream) nodes, starts serving, and then blocks until
// SIGINT/SIGTERM is received or the listener reports an error.
//
// Flags:
//
//	c  local configuration file path. When the value is longer than one
//	   character the file is parsed and handed to config.UpdateRemoteConfig;
//	   otherwise the configuration is read through config.GetRemoteConfig and
//	   handed to config.UpdateLocalConfig.
//	r  consul address used for the remote configuration.
//	n  node name, last segment of the remote configuration key.
//	m  run mode (dev | test | releases), segment of the remote configuration key.
func NewStartCmd() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "start",
		Short: "Start hpds_mq broker",
		Run: func(cmd *cobra.Command, args []string) {
			configFileFlag, err := cmd.Flags().GetString("c")
			if err != nil {
				fmt.Println("get local config err: ", err)
				return
			}
			ConsulAddress, err = cmd.Flags().GetString("r")
			if err != nil {
				fmt.Println("get remote config err: ", err)
				return
			}
			NodeName, err = cmd.Flags().GetString("n")
			if err != nil {
				fmt.Println("get node name err: ", err)
				return
			}
			Mode, err = cmd.Flags().GetString("m")
			if err != nil {
				fmt.Println("get run mode err: ", err)
				return
			}

			var cfg *config.MessageQueueConfig
			if len(configFileFlag) > 1 {
				// A local file was supplied: parse it and pass the result to
				// config.UpdateRemoteConfig.
				cfg, err = config.ParseConfigByFile(configFileFlag)
				must(err)
				err = config.UpdateRemoteConfig(cfg)
				must(err)
				ConfigFileFlag = configFileFlag
			} else {
				// Fetch the configuration from the consul registry.
				cfg, err = config.GetRemoteConfig(ConsulAddress, fmt.Sprintf("hpds-pavement/hpds_mq/%s/%s", Mode, NodeName))
				must(err)
				// NOTE(review): ConfigFileFlag is bound to the same "c" flag
				// that was just found to be (nearly) empty, so the target
				// path here is probably not usable — confirm the intended
				// destination. The failure is reported instead of being
				// silently dropped, but it stays non-fatal as before.
				if err = config.UpdateLocalConfig(cfg, ConfigFileFlag); err != nil {
					log.Errorf("%s update local config error %s", AppLogPrefix, err.Error())
				}
			}

			// Subscribe to termination signals. signal.Notify never blocks
			// when delivering, so the channel needs a buffer; otherwise a
			// signal that arrives while nobody is receiving is lost.
			exitChannel := make(chan os.Signal, 1)
			signal.Notify(exitChannel, syscall.SIGINT, syscall.SIGTERM)
			defer signal.Stop(exitChannel)

			mq, err := hpds_node.NewMq(configFileFlag)
			must(err)
			mq.InitOptions(hpds_node.WithAuth("token", cfg.Token))
			defer func() {
				_ = mq.Close()
			}()

			for _, node := range cfg.CascadeNode {
				// The result is discarded exactly as before; whether a failed
				// registration should abort start-up is left to the owner.
				_ = mq.AddDownstreamMq(hpds_node.NewDownstreamMq(
					node.Name,
					hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)),
					hpds_node.WithCredential(fmt.Sprintf("token:%s", node.Token)),
				))
			}

			// start mq service
			log.Infof("%s Server has started!, pid: %d", AppLogPrefix, os.Getpid())

			// The serving goroutine reports a failure through a buffered
			// channel so it can always finish, even after shutdown has begun.
			serveErrs := make(chan error, 1)
			go func() {
				if listenErr := mq.ListenAndServe(); listenErr != nil {
					serveErrs <- listenErr
				}
			}()

			select {
			case sig := <-exitChannel:
				// Returning lets the deferred calls stop signal delivery and
				// close the broker.
				log.Infof("%s received signal %s, shutting down", AppLogPrefix, sig.String())
			case listenErr := <-serveErrs:
				log.Errorf("%s %s error %s", AppLogPrefix, cfg.Name, listenErr.Error())
				// os.Exit skips deferred calls, so close the broker first.
				_ = mq.Close()
				os.Exit(-1)
			}
		},
	}
	cmd.Flags().StringVar(&ConfigFileFlag, "c", "./config/config.yaml", "The configuration file path")
	cmd.Flags().StringVar(&ConsulAddress, "r", "http://consul.hpds.cc", "The configuration remote consul address")
	cmd.Flags().StringVar(&NodeName, "n", "main-node", "The configuration name")
	cmd.Flags().StringVar(&Mode, "m", "dev", "run mode : dev | test | releases")
	return cmd
}
/*
func Run() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// exit channel
exitChannel := make(chan os.Signal)
defer close(exitChannel)
// listen for exit signals
go func(c chan os.Signal) {
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
}(exitChannel)
}
*/