1、增加任务处理进度

This commit is contained in:
wangjian 2023-05-18 10:59:35 +08:00
parent 685b46c1e5
commit 7d09ba0286
4 changed files with 132 additions and 29 deletions

View File

@ -20,7 +20,7 @@ consul:
timeout: 5
deregister: 1
db:
conn: root:123456@tcp(127.0.0.1:3306)/hpds_jky?charset=utf8mb4
conn: root:OIxv7QptYBO3@tcp(192.168.53.10:3306)/diagnosis?charset=utf8mb4
drive_name: mysql
cache:
host: 127.0.0.1
@ -29,13 +29,13 @@ cache:
pool_size: 10
minio:
protocol: http
endpoint: 127.0.0.1:9000
endpoint: 192.168.53.10:9000
accessKeyId: root
secretAccessKey: OIxv7QptYBO3
bucket: jky-data
bucket: diagnosis
node:
host: 127.0.0.1
port: 27188
host: 192.168.53.10
port: 9188
token: 06d36c6f5705507dae778fdce90d0767
functions:
- name: task-request

View File

@ -44,3 +44,12 @@ func GetAllNode(modelId int64) []Node {
}
return list
}
func GetNodeById(nodeId int64) *Node {
item := new(Node)
h, err := DB.ID(nodeId).Get(item)
if err != nil || !h {
return nil
}
return item
}

View File

