200 lines
4.6 KiB
Go
200 lines
4.6 KiB
Go
|
package mq
|
||
|
|
||
|
import (
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"go.uber.org/zap"
|
||
|
"hpds_annotation/config"
|
||
|
"hpds_annotation/global"
|
||
|
"hpds_annotation/internal/proto"
|
||
|
"net/http"
|
||
|
"os"
|
||
|
"strings"
|
||
|
"time"
|
||
|
|
||
|
"git.hpds.cc/Component/logging"
|
||
|
"git.hpds.cc/Component/network/frame"
|
||
|
"git.hpds.cc/pavement/hpds_node"
|
||
|
"github.com/shirou/gopsutil/v3/host"
|
||
|
)
|
||
|
|
||
|
// MqList is the package-level registry of message-queue nodes that
// GetMqClient searches by topic and type.
// NOTE(review): nothing in this file assigns it; presumably the caller stores
// the result of NewMqClient here — confirm.
var MqList []HpdsMqNode
|
||
|
|
||
|
type HpdsMqNode struct {
|
||
|
MqType uint
|
||
|
Topic string
|
||
|
Node config.HpdsNode
|
||
|
EndPoint interface{}
|
||
|
}
|
||
|
|
||
|
// ifChannelsMapInit records whether AddChannel has already performed its
// one-time initChannelsMap call. It starts out false.
var ifChannelsMapInit bool
|
||
|
|
||
|
// ChannelsMap maps a user ID to the string channel that AddChannel registered
// for that user. It is allocated (empty) at package initialisation.
var ChannelsMap = make(map[string]chan string)
|
||
|
|
||
|
func initChannelsMap() {
|
||
|
ChannelsMap = make(map[string]chan string)
|
||
|
}
|
||
|
func AddChannel(userId string) {
|
||
|
if !ifChannelsMapInit {
|
||
|
initChannelsMap()
|
||
|
ifChannelsMapInit = true
|
||
|
}
|
||
|
var newChannel = make(chan string)
|
||
|
ChannelsMap[userId] = newChannel
|
||
|
logging.L().Info("Build SSE connection for user = " + userId)
|
||
|
}
|
||
|
|
||
|
func must(logger *logging.Logger, err error) {
|
||
|
if err != nil {
|
||
|
if logger != nil {
|
||
|
logger.With(zap.String("web节点", "错误信息")).Error("启动错误", zap.Error(err))
|
||
|
} else {
|
||
|
_, _ = fmt.Fprint(os.Stderr, err)
|
||
|
}
|
||
|
os.Exit(1)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func NewMqClient(funcs []config.FuncConfig, node config.HpdsNode, logger *logging.Logger) (mqList []HpdsMqNode, err error) {
|
||
|
mqList = make([]HpdsMqNode, 0)
|
||
|
for _, v := range funcs {
|
||
|
switch v.MqType {
|
||
|
case 2:
|
||
|
sf := hpds_node.NewStreamFunction(
|
||
|
v.Name,
|
||
|
hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)),
|
||
|
hpds_node.WithObserveDataTags(frame.Tag(v.DataTag)),
|
||
|
hpds_node.WithCredential(node.Token),
|
||
|
)
|
||
|
err = sf.Connect()
|
||
|
nodeInfo := HpdsMqNode{
|
||
|
MqType: 2,
|
||
|
Topic: v.Name,
|
||
|
Node: node,
|
||
|
EndPoint: sf,
|
||
|
}
|
||
|
must(logger, err)
|
||
|
switch v.Name {
|
||
|
case "edge-cmd-request":
|
||
|
_ = sf.SetHandler(EdgeCmdHandle)
|
||
|
default:
|
||
|
|
||
|
}
|
||
|
mqList = append(mqList, nodeInfo)
|
||
|
default:
|
||
|
ap := hpds_node.NewAccessPoint(
|
||
|
v.Name,
|
||
|
hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)),
|
||
|
hpds_node.WithCredential(node.Token),
|
||
|
)
|
||
|
err = ap.Connect()
|
||
|
nodeInfo := HpdsMqNode{
|
||
|
MqType: 1,
|
||
|
Topic: v.Name,
|
||
|
Node: node,
|
||
|
EndPoint: ap,
|
||
|
}
|
||
|
must(logger, err)
|
||
|
ap.SetDataTag(frame.Tag(v.DataTag))
|
||
|
mqList = append(mqList, nodeInfo)
|
||
|
}
|
||
|
|
||
|
}
|
||
|
return mqList, err
|
||
|
}
|
||
|
|
||
|
func GetMqClient(topic string, mqType uint) *HpdsMqNode {
|
||
|
for _, v := range MqList {
|
||
|
if v.Topic == topic && v.MqType == mqType {
|
||
|
return &v
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func GenerateAndSendData(stream hpds_node.AccessPoint, data []byte, logger *logging.Logger) error {
|
||
|
logger.With(zap.String("web节点", "发送消息")).Info("数据", zap.String("发送的数据", string(data)))
|
||
|
_, err := stream.Write(data)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
time.Sleep(1000 * time.Millisecond)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func EdgeCmdHandle(data []byte) (frame.Tag, []byte) {
|
||
|
global.Logger.Info("任务日志", zap.String("接收数据", string(data)))
|
||
|
cmd := new(InstructionReq)
|
||
|
err := json.Unmarshal(data, cmd)
|
||
|
if err != nil {
|
||
|
return 0x0B, []byte(err.Error())
|
||
|
}
|
||
|
switch cmd.Command {
|
||
|
case DataLabelRequest:
|
||
|
hi, _ := host.Info()
|
||
|
payload := cmd.Payload.(map[string]interface{})
|
||
|
if payload["nodeGuid"] == hi.HostID {
|
||
|
//currTime := time.Now().Unix()
|
||
|
var (
|
||
|
req proto.ListRequest
|
||
|
res *proto.BaseResponse
|
||
|
)
|
||
|
if v, ok := payload["path"]; ok {
|
||
|
req.Path = v.(string)
|
||
|
}
|
||
|
resPayload := new(DataLabelRes)
|
||
|
resPayload.NodeGuid = hi.HostID
|
||
|
|
||
|
switch strings.ToLower(payload["cmd"].(string)) {
|
||
|
case "get-list":
|
||
|
res, err = GetList(req)
|
||
|
if err != nil {
|
||
|
goto ErrorPoint
|
||
|
}
|
||
|
case "get-file-info":
|
||
|
res, err = GetFileInfo(req)
|
||
|
if err != nil {
|
||
|
goto ErrorPoint
|
||
|
}
|
||
|
}
|
||
|
ErrorPoint:
|
||
|
if err != nil {
|
||
|
res.Code = http.StatusInternalServerError
|
||
|
res.Status = http.StatusText(http.StatusInternalServerError)
|
||
|
res.Err = err
|
||
|
res.Message = "失败"
|
||
|
}
|
||
|
resPayload.Body = res
|
||
|
rsp := new(InstructionReq)
|
||
|
rsp.Command = DataLabelResponse
|
||
|
rsp.Payload = resPayload
|
||
|
str, _ := json.Marshal(rsp)
|
||
|
cli := GetMqClient("edge-cmd-response", 1)
|
||
|
if cli != nil {
|
||
|
_ = GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), str, global.Logger)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return 0x0B, nil
|
||
|
}
|
||
|
|
||
|
func SendLabelData(list []proto.FileContent, logger *logging.Logger) {
|
||
|
cli := GetMqClient("edge-cmd-response", 1)
|
||
|
if cli != nil {
|
||
|
for _, v := range list {
|
||
|
payload := InstructionReq{
|
||
|
Command: DataLabelResponse,
|
||
|
Payload: v,
|
||
|
}
|
||
|
s, _ := json.Marshal(payload)
|
||
|
err := GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), s, logger)
|
||
|
if err != nil {
|
||
|
logger.With(
|
||
|
zap.String("文件名称", v.Name),
|
||
|
zap.String("存储路径", v.Path),
|
||
|
).Error("文件传输", zap.Error(err))
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|