package mq import ( "encoding/json" "fmt" "go.uber.org/zap" "hpds_annotation/config" "hpds_annotation/global" "hpds_annotation/internal/proto" "net/http" "os" "strings" "time" "git.hpds.cc/Component/logging" "git.hpds.cc/Component/network/frame" "git.hpds.cc/pavement/hpds_node" "github.com/shirou/gopsutil/v3/host" ) var MqList []HpdsMqNode type HpdsMqNode struct { MqType uint Topic string Node config.HpdsNode EndPoint interface{} } var ifChannelsMapInit = false var ChannelsMap = map[string]chan string{} func initChannelsMap() { ChannelsMap = make(map[string]chan string) } func AddChannel(userId string) { if !ifChannelsMapInit { initChannelsMap() ifChannelsMapInit = true } var newChannel = make(chan string) ChannelsMap[userId] = newChannel logging.L().Info("Build SSE connection for user = " + userId) } func must(logger *logging.Logger, err error) { if err != nil { if logger != nil { logger.With(zap.String("web节点", "错误信息")).Error("启动错误", zap.Error(err)) } else { _, _ = fmt.Fprint(os.Stderr, err) } os.Exit(1) } } func NewMqClient(funcs []config.FuncConfig, node config.HpdsNode, logger *logging.Logger) (mqList []HpdsMqNode, err error) { mqList = make([]HpdsMqNode, 0) for _, v := range funcs { switch v.MqType { case 2: sf := hpds_node.NewStreamFunction( v.Name, hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)), hpds_node.WithObserveDataTags(frame.Tag(v.DataTag)), hpds_node.WithCredential(node.Token), ) err = sf.Connect() nodeInfo := HpdsMqNode{ MqType: 2, Topic: v.Name, Node: node, EndPoint: sf, } must(logger, err) switch v.Name { case "edge-cmd-request": _ = sf.SetHandler(EdgeCmdHandle) default: } mqList = append(mqList, nodeInfo) default: ap := hpds_node.NewAccessPoint( v.Name, hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)), hpds_node.WithCredential(node.Token), ) err = ap.Connect() nodeInfo := HpdsMqNode{ MqType: 1, Topic: v.Name, Node: node, EndPoint: ap, } must(logger, err) ap.SetDataTag(frame.Tag(v.DataTag)) mqList = append(mqList, nodeInfo) } } return mqList, err } func GetMqClient(topic string, mqType uint) *HpdsMqNode { for _, v := range MqList { if v.Topic == topic && v.MqType == mqType { return &v } } return nil } func GenerateAndSendData(stream hpds_node.AccessPoint, data []byte, logger *logging.Logger) error { logger.With(zap.String("web节点", "发送消息")).Info("数据", zap.String("发送的数据", string(data))) _, err := stream.Write(data) if err != nil { return err } time.Sleep(1000 * time.Millisecond) return nil } func EdgeCmdHandle(data []byte) (frame.Tag, []byte) { global.Logger.Info("任务日志", zap.String("接收数据", string(data))) cmd := new(InstructionReq) err := json.Unmarshal(data, cmd) if err != nil { return 0x0B, []byte(err.Error()) } switch cmd.Command { case DataLabelRequest: hi, _ := host.Info() payload := cmd.Payload.(map[string]interface{}) if payload["nodeGuid"] == hi.HostID { //currTime := time.Now().Unix() var ( req proto.ListRequest res *proto.BaseResponse ) if v, ok := payload["path"]; ok { req.Path = v.(string) } resPayload := new(DataLabelRes) resPayload.NodeGuid = hi.HostID switch strings.ToLower(payload["cmd"].(string)) { case "get-list": res, err = GetList(req) if err != nil { goto ErrorPoint } case "get-file-info": res, err = GetFileInfo(req) if err != nil { goto ErrorPoint } } ErrorPoint: if err != nil { res.Code = http.StatusInternalServerError res.Status = http.StatusText(http.StatusInternalServerError) res.Err = err res.Message = "失败" } resPayload.Body = res rsp := new(InstructionReq) rsp.Command = DataLabelResponse rsp.Payload = resPayload str, _ := json.Marshal(rsp) cli := GetMqClient("edge-cmd-response", 1) if cli != nil { _ = GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), str, global.Logger) } } } return 0x0B, nil } func SendLabelData(list []proto.FileContent, logger *logging.Logger) { cli := GetMqClient("edge-cmd-response", 1) if cli != nil { for _, v := range list { payload := InstructionReq{ Command: DataLabelResponse, Payload: v, } s, _ := json.Marshal(payload) err := GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), s, logger) if err != nil { logger.With( zap.String("文件名称", v.Name), zap.String("存储路径", v.Path), ).Error("文件传输", zap.Error(err)) } } } } func SubmitFileData(list []proto.FileTransferInfo, logger *logging.Logger) { cli := GetMqClient("dataset-request", 1) if cli != nil { for _, v := range list { payload := InstructionReq{ Command: DatasetRequest, Payload: v, } s, _ := json.Marshal(payload) err := GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), s, logger) if err != nil { logger.With( zap.String("文件名称", v.FileName), zap.String("存储路径", v.FilePath), ).Error("文件传输", zap.Error(err)) } } } }