hpds_jkw_web/mq/index.go

139 lines
3.1 KiB
Go

package mq
import (
"encoding/json"
"fmt"
"git.hpds.cc/Component/logging"
"git.hpds.cc/Component/network/frame"
"go.uber.org/zap"
"hpds-iot-web/config"
"os"
"time"
"git.hpds.cc/pavement/hpds_node"
)
var MqList []HpdsMqNode
type HpdsMqNode struct {
MqType uint
Topic string
Node config.HpdsNode
EndPoint interface{}
}
var ifChannelsMapInit = false
var ChannelsMap = map[string]chan string{}
func initChannelsMap() {
ChannelsMap = make(map[string]chan string)
}
func AddChannel(userId string) {
if !ifChannelsMapInit {
initChannelsMap()
ifChannelsMapInit = true
}
var newChannel = make(chan string)
ChannelsMap[userId] = newChannel
logging.L().Info("Build SSE connection for user = " + userId)
}
func must(logger *logging.Logger, err error) {
if err != nil {
if logger != nil {
logger.With(zap.String("web节点", "错误信息")).Error("启动错误", zap.Error(err))
} else {
_, _ = fmt.Fprint(os.Stderr, err)
}
os.Exit(1)
}
}
func NewMqClient(funcs []config.FuncConfig, node config.HpdsNode, logger *logging.Logger) (mqList []HpdsMqNode, err error) {
mqList = make([]HpdsMqNode, 0)
for _, v := range funcs {
switch v.MqType {
case 2:
sf := hpds_node.NewStreamFunction(
v.Name,
hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)),
hpds_node.WithObserveDataTags(frame.Tag(v.DataTag)),
hpds_node.WithCredential(node.Token),
)
err = sf.Connect()
nodeInfo := HpdsMqNode{
MqType: 2,
Topic: v.Name,
Node: node,
EndPoint: sf,
}
must(logger, err)
switch v.Name {
case "task-log":
_ = sf.SetHandler(TaskLogNotificationHandler)
default:
}
mqList = append(mqList, nodeInfo)
default:
ap := hpds_node.NewAccessPoint(
v.Name,
hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)),
hpds_node.WithCredential(node.Token),
)
err = ap.Connect()
nodeInfo := HpdsMqNode{
MqType: 1,
Topic: v.Name,
Node: node,
EndPoint: ap,
}
must(logger, err)
ap.SetDataTag(frame.Tag(v.DataTag))
mqList = append(mqList, nodeInfo)
}
}
return mqList, err
}
func GetMqClient(topic string, mqType uint) *HpdsMqNode {
for _, v := range MqList {
if v.Topic == topic && v.MqType == mqType {
return &v
}
}
return nil
}
func GenerateAndSendData(stream hpds_node.AccessPoint, data []byte, logger *logging.Logger) error {
logger.With(zap.String("web节点", "发送消息")).Info("数据", zap.String("发送的数据", string(data)))
_, err := stream.Write(data)
if err != nil {
return err
}
time.Sleep(1000 * time.Millisecond)
return nil
}
func TaskLogNotificationHandler(data []byte) (frame.Tag, []byte) {
logging.L().Info("任务日志", zap.String("接收数据", string(data)))
cmd := new(InstructionReq)
err := json.Unmarshal(data, cmd)
if err != nil {
return 0x0B, []byte(err.Error())
}
payload := cmd.Payload.(map[string]interface{})
str, _ := json.Marshal(payload)
for key := range ChannelsMap {
//if strings.Contains(key, userEmail) {
// channel := channelsMap[key]
// channel <- string(msgBytes)
//}
channel := ChannelsMap[key]
channel <- string(str)
}
return 0x0B, nil
}