hpds_control_center/mq/index.go

234 lines
6.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package mq
import (
"encoding/json"
"fmt"
"git.hpds.cc/Component/logging"
"go.uber.org/zap"
"hpds_control_center/config"
"hpds_control_center/internal/balance"
"hpds_control_center/model"
"os"
"time"
"git.hpds.cc/pavement/hpds_node"
)
var MqList []HpdsMqNode
type HpdsMqNode struct {
MqType uint
Topic string
Node config.HpdsNode
EndPoint interface{}
Logger *logging.Logger
}
func must(logger *logging.Logger, err error) {
if err != nil {
if logger != nil {
logger.With(zap.String("web节点", "错误信息")).Error("启动错误", zap.Error(err))
} else {
_, _ = fmt.Fprint(os.Stderr, err)
}
os.Exit(1)
}
}
func NewMqClient(funcs []config.FuncConfig, node config.HpdsNode, logger *logging.Logger) (mqList []HpdsMqNode, err error) {
mqList = make([]HpdsMqNode, 0)
for _, v := range funcs {
switch v.MqType {
case 2:
sf := hpds_node.NewStreamFunction(
v.Name,
hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)),
hpds_node.WithObserveDataTags(v.DataTag),
hpds_node.WithCredential(node.Token),
)
err = sf.Connect()
must(logger, err)
nodeInfo := HpdsMqNode{
MqType: 2,
Topic: v.Name,
Node: node,
EndPoint: sf,
}
switch v.Name {
case "task-request":
_ = sf.SetHandler(TaskRequestHandler)
default:
}
mqList = append(mqList, nodeInfo)
default:
ap := hpds_node.NewAccessPoint(
v.Name,
hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)),
hpds_node.WithCredential(node.Token),
)
err = ap.Connect()
nodeInfo := HpdsMqNode{
MqType: 1,
Topic: v.Name,
Node: node,
EndPoint: ap,
}
must(logger, err)
ap.SetDataTag(v.DataTag)
mqList = append(mqList, nodeInfo)
}
}
return mqList, err
}
func GetMqClient(topic string, mqType uint) *HpdsMqNode {
for _, v := range MqList {
if v.Topic == topic && v.MqType == mqType {
return &v
}
}
return nil
}
func GenerateAndSendData(stream hpds_node.AccessPoint, data []byte) error {
_, err := stream.Write(data)
if err != nil {
return err
}
time.Sleep(1000 * time.Millisecond)
return nil
}
func TaskRequestHandler(data []byte) (byte, []byte) {
fmt.Println("接收数据", string(data))
cmd := new(InstructionReq)
err := json.Unmarshal(data, cmd)
if err != nil {
return 0x0B, []byte(err.Error())
}
switch cmd.Command {
case TaskAdd:
payload := cmd.Payload.(map[string]interface{})
if payload["nodeId"].(float64) == 0 {
//根据业务属性进行分配节点
m := model.GetModelById(int64(payload["modelId"].(float64)))
var nodeList []model.Node
//todo 需要增加模型下发记录
if m.IsLightWeight {
nodeList = model.GetLightWeight(m.ModelId)
} else {
nodeList = model.GetAllNode(m.ModelId)
}
if nodeList != nil {
if len(nodeList) > 1 {
//这里采用加权算法权重采用CPU占用+mem使用+任务执行状态
list := model.GetNodeState(nodeList)
lb := balance.LoadBalanceFactory(balance.LbWeightRoundRobin)
for _, v := range list {
_ = lb.Add(v)
}
nodeId, _ := lb.Get(0)
payload["nodeId"] = nodeId
cmd := &InstructionReq{
Command: TaskExecute,
Payload: payload,
}
pData, _ := json.Marshal(cmd)
cli := GetMqClient("task-execute", 1)
if cli != nil {
_ = GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), pData)
}
model.UpdateTaskExecuteNode(int64(payload["taskId"].(float64)), nodeId)
} else {
payload["nodeId"] = nodeList[0].NodeId
issue := new(model.IssueModel)
h, _ := model.DB.Where("model_id=? and node_id =?", int64(payload["modelId"].(float64)), nodeList[0].NodeId).Get(issue)
if !h {
}
payload["issueResult"] = issue.IssueResult
cmd := &InstructionReq{
Command: TaskExecute,
Payload: payload,
}
pData, _ := json.Marshal(cmd)
cli := GetMqClient("task-execute", 1)
if cli != nil {
_ = GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), pData)
}
model.UpdateTaskExecuteNode(int64(payload["taskId"].(float64)), nodeList[0].NodeId)
}
} else {
}
} else {
cmd := &InstructionReq{
Command: TaskExecute,
Payload: payload,
}
pData, _ := json.Marshal(cmd)
cli := GetMqClient("task-execute", 1)
if cli != nil {
_ = GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), pData)
}
}
case ModelIssue:
payload := cmd.Payload.(map[string]interface{})
cmd := &InstructionReq{
Command: ModelIssueRepeater,
Payload: payload,
}
pData, _ := json.Marshal(cmd)
cli := GetMqClient("task-execute", 1)
if cli != nil {
_ = GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), pData)
}
case ModelIssueResponse:
payload := cmd.Payload.(map[string]interface{})
//查找下发记录
item := new(model.IssueModel)
h, _ := model.DB.Where("model_id = ? and node_id = ?", payload["modelId"].(int64), payload["nodeId"].(int64)).Get(item)
pData, _ := json.Marshal(payload)
if h {
item.Status = 1
item.IssueResult = string(pData)
item.UpdateAt = time.Now().Unix()
_, _ = model.DB.ID(item.Id).AllCols().Update(item)
} else {
item.ModelId = payload["modelId"].(int64)
item.NodeId = payload["nodeId"].(int64)
item.Status = 1
item.IssueResult = string(pData)
item.CreateAt = time.Now().Unix()
item.UpdateAt = time.Now().Unix()
_, _ = model.DB.Insert(item)
}
case TaskResponse:
payload := cmd.Payload.(map[string]interface{})
item := new(model.TaskResult)
item.TaskId = int64(payload["taskId"].(float64))
item.TaskCode = payload["taskCode"].(string)
item.NodeId = int64(payload["nodeId"].(float64))
item.ModelId = int64(payload["modelId"].(float64))
item.StartTime = int64(payload["startTime"].(float64))
item.FinishTime = int64(payload["finishTime"].(float64))
item.SubDataset = payload["subDataset"].(string)
item.DatasetId = int64(payload["datasetArr"].(float64))
item.SrcPath = payload["srcPath"].(string)
item.Result = payload["body"].(string)
_, _ = model.DB.Insert(item)
//fn := payload["fileName"].(string)
//dec := base64.NewDecoder(base64.StdEncoding, strings.NewReader(payload["file"].(string)))
default:
}
return byte(cmd.Command), nil
}