103 lines
2.5 KiB
Go
103 lines
2.5 KiB
Go
|
package cmd
|
||
|
|
||
|
import (
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"github.com/spf13/cobra"
|
||
|
"go.uber.org/zap"
|
||
|
"os"
|
||
|
"os/signal"
|
||
|
"stream_capture_db/config"
|
||
|
"stream_capture_db/model"
|
||
|
"syscall"
|
||
|
|
||
|
"git.hpds.cc/Component/logging"
|
||
|
"git.hpds.cc/pavement/hpds_node"
|
||
|
)
|
||
|
|
||
|
// ConfigFileFlag is the destination variable for the "c" command-line flag
// registered by NewStartCmd. Its initial value is the default configuration
// file path.
var ConfigFileFlag = "./config/config.yaml"
|
||
|
|
||
|
// must terminates the process when err is non-nil: the error is written to
// standard error (without a trailing newline) and the process exits with
// status 1. A nil err is a no-op.
func must(err error) {
	if err == nil {
		return
	}
	fmt.Fprint(os.Stderr, err)
	os.Exit(1)
}
|
||
|
func NewStartCmd() *cobra.Command {
|
||
|
cmd := &cobra.Command{
|
||
|
Use: "start",
|
||
|
Short: "Start hpds_web application",
|
||
|
Run: func(cmd *cobra.Command, args []string) {
|
||
|
var (
|
||
|
cfg *config.StreamFuncCaptureDBConfig
|
||
|
err error
|
||
|
)
|
||
|
configFileFlag, err := cmd.Flags().GetString("c")
|
||
|
if err != nil {
|
||
|
fmt.Println("get local config err: ", err)
|
||
|
return
|
||
|
}
|
||
|
must(err)
|
||
|
cfg, err = config.ParseConfigByFile(configFileFlag)
|
||
|
must(err)
|
||
|
//连接数据库
|
||
|
model.New(cfg.Db.DriveName, cfg.Db.Conn)
|
||
|
|
||
|
model.Logger = LoadLoggerConfig(cfg.Logging)
|
||
|
exitChannel := make(chan os.Signal)
|
||
|
defer close(exitChannel)
|
||
|
// 退出信号监听
|
||
|
go func(c chan os.Signal) {
|
||
|
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
|
||
|
}(exitChannel)
|
||
|
sf := hpds_node.NewStreamFunction(
|
||
|
cfg.Functions.Name,
|
||
|
hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", cfg.Node.Host, cfg.Node.Port)),
|
||
|
hpds_node.WithObserveDataTags(cfg.Functions.DataTag),
|
||
|
hpds_node.WithCredential(cfg.Node.Token),
|
||
|
)
|
||
|
_ = sf.SetHandler(handler)
|
||
|
// start
|
||
|
_ = sf.Connect()
|
||
|
select {}
|
||
|
},
|
||
|
}
|
||
|
cmd.Flags().StringVar(&ConfigFileFlag, "c", "./config/config.yaml", "The configuration file path")
|
||
|
return cmd
|
||
|
}
|
||
|
|
||
|
func LoadLoggerConfig(opt config.LogOptions) *logging.Logger {
|
||
|
return logging.NewLogger(
|
||
|
logging.SetPath(opt.Path),
|
||
|
logging.SetPrefix(opt.Prefix),
|
||
|
logging.SetDevelopment(opt.Development),
|
||
|
logging.SetDebugFileSuffix(opt.DebugFileSuffix),
|
||
|
logging.SetWarnFileSuffix(opt.WarnFileSuffix),
|
||
|
logging.SetErrorFileSuffix(opt.ErrorFileSuffix),
|
||
|
logging.SetInfoFileSuffix(opt.InfoFileSuffix),
|
||
|
logging.SetMaxAge(opt.MaxAge),
|
||
|
logging.SetMaxBackups(opt.MaxBackups),
|
||
|
logging.SetMaxSize(opt.MaxSize),
|
||
|
logging.SetLevel(logging.LogLevel["debug"]),
|
||
|
)
|
||
|
}
|
||
|
|
||
|
func handler(data []byte) (byte, []byte) {
|
||
|
node := new(model.Node)
|
||
|
err := json.Unmarshal(data, node)
|
||
|
if err != nil {
|
||
|
stat := new(model.NodeState)
|
||
|
err = json.Unmarshal(data, stat)
|
||
|
if err != nil {
|
||
|
model.Logger.With(zap.String("The received data", string(data))).Error("数据无法解析", zap.Error(err))
|
||
|
return 0x12, []byte(err.Error())
|
||
|
}
|
||
|
stat.Insert()
|
||
|
} else {
|
||
|
node.Insert()
|
||
|
}
|
||
|
return 0x12, nil
|
||
|
}
|