package mq

import (
	"bytes"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"image"
	"image/jpeg"
	"image/png"
	"strings"
	"time"

	"git.hpds.cc/pavement/hpds_node"
	"golang.org/x/image/bmp"
	"golang.org/x/image/tiff"

	"taskExecute/pkg/utils"
	"taskExecute/proto"
)

// CreateWorkflowQueue walks wf from its start node along its edges and
// returns a queue holding the visited node ids in execution order.
//
// At most len(wf.Nodes) nodes are enqueued, which also bounds the walk if the
// edges form a cycle. The walk stops early when GetNextNode finds no further
// node (no "start-node", or a node without an outgoing edge) instead of
// dereferencing a nil node.
func CreateWorkflowQueue(wf *Workflow) *WorkflowQueue {
	queue := NewQueue(len(wf.Nodes))
	currID := ""
	for i := 0; i < len(wf.Nodes); i++ {
		next := GetNextNode(wf, currID)
		if next == nil {
			break
		}
		// The result of Push is intentionally ignored: the queue is created
		// with room for len(wf.Nodes) entries and we never push more than that.
		_ = queue.Push(next.Id)
		currID = next.Id
	}
	return queue
}

// ProcessWorkflow executes the nodes of wf against the captured file fc and
// publishes a TaskResponseBody on the "task-response" MQ client.
//
// payload is expected to carry the numeric fields "taskId", "nodeId" and
// "modelId" (as float64) and the string fields "taskCode" and "datasetArr".
// A missing or mistyped field no longer panics; it is left at its zero value
// and reported through the response (Code 500), unless the workflow itself
// already failed with another error.
func ProcessWorkflow(payload map[string]interface{}, fc proto.FileCapture, wf *Workflow) {
	queue := CreateWorkflowQueue(wf)
	startTime := time.Now().Unix()

	resultData, err := runWorkflowNodes(queue, &fc, wf)

	// Comma-ok accessors so a malformed payload yields an error response
	// rather than a panic. Only the first problem is recorded.
	var payloadErr error
	intField := func(key string) int64 {
		v, ok := payload[key].(float64)
		if !ok && payloadErr == nil {
			payloadErr = fmt.Errorf("payload field %q is missing or not a number", key)
		}
		return int64(v)
	}
	strField := func(key string) string {
		v, ok := payload[key].(string)
		if !ok && payloadErr == nil {
			payloadErr = fmt.Errorf("payload field %q is missing or not a string", key)
		}
		return v
	}

	item := new(TaskResponseBody)
	item.TaskId = intField("taskId")
	item.TaskCode = strField("taskCode")
	item.NodeId = intField("nodeId")
	item.ModelId = intField("modelId")
	item.DatasetArr = strField("datasetArr")
	item.FileId = fc.FileId
	item.SrcPath = fc.FileName
	item.StartTime = startTime
	item.FinishTime = time.Now().Unix()

	// A workflow error takes precedence over a payload problem.
	if err == nil {
		err = payloadErr
	}
	if err != nil {
		item.Code = 500
		item.Msg = fmt.Sprintf("执行任务:%s", err.Error())
	} else {
		item.Code = 0
		item.Msg = "执行成功"
		item.Body = resultData
	}

	// NOTE(review): the result of GetMqClient and the EndPoint type assertion
	// are used unchecked, exactly as before; confirm a "task-response" client
	// is always registered before this function can run.
	cli := GetMqClient("task-response", 1)
	ap := cli.EndPoint.(hpds_node.AccessPoint)

	res := new(InstructionReq)
	res.Command = TaskResponse
	res.Payload = item
	pData, err := json.Marshal(res)
	if err != nil {
		// Nothing meaningful can be sent without a serialized response.
		return
	}
	// Best effort: this function has no error return and no logger in scope,
	// so a failed send is dropped as in the original implementation.
	_ = GenerateAndSendData(ap, pData)
}

// runWorkflowNodes pops node ids from queue and executes the matching nodes
// of wf in order. It returns the body produced by the last "fetch-node", or
// the first error encountered, at which point execution stops.
func runWorkflowNodes(queue *WorkflowQueue, fc *proto.FileCapture, wf *Workflow) (string, error) {
	var (
		img        image.Image
		imgList    []image.Image
		imgType    = "jpeg"
		resultData string
		err        error
	)
	for i := 0; i < len(wf.Nodes); i++ {
		// The second result of Pop is ignored on purpose: an id that does not
		// resolve to a node is skipped by the nil check below.
		nodeID, _ := queue.Pop()
		node := GetWorkflowNodeById(wf, nodeID)
		if node == nil {
			continue
		}
		switch node.Type {
		case "start-node":
			img, imgType, err = decodeSourceImage(fc.File)
		case "image-node":
			img, imgType, imgList, err = applyImageNode(node, img, imgType, imgList)
		case "fetch-node":
			resultData, err = runFetchNode(node, fc.FileName, img, imgType, imgList)
		}
		// "model-node", "mq-node" and unknown node types are no-ops.
		if err != nil {
			return "", err
		}
	}
	return resultData, nil
}

// decodeSourceImage decodes a base64 string into an image and reports the
// detected image format name.
func decodeSourceImage(encoded string) (image.Image, string, error) {
	raw, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return nil, "", fmt.Errorf("decode base64 image payload: %w", err)
	}
	return image.Decode(bytes.NewReader(raw))
}

