data2minio/mq/index.go

124 lines
2.7 KiB
Go
Raw Normal View History

2023-04-01 16:43:13 +08:00
package mq
import (
"encoding/json"
"fmt"
"os"
"time"
"git.hpds.cc/pavement/hpds_node"
"go.uber.org/zap"
"data_minio/config"
"data_minio/global"
"data_minio/model"
)
// MqList is the package-level registry of message-queue endpoints that
// GetMqClient searches. Nothing in this file assigns to it; it is presumably
// filled by the caller with the result of NewMqClient (confirm at the call site).
var MqList []HpdsMqNode
// HpdsMqNode describes one connected message-queue endpoint together with the
// configuration it was created from.
type HpdsMqNode struct {
// MqType is the endpoint kind: NewMqClient stores 2 for stream functions and
// 1 for access points.
MqType uint
// Topic is the endpoint name; NewMqClient copies it from the function
// configuration's Name.
Topic string
// Node is the broker configuration (host, port, token) the endpoint was
// connected with.
Node config.HpdsNode
// EndPoint holds the value returned by hpds_node.NewStreamFunction or
// hpds_node.NewAccessPoint; callers must type-assert it according to MqType.
EndPoint interface{}
}
// must aborts start-up when err is non-nil: it logs the error through the
// global logger and terminates the process with exit status 1. A nil err is
// a no-op.
func must(err error) {
	if err == nil {
		return
	}
	logger := global.Logger.With(zap.String("数据落地服务", "错误信息"))
	logger.Error("启动错误", zap.Error(err))
	os.Exit(1)
}
// NewMqClient creates and connects one message-queue endpoint per entry in
// funcs, all against the broker described by node.
//
// An entry with MqType 2 becomes a stream function that observes the entry's
// DataTag and dispatches incoming messages to handler; any other MqType
// becomes an access point tagged with the entry's DataTag (recorded as
// MqType 1).
//
// Failing to register the handler or to connect is fatal: must logs the error
// and exits the process. As a consequence the returned error is always nil;
// it is kept in the signature for compatibility with existing callers.
func NewMqClient(funcs []config.FuncConfig, node config.HpdsNode) (mqList []HpdsMqNode, err error) {
	mqList = make([]HpdsMqNode, 0, len(funcs))
	// Every endpoint talks to the same broker, so format the address once.
	addr := fmt.Sprintf("%s:%d", node.Host, node.Port)

	for _, v := range funcs {
		switch v.MqType {
		case 2:
			sf := hpds_node.NewStreamFunction(
				v.Name,
				hpds_node.WithMqAddr(addr),
				hpds_node.WithObserveDataTags(v.DataTag),
				hpds_node.WithCredential(node.Token),
			)
			// A stream function without its handler would connect but never
			// process anything, so treat a registration failure like a
			// connection failure instead of discarding it.
			must(sf.SetHandler(handler))
			must(sf.Connect())
			mqList = append(mqList, HpdsMqNode{
				MqType:   2,
				Topic:    v.Name,
				Node:     node,
				EndPoint: sf,
			})
		default:
			ap := hpds_node.NewAccessPoint(
				v.Name,
				hpds_node.WithMqAddr(addr),
				hpds_node.WithCredential(node.Token),
			)
			must(ap.Connect())
			// The data tag is applied only once the connection is established.
			ap.SetDataTag(v.DataTag)
			mqList = append(mqList, HpdsMqNode{
				MqType:   1,
				Topic:    v.Name,
				Node:     node,
				EndPoint: ap,
			})
		}
	}
	return mqList, nil
}
// GetMqClient returns the first entry of MqList whose Topic and MqType both
// match, or nil when there is none.
//
// The result points at the element stored in MqList itself, not at a copy, so
// changes made through the pointer are visible to later lookups.
func GetMqClient(topic string, mqType uint) *HpdsMqNode {
	// Index the slice instead of ranging by value: the range variable is a
	// copy, and taking its address would hand out a detached duplicate.
	for i := range MqList {
		if node := &MqList[i]; node.Topic == topic && node.MqType == mqType {
			return node
		}
	}
	return nil
}
// GenerateAndSendData writes data to stream. If the write fails the error is
// returned immediately; otherwise the call blocks for one second before
// returning nil.
func GenerateAndSendData(stream hpds_node.AccessPoint, data []byte) error {
	if _, writeErr := stream.Write(data); writeErr != nil {
		return writeErr
	}
	time.Sleep(time.Second)
	return nil
}
// handler is the stream-function callback registered by NewMqClient. It
// decodes data as a JSON InstructionReq and, for a DatasetRequest command,
// records the described file in the database.
//
// Every failure is passed to global.Must, as before; that helper is defined
// outside this file, so what it does with a non-nil error is not visible
// here. The return value is always the fixed tag 0x12 with a nil payload.
func handler(data []byte) (byte, []byte) {
	req := new(InstructionReq)
	if err := json.Unmarshal(data, req); err != nil {
		global.Must(err)
		return 0x12, nil
	}

	switch req.Command {
	case DatasetRequest:
		global.Must(recordDatasetFile(req.Payload))
	}
	return 0x12, nil
}

