2023-03-01 14:10:50 +08:00
|
|
|
package cmd
|
|
|
|
|
|
|
|
import (
|
2023-03-13 10:30:50 +08:00
|
|
|
"context"
|
2023-03-01 14:10:50 +08:00
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"github.com/spf13/cobra"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"os"
|
|
|
|
"os/signal"
|
|
|
|
"stream_capture_db/config"
|
|
|
|
"stream_capture_db/model"
|
|
|
|
"syscall"
|
|
|
|
|
|
|
|
"git.hpds.cc/Component/logging"
|
|
|
|
"git.hpds.cc/pavement/hpds_node"
|
|
|
|
)
|
|
|
|
|
|
|
|
// ConfigFileFlag holds the configuration file path. It starts out as
// "./config/config.yaml" and is the variable the start command binds its
// "c" flag to.
var ConfigFileFlag = "./config/config.yaml"
|
|
|
|
|
|
|
|
// must aborts the process when err is non-nil: the error is written to
// standard error and the program exits with status 1. A nil err is a no-op.
func must(err error) {
	if err == nil {
		return
	}
	// The write result is deliberately ignored; the process is about to exit.
	_, _ = fmt.Fprint(os.Stderr, err)
	os.Exit(1)
}
|
|
|
|
func NewStartCmd() *cobra.Command {
|
|
|
|
cmd := &cobra.Command{
|
|
|
|
Use: "start",
|
|
|
|
Short: "Start hpds_web application",
|
|
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
|
|
var (
|
|
|
|
cfg *config.StreamFuncCaptureDBConfig
|
|
|
|
err error
|
|
|
|
)
|
2023-03-13 10:30:50 +08:00
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
defer cancel()
|
|
|
|
|
2023-03-01 14:10:50 +08:00
|
|
|
configFileFlag, err := cmd.Flags().GetString("c")
|
|
|
|
if err != nil {
|
|
|
|
fmt.Println("get local config err: ", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
must(err)
|
|
|
|
cfg, err = config.ParseConfigByFile(configFileFlag)
|
|
|
|
must(err)
|
2023-03-13 10:30:50 +08:00
|
|
|
logger := LoadLoggerConfig(cfg.Logging)
|
2023-03-01 14:10:50 +08:00
|
|
|
//连接数据库
|
|
|
|
model.New(cfg.Db.DriveName, cfg.Db.Conn)
|
|
|
|
|
|
|
|
model.Logger = LoadLoggerConfig(cfg.Logging)
|
|
|
|
exitChannel := make(chan os.Signal)
|
|
|
|
defer close(exitChannel)
|
|
|
|
// 退出信号监听
|
|
|
|
go func(c chan os.Signal) {
|
|
|
|
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
}(exitChannel)
|
|
|
|
sf := hpds_node.NewStreamFunction(
|
|
|
|
cfg.Functions.Name,
|
|
|
|
hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", cfg.Node.Host, cfg.Node.Port)),
|
|
|
|
hpds_node.WithObserveDataTags(cfg.Functions.DataTag),
|
|
|
|
hpds_node.WithCredential(cfg.Node.Token),
|
|
|
|
)
|
|
|
|
_ = sf.SetHandler(handler)
|
|
|
|
// start
|
|
|
|
_ = sf.Connect()
|
2023-03-13 10:30:50 +08:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
logger.With(
|
|
|
|
zap.String("web", "exit"),
|
|
|
|
).Error(ctx.Err().Error())
|
|
|
|
return
|
|
|
|
case errs := <-exitChannel:
|
|
|
|
logger.With(
|
|
|
|
zap.String("web", "服务退出"),
|
|
|
|
).Info(errs.String())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
2023-03-01 14:10:50 +08:00
|
|
|
},
|
|
|
|
}
|
|
|
|
cmd.Flags().StringVar(&ConfigFileFlag, "c", "./config/config.yaml", "The configuration file path")
|
|
|
|
return cmd
|
|
|
|
}
|
|
|
|
|
|
|
|
func LoadLoggerConfig(opt config.LogOptions) *logging.Logger {
|
|
|
|
return logging.NewLogger(
|
|
|
|
logging.SetPath(opt.Path),
|
|
|
|
logging.SetPrefix(opt.Prefix),
|
|
|
|
logging.SetDevelopment(opt.Development),
|
|
|
|
logging.SetDebugFileSuffix(opt.DebugFileSuffix),
|
|
|
|
logging.SetWarnFileSuffix(opt.WarnFileSuffix),
|
|
|
|
logging.SetErrorFileSuffix(opt.ErrorFileSuffix),
|
|
|
|
logging.SetInfoFileSuffix(opt.InfoFileSuffix),
|
|
|
|
logging.SetMaxAge(opt.MaxAge),
|
|
|
|
logging.SetMaxBackups(opt.MaxBackups),
|
|
|
|
logging.SetMaxSize(opt.MaxSize),
|
|
|
|
logging.SetLevel(logging.LogLevel["debug"]),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
func handler(data []byte) (byte, []byte) {
|
|
|
|
node := new(model.Node)
|
|
|
|
err := json.Unmarshal(data, node)
|
|
|
|
if err != nil {
|
|
|
|
stat := new(model.NodeState)
|
|
|
|
err = json.Unmarshal(data, stat)
|
|
|
|
if err != nil {
|
|
|
|
model.Logger.With(zap.String("The received data", string(data))).Error("数据无法解析", zap.Error(err))
|
|
|
|
return 0x12, []byte(err.Error())
|
|
|
|
}
|
|
|
|
stat.Insert()
|
|
|
|
} else {
|
|
|
|
node.Insert()
|
|
|
|
}
|
|
|
|
return 0x12, nil
|
|
|
|
}
|