taskExecute/mq/service.go

262 lines
6.8 KiB
Go
Raw Permalink Normal View History

2023-03-23 14:35:24 +08:00
package mq
import (
"bytes"
2023-03-23 14:35:24 +08:00
"encoding/base64"
"encoding/json"
"fmt"
"git.hpds.cc/pavement/hpds_node"
"golang.org/x/image/bmp"
"golang.org/x/image/tiff"
2023-03-23 14:35:24 +08:00
"image"
"image/jpeg"
"image/png"
2023-03-23 14:35:24 +08:00
"strings"
"taskExecute/pkg/utils"
"taskExecute/proto"
"time"
)
func CreateWorkflowQueue(wf *Workflow) *WorkflowQueue {
nodeId := ""
qList := NewQueue(len(wf.Nodes))
for i := 0; i < len(wf.Nodes); i++ {
node := GetNextNode(wf, nodeId)
_ = qList.Push(node.Id)
nodeId = node.Id
}
return qList
//switch node.Type {
//case "start-node":
// node = GetNextNode(wf, node.Id)
//case "image-node":
// //处理图像后
// img, _ := ProcessWorkflowNode(node, payload, fc)
// payload["resImage"] = img
// nextNode := GetNextNode(wf, node.Id)
//
//case "fetch-node":
//case "model-node":
//case "mq-node":
//default:
//
//}
}
func ProcessWorkflow(payload map[string]interface{}, fc proto.FileCapture, wf *Workflow) {
qList := CreateWorkflowQueue(wf)
var (
img image.Image
imgList []image.Image
2023-03-23 14:35:24 +08:00
imgType string = "jpeg"
err error
resultData string
)
startTime := time.Now().Unix()
for i := 0; i < len(wf.Nodes); i++ {
nodeId, _ := qList.Pop()
node := GetWorkflowNodeById(wf, nodeId)
if node == nil {
continue
}
2023-03-23 14:35:24 +08:00
switch node.Type {
case "start-node":
fn, _ := base64.StdEncoding.DecodeString(fc.File)
buff := bytes.NewBuffer(fn)
2023-05-18 11:02:34 +08:00
img, imgType, err = image.Decode(buff)
if err != nil {
goto ReturnPoint
}
2023-03-23 14:35:24 +08:00
case "image-node":
//处理图像后
fn := utils.ImageToBuff(img, imgType)
if node.Properties.NodeData.Method == "zoom" {
2023-03-23 14:35:24 +08:00
img, imgType, err = utils.Clip(fn, node.Properties.NodeData.Width, node.Properties.NodeData.Height, node.Properties.NodeData.EqualProportion)
if err != nil {
goto ReturnPoint
}
} else if node.Properties.NodeData.Method == "crop" {
imgList = utils.SplitImage(fn, node.Properties.NodeData.Width, node.Properties.NodeData.Height)
2023-03-23 14:35:24 +08:00
} else if node.Properties.NodeData.Method == "gray" {
img, err = utils.Gray(fn)
if err != nil {
goto ReturnPoint
}
} else if node.Properties.NodeData.Method == "rotate" {
switch node.Properties.NodeData.RotationAngle {
case 90:
img = utils.Rotate90(fn)
case 180:
img = utils.Rotate180(fn)
case 270:
img = utils.Rotate270(fn)
default:
img, _, _ = image.Decode(fn)
2023-03-23 14:35:24 +08:00
}
} else if node.Properties.NodeData.Method == "formatConversion" {
//img = utils.BuffToImage(fn)
2023-03-23 14:35:24 +08:00
switch node.Properties.NodeData.Format {
case "bmp":
imgType = "bmp"
_ = bmp.Encode(fn, img)
2023-03-23 14:35:24 +08:00
case "png":
imgType = "png"
_ = png.Encode(fn, img)
2023-03-23 14:35:24 +08:00
case "tiff":
imgType = "tiff"
_ = tiff.Encode(fn, img, &tiff.Options{Predictor: false})
2023-03-23 14:35:24 +08:00
default:
imgType = "jpeg"
_ = jpeg.Encode(fn, img, &jpeg.Options{
Quality: 100,
})
2023-03-23 14:35:24 +08:00
}
img, _, _ = image.Decode(fn)
2023-03-23 14:35:24 +08:00
}
case "fetch-node":
header := make(map[string]string)
header["ContentType"] = node.Properties.NodeData.ContentType
param := make(map[string]string)
isBody := false
if len(imgList) > 0 {
result := make([]string, len(imgList))
for idx, subImg := range imgList {
for _, val := range node.Properties.NodeData.DynamicValidateForm.Fields {
switch val.Type {
case "fileName":
param[val.Key] = fc.FileName
case "imgBase64":
param[val.Key] = utils.ImageToBase64(subImg, imgType)
default:
isBody = true
}
}
if !isBody {
data, e := utils.HttpDo(fmt.Sprintf("%s%s", node.Properties.NodeData.Proto, node.Properties.NodeData.Url),
strings.ToUpper(node.Properties.NodeData.MethodType), param, header)
if e != nil {
err = e
goto ReturnPoint
}
result[idx] = string(data)
//resultData = string(data)
} else {
buff := utils.ImageToBuff(subImg, imgType)
files := make([]utils.UploadFile, 1)
files[0] = utils.UploadFile{
Name: "image",
Filepath: "./output.jpg",
File: buff,
}
data := utils.PostFile(fmt.Sprintf("%s%s", node.Properties.NodeData.Proto, node.Properties.NodeData.Url),
param, "multipart/form-data", files, header)
//resultData = data
result[idx] = string(data)
}
2023-03-23 14:35:24 +08:00
}
resultStr, _ := json.Marshal(result)
resultData = string(resultStr)
2023-03-23 14:35:24 +08:00
} else {
for _, val := range node.Properties.NodeData.DynamicValidateForm.Fields {
switch val.Type {
case "fileName":
param[val.Key] = fc.FileName
case "imgBase64":
param[val.Key] = utils.ImageToBase64(img, imgType)
default:
isBody = true
}
}
if !isBody {
data, e := utils.HttpDo(fmt.Sprintf("%s%s", node.Properties.NodeData.Proto, node.Properties.NodeData.Url),
strings.ToUpper(node.Properties.NodeData.MethodType), param, header)
if e != nil {
err = e
goto ReturnPoint
}
resultData = string(data)
} else {
buff := utils.ImageToBuff(img, imgType)
files := make([]utils.UploadFile, 1)
files[0] = utils.UploadFile{
Name: "image",
Filepath: "./output.jpg",
File: buff,
}
data := utils.PostFile(fmt.Sprintf("%s%s", node.Properties.NodeData.Proto, node.Properties.NodeData.Url),
param, "multipart/form-data", files, header)
resultData = data
2023-03-23 14:35:24 +08:00
}
}
case "model-node":
continue
case "mq-node":
continue
default:
continue
}
}
ReturnPoint:
item := new(TaskResponseBody)
item.TaskId = int64(payload["taskId"].(float64))
item.TaskCode = payload["taskCode"].(string)
item.NodeId = int64(payload["nodeId"].(float64))
item.ModelId = int64(payload["modelId"].(float64))
item.DatasetArr = payload["datasetArr"].(string)
item.FileId = fc.FileId
2023-03-24 08:50:34 +08:00
item.SrcPath = fc.FileName
2023-03-23 14:35:24 +08:00
item.StartTime = startTime
item.FinishTime = time.Now().Unix()
if err != nil {
item.Code = 500
item.Msg = fmt.Sprintf("执行任务:%s", err.Error())
} else {
item.Code = 0
item.Msg = "执行成功"
item.Body = resultData
}
cli := GetMqClient("task-response", 1)
ap := cli.EndPoint.(hpds_node.AccessPoint)
res := new(InstructionReq)
res.Command = TaskResponse
res.Payload = item
pData, _ := json.Marshal(res)
_ = GenerateAndSendData(ap, pData)
}
func GetNextNode(wf *Workflow, currNodeId string) (node *WorkflowNode) {
var nextId string
if len(currNodeId) > 0 {
//下一节点
for _, v := range wf.Edges {
if v.SourceNodeId == currNodeId {
nextId = v.TargetNodeId
}
}
} else {
//开始节点
for _, v := range wf.Nodes {
if v.Type == "start-node" {
return &v
}
}
}
if len(nextId) > 0 {
for _, v := range wf.Nodes {
if v.Id == nextId {
return &v
}
}
}
return nil
}
func GetWorkflowNodeById(wf *Workflow, id string) (node *WorkflowNode) {
for _, v := range wf.Nodes {
if v.Id == id {
return &v
}
}
return nil
}