// recordDatasetFile validates a DatasetRequest payload and inserts the
// matching model.FileManager row. It returns an error instead of panicking
// when the payload is not a JSON object or a field has the wrong type.
func recordDatasetFile(rawPayload interface{}) error {
	payload, ok := rawPayload.(map[string]interface{})
	if !ok {
		return fmt.Errorf("dataset request payload: want JSON object, got %T", rawPayload)
	}

	datasetID, err := payloadInt64(payload, "datasetId")
	if err != nil {
		return err
	}
	dt, err := model.GetDatasetById(datasetID)
	if err != nil {
		// Stop here: dt is meaningless after a failed lookup.
		return fmt.Errorf("loading dataset %d: %w", datasetID, err)
	}

	file, err := payloadString(payload, "file")
	if err != nil {
		return err
	}
	// The decompressed bytes are not consumed by this function yet, so a
	// decompression failure is reported but does not block the metadata record.
	// NOTE(review): confirm whether a corrupt file should reject the request.
	if _, err := UnCompress([]byte(file)); err != nil {
		global.Logger.With(zap.String("数据落地服务", "错误信息")).
			Error("uncompress dataset file", zap.Error(err))
	}

	fileName, err := payloadString(payload, "fileName")
	if err != nil {
		return err
	}
	fileSize, err := payloadInt64(payload, "fileSize")
	if err != nil {
		return err
	}
	fileMd5, err := payloadString(payload, "fileMd5")
	if err != nil {
		return err
	}

	// Sample the clock once so CreateAt and UpdateAt cannot straddle a second.
	now := time.Now().Unix()
	fm := &model.FileManager{
		FileName:  fileName,
		Scene:     fmt.Sprintf("%d", dt.CategoryId),
		DatasetId: dt.DatasetId,
		FileSize:  fileSize,
		FileMd5:   fileMd5,
		Creator:   0,
		CreateAt:  now,
		UpdateAt:  now,
	}
	if _, err := global.DB.Insert(fm); err != nil {
		return fmt.Errorf("inserting file record %q: %w", fileName, err)
	}
	return nil
}

// payloadString returns payload[key] as a string, or an error when the key is
// missing or holds a different type.
func payloadString(payload map[string]interface{}, key string) (string, error) {
	s, ok := payload[key].(string)
	if !ok {
		return "", fmt.Errorf("payload field %q: want string, got %T", key, payload[key])
	}
	return s, nil
}

// payloadInt64 returns payload[key] truncated to int64. encoding/json decodes
// every JSON number stored in an interface value as float64, so that is the
// only accepted dynamic type.
func payloadInt64(payload map[string]interface{}, key string) (int64, error) {
	f, ok := payload[key].(float64)
	if !ok {
		return 0, fmt.Errorf("payload field %q: want number, got %T", key, payload[key])
	}
	return int64(f), nil
}