From 7d09ba028650f253b64cc6f0eff786a533ee987d Mon Sep 17 00:00:00 2001 From: wangjian Date: Thu, 18 May 2023 10:59:35 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E5=A2=9E=E5=8A=A0=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E5=A4=84=E7=90=86=E8=BF=9B=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.yaml | 10 ++--- model/node.go | 9 ++++ model/task.go | 20 ++++++--- mq/index.go | 122 +++++++++++++++++++++++++++++++++++++++++++++-------- 4 files changed, 132 insertions(+), 29 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index 1f16fbc..b10d85d 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -20,7 +20,7 @@ consul: timeout: 5 deregister: 1 db: - conn: root:123456@tcp(127.0.0.1:3306)/hpds_jky?charset=utf8mb4 + conn: root:OIxv7QptYBO3@tcp(192.168.53.10:3306)/diagnosis?charset=utf8mb4 drive_name: mysql cache: host: 127.0.0.1 @@ -29,13 +29,13 @@ cache: pool_size: 10 minio: protocol: http - endpoint: 127.0.0.1:9000 + endpoint: 192.168.53.10:9000 accessKeyId: root secretAccessKey: OIxv7QptYBO3 - bucket: jky-data + bucket: diagnosis node: - host: 127.0.0.1 - port: 27188 + host: 192.168.53.10 + port: 9188 token: 06d36c6f5705507dae778fdce90d0767 functions: - name: task-request diff --git a/model/node.go b/model/node.go index bd9ed6e..e440936 100644 --- a/model/node.go +++ b/model/node.go @@ -44,3 +44,12 @@ func GetAllNode(modelId int64) []Node { } return list } + +func GetNodeById(nodeId int64) *Node { + item := new(Node) + h, err := DB.ID(nodeId).Get(item) + if err != nil || !h { + return nil + } + return item +} diff --git a/model/task.go b/model/task.go index 50d7950..134dce7 100644 --- a/model/task.go +++ b/model/task.go @@ -62,23 +62,29 @@ func UpdateTaskProgress(taskProgress *proto.TaskLogProgress) { } } -func UpdateTaskProgressByLog(res *TaskResult) float64 { +func UpdateTaskProgressByLog(res *TaskResult, isFailing bool) float64 { + ret := -1.0 item := new(Task) h, err := DB.ID(res.TaskId).Get(item) if err != nil || !h { - return -1 + return ret + } + if isFailing { + item.FailingCount += 1 + } else { + item.CompletedCount += 1 } - item.CompletedCount += 1 item.UnfinishedCount -= 1 - if item.CompletedCount <= item.TotalCount { + if item.CompletedCount+item.FailingCount >= item.TotalCount { item.FinishTime = time.Now().Unix() item.UnfinishedCount = 0 item.Status = 3 + ret = 1.0 } item.UpdateAt = time.Now().Unix() - _, _ = DB.ID(res.TaskId).Cols("completed_count", "total_count", "unfinished_count", "update_at").Update(item) + _, _ = DB.ID(res.TaskId).Cols("completed_count", "failing_count", "total_count", "unfinished_count", "update_at", "finish_time", "status").Update(item) if item.TotalCount > 0 { - return float64(item.UnfinishedCount) / float64(item.TotalCount) + return 1 - float64(item.CompletedCount)/float64(item.TotalCount) } - return -1 + return ret } diff --git a/mq/index.go b/mq/index.go index ce05661..bfc1bac 100644 --- a/mq/index.go +++ b/mq/index.go @@ -208,6 +208,72 @@ func TaskRequestHandler(data []byte) (frame.Tag, []byte) { } else { if len(payload["datasetArr"].(string)) > 0 { GoroutinueChan := make(chan bool, 5) + if payload["nodeId"].(float64) == 0 { + //根据业务属性进行分配节点 + m := model.GetModelById(int64(payload["modelId"].(float64))) + var nodeList []model.Node + //todo 需要增加模型下发记录 + if m.IsLightWeight { + nodeList = model.GetLightWeight(m.ModelId) + } else { + nodeList = model.GetAllNode(m.ModelId) + } + if nodeList != nil { + if len(nodeList) > 1 { + //这里采用加权算法,权重采用CPU占用+mem使用+任务执行状态 + list := model.GetNodeState(nodeList) + lb := balance.LoadBalanceFactory(balance.LbWeightRoundRobin) + for _, v := range list { + _ = lb.Add(v) + } + nodeId, _ := lb.Get(0) + if nodeId == nil { + //todo 需要增加未能获取的处理 + } + payload["nodeId"] = nodeId.NodeId + payload["nodeGuid"] = nodeId.NodeGuid + + cmd := &InstructionReq{ + Command: TaskExecute, + Payload: payload, + } + + pData, _ := json.Marshal(cmd) + cli := GetMqClient("task-execute", 1) + if cli != nil { + _ = GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), pData) + } + model.UpdateTaskExecuteNode(int64(payload["taskId"].(float64)), nodeId.NodeId) + } else { + payload["nodeId"] = nodeList[0].NodeId + issue := new(model.IssueModel) + h, _ := model.DB.Where("model_id=? and node_id =?", int64(payload["modelId"].(float64)), nodeList[0].NodeId).Get(issue) + if !h { + + } + payload["issueResult"] = issue.IssueResult + cmd := &InstructionReq{ + Command: TaskExecute, + Payload: payload, + } + pData, _ := json.Marshal(cmd) + cli := GetMqClient("task-execute", 1) + if cli != nil { + _ = GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), pData) + } + model.UpdateTaskExecuteNode(int64(payload["taskId"].(float64)), nodeList[0].NodeId) + } + } else { + + } + + } else { + node := model.GetNodeById(int64(payload["nodeId"].(float64))) + if node != nil { + payload["nodeGuid"] = node.NodeGuid + } + } + //数据集处理 datasetArr := strings.Split(payload["datasetArr"].(string), ",") //for _, val := range datasetArr { // dId, err := strconv.ParseInt(val, 10, 64) @@ -217,8 +283,11 @@ func TaskRequestHandler(data []byte) (frame.Tag, []byte) { // dt := new(model.Dataset) // _, _ = model.DB.ID(dId).Get(dt) fileList := make([]model.FileManager, 0) - _ = model.DB.In("dataset_id", datasetArr). + err = model.DB.In("dataset_id", datasetArr). Find(&fileList) + if err != nil { + + } item := &TaskItem{ TaskId: int64(payload["taskId"].(float64)), TotalCount: int64(len(fileList)), @@ -238,38 +307,50 @@ func TaskRequestHandler(data []byte) (frame.Tag, []byte) { UnfinishedCount: int64(len(fileList)), } model.UpdateTaskProgress(taskProgress) - //taskProgressCmd := &InstructionReq{ - // Command: TaskLog, - // Payload: taskProgress, - //} - //deliver("task-log", 1, taskProgressCmd) + taskLog := &model.TaskLog{ + TaskId: int64(payload["taskId"].(float64)), + NodeId: int64(payload["nodeId"].(float64)), + Content: fmt.Sprintf("[%s] 在节点[%s]上开始执行任务,任务数量共[%d]", time.Now().Format("2006-01-02 15:04:05"), payload["nodeGuid"].(string), taskProgress.TotalCount), + CreateAt: time.Now().Unix(), + UpdateAt: time.Now().Unix(), + } + model.InsertLog(taskLog) + taskProgressCmd := &InstructionReq{ + Command: TaskLog, + Payload: taskProgress, + } + deliver("task-log", 1, taskProgressCmd) //数据集处理 minioCli := minio.NewClient(config.Cfg.Minio.AccessKeyId, config.Cfg.Minio.SecretAccessKey, config.Cfg.Minio.Endpoint, false, logging.L()) for _, v := range fileList { GoroutinueChan <- true - go func(fa model.FileManager) { + go func(fa model.FileManager, payload map[string]interface{}) { + p := make(map[string]interface{}) + for key, val := range payload { + p[key] = val + } dstPath := strings.Replace(fa.AccessUrl, fmt.Sprintf("%s://%s/", config.Cfg.Minio.Protocol, config.Cfg.Minio.Endpoint), "", 1) dstPath = strings.Replace(dstPath, config.Cfg.Minio.Bucket, "", 1) imgByte, _ := minioCli.GetObject(dstPath, config.Cfg.Minio.Bucket) - f := FileCapture{ + fc := FileCapture{ FileId: fa.FileId, FileName: fa.FileName, File: base64.StdEncoding.EncodeToString(imgByte), - DatasetName: payload["datasetName"].(string), + DatasetName: p["datasetName"].(string), CaptureTime: fa.CreateAt, } - payload["single"] = f + p["single"] = fc taskCode, _ := uuid.NewUUID() - payload["taskCode"] = taskCode.String() + p["taskCode"] = taskCode.String() cmd := &InstructionReq{ Command: TaskExecute, - Payload: payload, + Payload: p, } deliver("task-execute", 1, cmd) <-GoroutinueChan - }(v) + }(v, payload) } //} } @@ -357,13 +438,20 @@ func TaskResponseHandler(data []byte) (frame.Tag, []byte) { if _, ok := payload["srcPath"]; ok && payload["srcPath"] != nil { item.SrcPath = payload["srcPath"].(string) } - item.Result = payload["body"].(string) + if _, ok := payload["body"]; ok { + item.Result = payload["body"].(string) + } + isFailing := false + if _, ok := payload["code"]; ok && int(payload["code"].(float64)) == 500 { + item.Result = payload["msg"].(string) + isFailing = true + } _, err = model.DB.Insert(item) if err != nil { fmt.Println("接收TaskResponse数据出错", err) } //更新运行进度 - rat := model.UpdateTaskProgressByLog(item) + rat := model.UpdateTaskProgressByLog(item, isFailing) var ( ratStr string ) @@ -376,8 +464,8 @@ func TaskResponseHandler(data []byte) (frame.Tag, []byte) { taskLog.TaskId = item.TaskId taskLog.NodeId = item.NodeId if len(item.SrcPath) > 0 { - taskLog.Content = fmt.Sprintf("[%s] %s 图片%s处理完成", time.Unix(item.FinishTime, 0).Format("2006-01-02 15:04:05"), - ratStr, item.SrcPath) + taskLog.Content = fmt.Sprintf("[%s] 图片%s处理完成 %s ", time.Unix(item.FinishTime, 0).Format("2006-01-02 15:04:05"), + item.SrcPath, ratStr) } else { taskLog.Content = fmt.Sprintf("[%s] %s", time.Unix(item.FinishTime, 0).Format("2006-01-02 15:04:05"), ratStr)