@ -62,23 +62,29 @@ func UpdateTaskProgress(taskProgress *proto.TaskLogProgress) {
}
}
func UpdateTaskProgressByLog(res *TaskResult) float64 {
func UpdateTaskProgressByLog(res *TaskResult, isFailing bool) float64 {
ret := -1.0
item := new(Task)
h, err := DB.ID(res.TaskId).Get(item)
if err != nil || !h {
return -1
return ret
}
if isFailing {
item.FailingCount += 1
} else {
item.CompletedCount += 1
}
item.UnfinishedCount -= 1
if item.CompletedCount <= item.TotalCount {
if item.CompletedCount+item.FailingCount >= item.TotalCount {
item.FinishTime = time.Now().Unix()
item.UnfinishedCount = 0
item.Status = 3
ret = 1.0
}
item.UpdateAt = time.Now().Unix()
_, _ = DB.ID(res.TaskId).Cols("completed_count", "total_count", "unfinished_count", "update_at").Update(item)
_, _ = DB.ID(res.TaskId).Cols("completed_count", "failing_count", "total_count", "unfinished_count", "update_at", "finish_time", "status").Update(item)
if item.TotalCount > 0 {
return float64(item.UnfinishedCount) / float64(item.TotalCount)
return 1 - float64(item.CompletedCount)/float64(item.TotalCount)
}
return -1
return ret
}

View File

@ -208,6 +208,72 @@ func TaskRequestHandler(data []byte) (frame.Tag, []byte) {
} else {
if len(payload["datasetArr"].(string)) > 0 {
GoroutinueChan := make(chan bool, 5)
if payload["nodeId"].(float64) == 0 {
//根据业务属性进行分配节点
m := model.GetModelById(int64(payload["modelId"].(float64)))
var nodeList []model.Node
//todo 需要增加模型下发记录
if m.IsLightWeight {
nodeList = model.GetLightWeight(m.ModelId)
} else {
nodeList = model.GetAllNode(m.ModelId)
}
if nodeList != nil {
if len(nodeList) > 1 {
//这里采用加权算法权重采用CPU占用+mem使用+任务执行状态
list := model.GetNodeState(nodeList)
lb := balance.LoadBalanceFactory(balance.LbWeightRoundRobin)
for _, v := range list {
_ = lb.Add(v)
}
nodeId, _ := lb.Get(0)
if nodeId == nil {
//todo 需要增加未能获取的处理
}
payload["nodeId"] = nodeId.NodeId
payload["nodeGuid"] = nodeId.NodeGuid
cmd := &InstructionReq{
Command: TaskExecute,
Payload: payload,
}
pData, _ := json.Marshal(cmd)
cli := GetMqClient("task-execute", 1)
if cli != nil {
_ = GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), pData)
}
model.UpdateTaskExecuteNode(int64(payload["taskId"].(float64)), nodeId.NodeId)
} else {
payload["nodeId"] = nodeList[0].NodeId
issue := new(model.IssueModel)
h, _ := model.DB.Where("model_id=? and node_id =?", int64(payload["modelId"].(float64)), nodeList[0].NodeId).Get(issue)
if !h {
}
payload["issueResult"] = issue.IssueResult
cmd := &InstructionReq{
Command: TaskExecute,
Payload: payload,
}
pData, _ := json.Marshal(cmd)
cli := GetMqClient("task-execute", 1)
if cli != nil {
_ = GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), pData)
}
model.UpdateTaskExecuteNode(int64(payload["taskId"].(float64)), nodeList[0].NodeId)
}
} else {
}
} else {
node := model.GetNodeById(int64(payload["nodeId"].(float64)))
if node != nil {
payload["nodeGuid"] = node.NodeGuid
}
}
//数据集处理
datasetArr := strings.Split(payload["datasetArr"].(string), ",")
//for _, val := range datasetArr {
// dId, err := strconv.ParseInt(val, 10, 64)
@ -217,8 +283,11 @@ func TaskRequestHandler(data []byte) (frame.Tag, []byte) {
// dt := new(model.Dataset)
// _, _ = model.DB.ID(dId).Get(dt)
fileList := make([]model.FileManager, 0)
_ = model.DB.In("dataset_id", datasetArr).
err = model.DB.In("dataset_id", datasetArr).
Find(&fileList)
if err != nil {
}
item := &TaskItem{
TaskId: int64(payload["taskId"].(float64)),
TotalCount: int64(len(fileList)),
@ -238,38 +307,50 @@ func TaskRequestHandler(data []byte) (frame.Tag, []byte) {
UnfinishedCount: int64(len(fileList)),
}
model.UpdateTaskProgress(taskProgress)
//taskProgressCmd := &InstructionReq{
// Command: TaskLog,
// Payload: taskProgress,
//}
//deliver("task-log", 1, taskProgressCmd)
taskLog := &model.TaskLog{
TaskId: int64(payload["taskId"].(float64)),
NodeId: int64(payload["nodeId"].(float64)),
Content: fmt.Sprintf("[%s] 在节点[%s]上开始执行任务,任务数量共[%d]", time.Now().Format("2006-01-02 15:04:05"), payload["nodeGuid"].(string), taskProgress.TotalCount),
CreateAt: time.Now().Unix(),
UpdateAt: time.Now().Unix(),
}
model.InsertLog(taskLog)
taskProgressCmd := &InstructionReq{
Command: TaskLog,
Payload: taskProgress,
}
deliver("task-log", 1, taskProgressCmd)
//数据集处理
minioCli := minio.NewClient(config.Cfg.Minio.AccessKeyId, config.Cfg.Minio.SecretAccessKey, config.Cfg.Minio.Endpoint, false, logging.L())
for _, v := range fileList {
GoroutinueChan <- true
go func(fa model.FileManager) {
go func(fa model.FileManager, payload map[string]interface{}) {
p := make(map[string]interface{})
for key, val := range payload {
p[key] = val
}
dstPath := strings.Replace(fa.AccessUrl, fmt.Sprintf("%s://%s/", config.Cfg.Minio.Protocol, config.Cfg.Minio.Endpoint), "", 1)
dstPath = strings.Replace(dstPath, config.Cfg.Minio.Bucket, "", 1)
imgByte, _ := minioCli.GetObject(dstPath, config.Cfg.Minio.Bucket)
f := FileCapture{
fc := FileCapture{
FileId: fa.FileId,
FileName: fa.FileName,
File: base64.StdEncoding.EncodeToString(imgByte),
DatasetName: payload["datasetName"].(string),
DatasetName: p["datasetName"].(string),
CaptureTime: fa.CreateAt,
}
payload["single"] = f
p["single"] = fc
taskCode, _ := uuid.NewUUID()
payload["taskCode"] = taskCode.String()
p["taskCode"] = taskCode.String()
cmd := &InstructionReq{
Command: TaskExecute,
Payload: payload,
Payload: p,
}
deliver("task-execute", 1, cmd)
<-GoroutinueChan
}(v)
}(v, payload)
}
//}
}
@ -357,13 +438,20 @@ func TaskResponseHandler(data []byte) (frame.Tag, []byte) {
if _, ok := payload["srcPath"]; ok && payload["srcPath"] != nil {
item.SrcPath = payload["srcPath"].(string)
}
if _, ok := payload["body"]; ok {
item.Result = payload["body"].(string)
}
isFailing := false
if _, ok := payload["code"]; ok && int(payload["code"].(float64)) == 500 {
item.Result = payload["msg"].(string)
isFailing = true
}
_, err = model.DB.Insert(item)
if err != nil {
fmt.Println("接收TaskResponse数据出错", err)
}
//更新运行进度
rat := model.UpdateTaskProgressByLog(item)
rat := model.UpdateTaskProgressByLog(item, isFailing)
var (
ratStr string
)
@ -376,8 +464,8 @@ func TaskResponseHandler(data []byte) (frame.Tag, []byte) {
taskLog.TaskId = item.TaskId
taskLog.NodeId = item.NodeId
if len(item.SrcPath) > 0 {
taskLog.Content = fmt.Sprintf("[%s] %s 图片%s处理完成", time.Unix(item.FinishTime, 0).Format("2006-01-02 15:04:05"),
ratStr, item.SrcPath)
taskLog.Content = fmt.Sprintf("[%s] 图片%s处理完成 %s ", time.Unix(item.FinishTime, 0).Format("2006-01-02 15:04:05"),
item.SrcPath, ratStr)
} else {
taskLog.Content = fmt.Sprintf("[%s] %s", time.Unix(item.FinishTime, 0).Format("2006-01-02 15:04:05"),
ratStr)