// applyImageNode applies the image operation selected by the node's Method
// ("zoom", "crop", "gray", "rotate" or "formatConversion") and returns the
// updated image, image type and tile list. Unknown methods leave all three
// unchanged. On error the inputs are returned unchanged alongside the error.
func applyImageNode(node *WorkflowNode, img image.Image, imgType string, imgList []image.Image) (image.Image, string, []image.Image, error) {
	data := node.Properties.NodeData
	buf := utils.ImageToBuff(img, imgType)

	switch data.Method {
	case "zoom":
		out, outType, err := utils.Clip(buf, data.Width, data.Height, data.EqualProportion)
		if err != nil {
			return img, imgType, imgList, err
		}
		return out, outType, imgList, nil

	case "crop":
		// Cropping produces a list of tiles; the current image is kept as is.
		return img, imgType, utils.SplitImage(buf, data.Width, data.Height), nil

	case "gray":
		out, err := utils.Gray(buf)
		if err != nil {
			return img, imgType, imgList, err
		}
		return out, imgType, imgList, nil

	case "rotate":
		switch data.RotationAngle {
		case 90:
			return utils.Rotate90(buf), imgType, imgList, nil
		case 180:
			return utils.Rotate180(buf), imgType, imgList, nil
		case 270:
			return utils.Rotate270(buf), imgType, imgList, nil
		default:
			// Any other angle: decode the buffer back without rotating,
			// surfacing a decode failure instead of silently storing nil.
			out, _, err := image.Decode(buf)
			if err != nil {
				return img, imgType, imgList, fmt.Errorf("decode image for rotation: %w", err)
			}
			return out, imgType, imgList, nil
		}

	case "formatConversion":
		// Encode into a fresh buffer. Writing into buf (which was produced by
		// ImageToBuff for the current image) and decoding it afterwards would
		// read back the previously written data rather than the new encoding.
		var (
			encoded bytes.Buffer
			target  string
			err     error
		)
		switch data.Format {
		case "bmp":
			target = "bmp"
			err = bmp.Encode(&encoded, img)
		case "png":
			target = "png"
			err = png.Encode(&encoded, img)
		case "tiff":
			target = "tiff"
			err = tiff.Encode(&encoded, img, &tiff.Options{Predictor: false})
		default:
			target = "jpeg"
			err = jpeg.Encode(&encoded, img, &jpeg.Options{Quality: 100})
		}
		if err != nil {
			return img, imgType, imgList, fmt.Errorf("encode image as %s: %w", target, err)
		}
		out, _, err := image.Decode(&encoded)
		if err != nil {
			return img, imgType, imgList, fmt.Errorf("decode %s image: %w", target, err)
		}
		return out, target, imgList, nil
	}

	return img, imgType, imgList, nil
}

// runFetchNode sends the current image to the endpoint configured on node.
// When imgList is non-empty every tile is sent separately and the responses
// are returned as a JSON array of strings; otherwise the single response for
// img is returned as is.
func runFetchNode(node *WorkflowNode, fileName string, img image.Image, imgType string, imgList []image.Image) (string, error) {
	if len(imgList) == 0 {
		return fetchImage(node, fileName, img, imgType)
	}

	results := make([]string, len(imgList))
	for i, sub := range imgList {
		body, err := fetchImage(node, fileName, sub, imgType)
		if err != nil {
			return "", err
		}
		results[i] = body
	}
	encoded, err := json.Marshal(results)
	if err != nil {
		return "", fmt.Errorf("encode fetch results: %w", err)
	}
	return string(encoded), nil
}

// fetchImage performs one request for img against the node's endpoint.
//
// Fields of type "fileName" and "imgBase64" are sent as parameters through
// utils.HttpDo. If any field has another type, the image is sent through
// utils.PostFile as a multipart file instead.
func fetchImage(node *WorkflowNode, fileName string, img image.Image, imgType string) (string, error) {
	data := node.Properties.NodeData

	// NOTE(review): the header key is "ContentType", not "Content-Type";
	// kept as is because how utils interprets it is not visible from here.
	header := map[string]string{"ContentType": data.ContentType}
	param := make(map[string]string)
	isBody := false
	for _, field := range data.DynamicValidateForm.Fields {
		switch field.Type {
		case "fileName":
			param[field.Key] = fileName
		case "imgBase64":
			param[field.Key] = utils.ImageToBase64(img, imgType)
		default:
			isBody = true
		}
	}

	url := fmt.Sprintf("%s%s", data.Proto, data.Url)
	if !isBody {
		resp, err := utils.HttpDo(url, strings.ToUpper(data.MethodType), param, header)
		if err != nil {
			return "", err
		}
		return string(resp), nil
	}

	files := []utils.UploadFile{{
		Name:     "image",
		Filepath: "./output.jpg",
		File:     utils.ImageToBuff(img, imgType),
	}}
	body := utils.PostFile(url, param, "multipart/form-data", files, header)
	return body, nil
}

// GetNextNode returns the node that follows currNodeId in wf.
//
// With an empty currNodeId it returns the first node of type "start-node".
// Otherwise it follows the edge whose source is currNodeId; if several edges
// match, the last one wins. It returns nil when no such node exists.
func GetNextNode(wf *Workflow, currNodeId string) (node *WorkflowNode) {
	if len(currNodeId) == 0 {
		// Start of the workflow.
		for _, v := range wf.Nodes {
			if v.Type == "start-node" {
				return &v
			}
		}
		return nil
	}

	// Resolve the successor id from the edges.
	var nextID string
	for _, e := range wf.Edges {
		if e.SourceNodeId == currNodeId {
			nextID = e.TargetNodeId
		}
	}
	if len(nextID) == 0 {
		return nil
	}
	return GetWorkflowNodeById(wf, nextID)
}

// GetWorkflowNodeById returns the first node of wf whose Id equals id, or nil
// when there is none. The returned pointer refers to a copy of the node, so
// modifying it does not change wf.
func GetWorkflowNodeById(wf *Workflow, id string) (node *WorkflowNode) {
	for _, v := range wf.Nodes {
		if v.Id == id {
			return &v
		}
	}
	return nil
}