From 939c4403afdf66ebc53471dbe076217a03878030 Mon Sep 17 00:00:00 2001 From: wangjian Date: Mon, 24 Apr 2023 15:23:36 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E4=BF=AE=E6=94=B9bug;=202=E3=80=81?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=9B=BE=E7=89=87=E5=88=86=E5=89=B2=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.yaml | 8 +- go.mod | 29 +++-- mq/handler.go | 339 +++++++++++++++++++++++++++++++++++++++++++++++++++-- mq/index.go | 5 +- mq/instruction.go | 10 ++ mq/service.go | 128 ++++++++++++++------ pkg/utils/http.go | 16 ++- pkg/utils/image.go | 111 +++++++++++++++--- proto/mq.go | 9 ++ 9 files changed, 580 insertions(+), 75 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index e54b29c..9a5f2b5 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -19,14 +19,18 @@ minio: endpoint: 127.0.0.1:9000 accessKeyId: root secretAccessKey: OIxv7QptYBO3 + bucket: jky-data node: host: 127.0.0.1 port: 27188 token: 06d36c6f5705507dae778fdce90d0767 functions: - name: task-response - dataTag: 14 + dataTag: 24 mqType: 1 - name: task-execute dataTag: 16 - mqType: 2 \ No newline at end of file + mqType: 2 + - name: task-execute-log + dataTag: 26 + mqType: 1 \ No newline at end of file diff --git a/go.mod b/go.mod index 185364f..b6cfcb1 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,8 @@ go 1.19 require ( git.hpds.cc/Component/logging v0.0.0-20230106105738-e378e873921b - git.hpds.cc/Component/network v0.0.0-20221012021659-2433c68452d5 - git.hpds.cc/pavement/hpds_node v0.0.0-20230307094826-753c4fe9c877 + git.hpds.cc/Component/network v0.0.0-20230405135741-a4ea724bab76 + git.hpds.cc/pavement/hpds_node v0.0.0-20230405153516-9403c4d01e12 github.com/docker/docker v23.0.1+incompatible github.com/docker/go-connections v0.4.0 github.com/emirpasic/gods v1.18.1 @@ -15,7 +15,7 @@ require ( github.com/shirou/gopsutil/v3 v3.23.2 github.com/spf13/cobra v1.6.1 go.uber.org/zap v1.24.0 - golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 + golang.org/x/image v0.1.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -30,15 +30,13 @@ require ( github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/mock v1.6.0 // indirect + github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect github.com/google/uuid v1.3.0 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.15.15 // indirect github.com/klauspost/cpuid/v2 v2.2.3 // indirect - github.com/lucas-clemente/quic-go v0.29.1 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect - github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect - github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect github.com/matoous/go-nanoid/v2 v2.0.0 // indirect github.com/minio/md5-simd v1.1.2 // indirect github.com/minio/sha256-simd v1.0.0 // indirect @@ -46,12 +44,14 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect - github.com/nxadm/tail v1.4.8 // indirect - github.com/onsi/ginkgo v1.16.4 // indirect + github.com/onsi/ginkgo/v2 v2.2.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect + github.com/quic-go/qtls-go1-19 v0.2.1 // indirect + github.com/quic-go/qtls-go1-20 v0.1.1 // indirect + github.com/quic-go/quic-go v0.33.0 // indirect github.com/rs/xid v1.4.0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/spf13/pflag v1.0.5 // indirect @@ -61,15 +61,20 @@ require ( go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.6.0 // indirect - golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect - golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect + golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect + golang.org/x/mod v0.6.0 // indirect golang.org/x/net v0.7.0 // indirect golang.org/x/sys v0.5.0 // indirect golang.org/x/text v0.7.0 // indirect golang.org/x/time v0.3.0 // indirect - golang.org/x/tools v0.1.12 // indirect + golang.org/x/tools v0.2.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect - gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gotest.tools/v3 v3.4.0 // indirect ) + +// +//replace ( +// git.hpds.cc/Component/network => ../network +// git.hpds.cc/pavement/hpds_node => ../hpds_node +//) diff --git a/mq/handler.go b/mq/handler.go index 831397b..6059d03 100644 --- a/mq/handler.go +++ b/mq/handler.go @@ -2,11 +2,14 @@ package mq import ( "bufio" + "bytes" "context" "encoding/base64" "encoding/json" "fmt" + "git.hpds.cc/Component/network/frame" "git.hpds.cc/pavement/hpds_node" + "github.com/docker/docker/api/types/registry" "github.com/emirpasic/gods/lists/arraylist" "github.com/fsnotify/fsnotify" "github.com/gammazero/workerpool" @@ -15,6 +18,7 @@ import ( "github.com/shirou/gopsutil/v3/host" "go.uber.org/zap" "io" + "mime/multipart" "net/http" "os" "os/exec" @@ -36,8 +40,8 @@ var ( TaskList map[string]docker.ContainerStore ) -func TaskExecuteHandler(data []byte) (byte, []byte) { - fmt.Println("接收数据", string(data)) +func TaskExecuteHandler(data []byte) (frame.Tag, []byte) { + //fmt.Println("接收数据", string(data)) cmd := new(InstructionReq) err := json.Unmarshal(data, cmd) if err != nil { @@ -51,7 +55,7 @@ func TaskExecuteHandler(data []byte) (byte, []byte) { //模型下发 waitWorkerStartFinish(config.WorkPool, cmd.Payload.(map[string]interface{}), ModelIssueRepeaterHandler) } - return byte(cmd.Command), nil + return frame.Tag(cmd.Command), nil } func waitWorkerStartFinish(wp *workerpool.WorkerPool, payload map[string]interface{}, f func(payload map[string]interface{})) { @@ -174,12 +178,14 @@ func ModelIssueRepeaterHandler(payload map[string]interface{}) { func ModelTaskExecuteHandler(payload map[string]interface{}) { hi, _ := host.Info() + currTime := time.Now().Unix() + //config.Logger.With(zap.Any("收到数据", payload)) if payload["nodeGuid"] == hi.HostID { - if len(payload["subDataset"].(string)) > 0 { + if len(payload["subDataset"].(string)) > 0 { //如果是订阅数据 sf := hpds_node.NewStreamFunction( payload["subDataset"].(string), hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", config.Cfg.Node.Host, config.Cfg.Node.Port)), - hpds_node.WithObserveDataTags(payload["subDataTag"].(byte)), + hpds_node.WithObserveDataTags(frame.Tag(payload["subDataTag"].(byte))), hpds_node.WithCredential(config.Cfg.Node.Token), ) err := sf.Connect() @@ -190,7 +196,7 @@ func ModelTaskExecuteHandler(payload map[string]interface{}) { Node: config.Cfg.Node, EndPoint: sf, } - _ = sf.SetHandler(func(data []byte) (byte, []byte) { + _ = sf.SetHandler(func(data []byte) (frame.Tag, []byte) { //查询docker是否已经开启 issue := new(docker.ContainerStore) @@ -344,12 +350,331 @@ func ModelTaskExecuteHandler(payload map[string]interface{}) { } } } - return payload["subDataTag"].(byte), nil + return frame.Tag(payload["subDataTag"].(byte)), nil }) MqList = append(MqList, nodeInfo) + } else { + + //查询docker是否已经开启 + issue := new(docker.ContainerStore) + cList := make([]registry.SearchResult, 0) + var err error + issueResult, ok := payload["issueResult"] + if ok { + if len(issueResult.(string)) > 0 { + _ = json.Unmarshal([]byte(payload["issueResult"].(string)), issue) + dCli := docker.NewDockerClient() + cList, err = dCli.SearchImage(issue.Name) + if err != nil { + + } + if len(cList) < 1 { + //TODO: 启动容器 + } + } + } else { + issue.DstPort = "5000" + issue.HttpUrl = "" + } + //如果是数据集任务 + if _, ok := payload["single"]; ok { + f := payload["single"].(map[string]interface{}) + if len(payload["workflow"].(string)) > 0 { + //是否设置工作流程 + wf := new(Workflow) + err = json.Unmarshal([]byte(payload["workflow"].(string)), wf) + if err != nil { + + } + fn := proto.FileCapture{ + FileId: int64(f["fileId"].(float64)), + FileName: f["fileName"].(string), + File: f["file"].(string), + DatasetName: payload["datasetName"].(string), + CaptureTime: int64(f["captureTime"].(float64)), + } + ProcessWorkflow(payload, fn, wf) + //if len(payload["datasetPath"].(string)) > 0 { + // //数据集处理 + // opt := &minio.Options{ + // Creds: credentials.NewStaticV4(config.Cfg.Minio.AccessKeyId, config.Cfg.Minio.SecretAccessKey, ""), + // Secure: false, + // } + // cli, _ := minio.New(config.Cfg.Minio.Endpoint, opt) + // doneCh := make(chan struct{}) + // defer close(doneCh) + // objectCh := cli.ListObjects(context.Background(), config.Cfg.Minio.Bucket, minio.ListObjectsOptions{ + // Prefix: payload["datasetPath"].(string), + // Recursive: true, + // }) + // for object := range objectCh { + // file, _ := cli.GetObject(context.Background(), config.Cfg.Minio.Bucket, object.Key, minio.GetObjectOptions{}) + // imgByte, _ := io.ReadAll(file) + // + // f := proto.FileCapture{ + // FileName: object.Key, + // File: base64.StdEncoding.EncodeToString(imgByte), + // DatasetName: payload["datasetName"].(string), + // CaptureTime: object.LastModified.Unix(), + // } + // ProcessWorkflow(payload, f, wf) + // } + // + //} + } else { + reqUrl := fmt.Sprintf("http://127.0.0.1:%s/%s", issue.DstPort, issue.HttpUrl) + if len(payload["httpUrl"].(string)) > 0 { + reqUrl = payload["httpUrl"].(string) + } + fc := f["file"].(string) + dec := base64.NewDecoder(base64.StdEncoding, strings.NewReader(fc)) + + buff := new(bytes.Buffer) + writer := multipart.NewWriter(buff) + formFile, _ := writer.CreateFormFile("image", f["fileName"].(string)) + _, _ = io.Copy(formFile, dec) + _ = writer.Close() + + response, err := http.Post(reqUrl, writer.FormDataContentType(), buff) + if err != nil { + config.Logger.With(zap.String("源文件名", f["fileName"].(string))). + With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f["fileName"].(string)))). + Error("文件提交", zap.Error(err)) + } + defer func() { + _ = response.Body.Close() + }() + body, err := io.ReadAll(response.Body) + if err != nil { + config.Logger.With(zap.String("源文件名", f["fileName"].(string))). + With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f["fileName"].(string)))). + Error("模型识别", zap.Error(err)) + } + cli := GetMqClient("task-response", 1) + pay := make(map[string]interface{}) + pay["taskId"] = payload["taskId"] + pay["fileId"] = f["fileId"] + pay["taskCode"] = payload["taskCode"] + pay["nodeId"] = payload["nodeId"] + pay["modelId"] = payload["modelId"] + pay["startTime"] = currTime + pay["finishTime"] = time.Now().Unix() + pay["subDataset"] = payload["subDataset"] + pay["datasetArr"] = payload["datasetArr"] + pay["srcPath"] = f["fileName"] + pay["body"] = string(body) + ap := cli.EndPoint.(hpds_node.AccessPoint) + res := new(InstructionReq) + res.Command = TaskResponse + res.Payload = pay + pData, _ := json.Marshal(res) + _ = GenerateAndSendData(ap, pData) + + } + } + + //if len(cList) > 0 { + //if len(payload["workflow"].(string)) > 0 { + // //是否设置工作流程 + // wf := new(Workflow) + // err = json.Unmarshal([]byte(payload["workflow"].(string)), wf) + // if err != nil { + // + // } + // if len(payload["datasetPath"].(string)) > 0 { + // //数据集处理 + // opt := &minio.Options{ + // Creds: credentials.NewStaticV4(config.Cfg.Minio.AccessKeyId, config.Cfg.Minio.SecretAccessKey, ""), + // Secure: false, + // } + // cli, _ := minio.New(config.Cfg.Minio.Endpoint, opt) + // doneCh := make(chan struct{}) + // defer close(doneCh) + // objectCh := cli.ListObjects(context.Background(), config.Cfg.Minio.Bucket, minio.ListObjectsOptions{ + // Prefix: payload["datasetPath"].(string), + // Recursive: true, + // }) + // for object := range objectCh { + // file, _ := cli.GetObject(context.Background(), config.Cfg.Minio.Bucket, object.Key, minio.GetObjectOptions{}) + // imgByte, _ := io.ReadAll(file) + // + // f := proto.FileCapture{ + // FileName: object.Key, + // File: base64.StdEncoding.EncodeToString(imgByte), + // DatasetName: payload["datasetName"].(string), + // CaptureTime: object.LastModified.Unix(), + // } + // ProcessWorkflow(payload, f, wf) + // } + // + // } + //} else { + // if len(payload["datasetPath"].(string)) > 0 { + // //数据集处理 + // opt := &minio.Options{ + // Creds: credentials.NewStaticV4(config.Cfg.Minio.AccessKeyId, config.Cfg.Minio.SecretAccessKey, ""), + // Secure: false, + // } + // cli, _ := minio.New(config.Cfg.Minio.Endpoint, opt) + // doneCh := make(chan struct{}) + // defer close(doneCh) + // objectCh := cli.ListObjects(context.Background(), config.Cfg.Minio.Bucket, minio.ListObjectsOptions{ + // Prefix: payload["datasetPath"].(string), + // Recursive: true, + // }) + // for object := range objectCh { + // file, _ := cli.GetObject(context.Background(), config.Cfg.Minio.Bucket, object.Key, minio.GetObjectOptions{}) + // imgByte, _ := io.ReadAll(file) + // + // f := proto.FileCapture{ + // FileName: object.Key, + // File: base64.StdEncoding.EncodeToString(imgByte), + // DatasetName: payload["datasetName"].(string), + // CaptureTime: object.LastModified.Unix(), + // } + // + // if len(payload["httpUrl"].(string)) > 0 { + // dec := base64.NewDecoder(base64.StdEncoding, strings.NewReader(f.File)) + // reqUrl := fmt.Sprintf("http://localhost:%s/%s", issue.DstPort, issue.HttpUrl) + // response, err := http.Post(reqUrl, "multipart/form-data", dec) + // if err != nil { + // config.Logger.With(zap.String("源文件名", f.FileName)). + // With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))). + // Error("文件提交", zap.Error(err)) + // } + // defer func() { + // _ = response.Body.Close() + // config.Logger.With(zap.String("源文件名", f.FileName)). + // With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))). + // Info("模型识别") + // }() + // body, err := io.ReadAll(response.Body) + // if err != nil { + // config.Logger.With(zap.String("源文件名", f.FileName)). + // With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))). + // Error("模型识别", zap.Error(err)) + // } + // cli := GetMqClient("task-response", 1) + // ap := cli.EndPoint.(hpds_node.AccessPoint) + // res := new(InstructionReq) + // res.Command = TaskResponse + // res.Payload = body + // pData, _ := json.Marshal(res) + // _ = GenerateAndSendData(ap, pData) + // } + // } + // } + //f := new(proto.FileCapture) + //err := json.Unmarshal(data, f) + //if err != nil { + // + //} + //if len(f.File) > 0 { + // + // i := strings.Index(f.File, ",") + // dec := base64.NewDecoder(base64.StdEncoding, strings.NewReader(f.File[i+1:])) + // if len(payload["httpUrl"].(string)) > 0 { + // _ = os.MkdirAll(path.Join(config.Cfg.TmpPath, payload["subDataset"].(string)), os.ModePerm) + // tmpFile, _ := os.Create(path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName)) + // _, err = io.Copy(tmpFile, dec) + // + // reqUrl := fmt.Sprintf("http://localhost:%s/%s", issue.DstPort, issue.HttpUrl) + // response, err := http.Post(reqUrl, "multipart/form-data", dec) + // if err != nil { + // config.Logger.With(zap.String("源文件名", f.FileName)). + // With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))). + // Error("文件提交", zap.Error(err)) + // } + // defer func() { + // _ = response.Body.Close() + // config.Logger.With(zap.String("源文件名", f.FileName)). + // With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))). + // Info("模型识别") + // }() + // body, err := io.ReadAll(response.Body) + // if err != nil { + // config.Logger.With(zap.String("源文件名", f.FileName)). + // With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))). + // Error("模型识别", zap.Error(err)) + // } + // cli := GetMqClient("task-response", 1) + // ap := cli.EndPoint.(hpds_node.AccessPoint) + // res := new(InstructionReq) + // res.Command = TaskResponse + // res.Payload = body + // pData, _ := json.Marshal(res) + // _ = GenerateAndSendData(ap, pData) + // } + // if len(payload["inPath"].(string)) > 0 { + // outPath := "" + // for k, v := range issue.Volumes { + // if v == payload["outPath"].(string) { + // outPath = k + // break + // } + // } + // //创建一个监控对象 + // watch, err := fsnotify.NewWatcher() + // if err != nil { + // config.Logger.Error("创建文件监控", zap.Error(err)) + // } + // defer func(watch *fsnotify.Watcher) { + // _ = watch.Close() + // }(watch) + // + // err = watch.Add(outPath) + // if err != nil { + // config.Logger.With(zap.String("监控目录", outPath)). + // Error("创建文件监控", zap.Error(err)) + // } + // for k, v := range issue.Volumes { + // if v == payload["inPath"].(string) { + // _ = os.MkdirAll(k, os.ModePerm) + // tmpFile, _ := os.Create(path.Join(k, f.FileName)) + // _, err = io.Copy(tmpFile, dec) + // break + // } + // } + // list := arraylist.New() // empty + // t1 := time.NewTicker(1 * time.Second) + // go func() { + // for { + // select { + // case ev := <-watch.Events: + // { + // //判断事件发生的类型,如下5种 + // // Create 创建 + // // Write 写入 + // // Remove 删除 + // // Rename 重命名 + // // Chmod 修改权限 + // if ev.Op&fsnotify.Create == fsnotify.Create { + // config.Logger.Info("创建文件", zap.String("文件名", ev.Name)) + // list.Add(ev.Name) + // } + // } + // case <-t1.C: + // { + // if list.Size() > 0 { + // returnFileHandleResult(list, payload, issue) + // } + // } + // case err = <-watch.Errors: + // { + // config.Logger.With(zap.String("监控目录", outPath)). + // Error("文件监控", zap.Error(err)) + // return + // } + // } + // } + // }() + // } + //} } + //} } } + func returnFileHandleResult(list *arraylist.List, payload map[string]interface{}, issue *docker.ContainerStore) { var ( mu sync.RWMutex diff --git a/mq/index.go b/mq/index.go index b86e26f..7f84020 100644 --- a/mq/index.go +++ b/mq/index.go @@ -3,6 +3,7 @@ package mq import ( "fmt" "git.hpds.cc/Component/logging" + "git.hpds.cc/Component/network/frame" "go.uber.org/zap" "os" "taskExecute/config" @@ -40,7 +41,7 @@ func NewMqClient(funcs []config.FuncConfig, node config.HpdsNode, logger *loggin sf := hpds_node.NewStreamFunction( v.Name, hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)), - hpds_node.WithObserveDataTags(v.DataTag), + hpds_node.WithObserveDataTags(frame.Tag(v.DataTag)), hpds_node.WithCredential(node.Token), ) err = sf.Connect() @@ -72,7 +73,7 @@ func NewMqClient(funcs []config.FuncConfig, node config.HpdsNode, logger *loggin EndPoint: ap, } must(logger, err) - ap.SetDataTag(v.DataTag) + ap.SetDataTag(frame.Tag(v.DataTag)) mqList = append(mqList, nodeInfo) } diff --git a/mq/instruction.go b/mq/instruction.go index 1c36b3f..aabf662 100644 --- a/mq/instruction.go +++ b/mq/instruction.go @@ -7,6 +7,7 @@ const ( TaskResponse ModelIssueRepeater ModelIssueResponse + TaskExecuteLog ) type InstructionReq struct { @@ -20,9 +21,18 @@ type TaskResponseBody struct { TaskCode string `json:"taskCode"` NodeId int64 `json:"nodeId"` ModelId int64 `json:"modelId"` + FileId int64 `json:"fileId"` SrcPath string `json:"srcPath"` + DatasetArr string `json:"datasetArr"` StartTime int64 `json:"startTime"` FinishTime int64 `json:"finishTime"` Msg string `json:"msg"` Body string `json:"body"` } + +type TaskExecuteLogStruct struct { + TaskId int64 `json:"taskId"` + TaskCode string `json:"taskCode"` + NodeGuid string `json:"nodeGuid"` + Payload interface{} `json:"payload"` +} diff --git a/mq/service.go b/mq/service.go index e9a42b3..fd52e26 100644 --- a/mq/service.go +++ b/mq/service.go @@ -1,11 +1,16 @@ package mq import ( + "bytes" "encoding/base64" "encoding/json" "fmt" "git.hpds.cc/pavement/hpds_node" + "golang.org/x/image/bmp" + "golang.org/x/image/tiff" "image" + "image/jpeg" + "image/png" "strings" "taskExecute/pkg/utils" "taskExecute/proto" @@ -40,8 +45,8 @@ func CreateWorkflowQueue(wf *Workflow) *WorkflowQueue { func ProcessWorkflow(payload map[string]interface{}, fc proto.FileCapture, wf *Workflow) { qList := CreateWorkflowQueue(wf) var ( - img image.Image - //imgBase64 string + img image.Image + imgList []image.Image imgType string = "jpeg" err error resultData string @@ -50,17 +55,24 @@ func ProcessWorkflow(payload map[string]interface{}, fc proto.FileCapture, wf *W for i := 0; i < len(wf.Nodes); i++ { nodeId, _ := qList.Pop() node := GetWorkflowNodeById(wf, nodeId) + if node == nil { + continue + } switch node.Type { case "start-node": - continue + fn, _ := base64.StdEncoding.DecodeString(fc.File) + buff := bytes.NewBuffer(fn) + img, imgType, _ = image.Decode(buff) case "image-node": //处理图像后 - fn, _ := base64.StdEncoding.DecodeString(fc.File) - if node.Properties.NodeData.Method == "crop" { + fn := utils.ImageToBuff(img, imgType) + if node.Properties.NodeData.Method == "zoom" { img, imgType, err = utils.Clip(fn, node.Properties.NodeData.Width, node.Properties.NodeData.Height, node.Properties.NodeData.EqualProportion) if err != nil { goto ReturnPoint } + } else if node.Properties.NodeData.Method == "crop" { + imgList = utils.SplitImage(fn, node.Properties.NodeData.Width, node.Properties.NodeData.Height) } else if node.Properties.NodeData.Method == "gray" { img, err = utils.Gray(fn) if err != nil { @@ -75,56 +87,104 @@ func ProcessWorkflow(payload map[string]interface{}, fc proto.FileCapture, wf *W case 270: img = utils.Rotate270(fn) default: - img = utils.BuffToImage(fn) + img, _, _ = image.Decode(fn) } } else if node.Properties.NodeData.Method == "formatConversion" { - img = utils.BuffToImage(fn) + //img = utils.BuffToImage(fn) switch node.Properties.NodeData.Format { case "bmp": imgType = "bmp" + _ = bmp.Encode(fn, img) case "png": imgType = "png" + _ = png.Encode(fn, img) case "tiff": imgType = "tiff" + _ = tiff.Encode(fn, img, &tiff.Options{Predictor: false}) default: imgType = "jpeg" + _ = jpeg.Encode(fn, img, &jpeg.Options{ + Quality: 100, + }) } + + img, _, _ = image.Decode(fn) } case "fetch-node": header := make(map[string]string) header["ContentType"] = node.Properties.NodeData.ContentType param := make(map[string]string) isBody := false - for _, val := range node.Properties.NodeData.DynamicValidateForm.Fields { - switch val.Type { - case "fileName": - param[val.Key] = fc.FileName - case "imgBase64": - param[val.Key] = utils.ImageToBase64(img, imgType) - default: - isBody = true + if len(imgList) > 0 { + result := make([]string, len(imgList)) + for idx, subImg := range imgList { + for _, val := range node.Properties.NodeData.DynamicValidateForm.Fields { + switch val.Type { + case "fileName": + param[val.Key] = fc.FileName + case "imgBase64": + param[val.Key] = utils.ImageToBase64(subImg, imgType) + default: + isBody = true + } + } + if !isBody { + data, e := utils.HttpDo(fmt.Sprintf("%s%s", node.Properties.NodeData.Proto, node.Properties.NodeData.Url), + strings.ToUpper(node.Properties.NodeData.MethodType), param, header) + if e != nil { + err = e + goto ReturnPoint + } + result[idx] = string(data) + //resultData = string(data) + } else { + buff := utils.ImageToBuff(subImg, imgType) + files := make([]utils.UploadFile, 1) + files[0] = utils.UploadFile{ + Name: "image", + Filepath: "./output.jpg", + File: buff, + } + data := utils.PostFile(fmt.Sprintf("%s%s", node.Properties.NodeData.Proto, node.Properties.NodeData.Url), + param, "multipart/form-data", files, header) + //resultData = data + result[idx] = string(data) + } } - } - if !isBody { - data, err := utils.HttpDo(fmt.Sprintf("%s%s", node.Properties.NodeData.Proto, node.Properties.NodeData.Url), - strings.ToUpper(node.Properties.NodeData.MethodType), param, header) - if err != nil { - goto ReturnPoint - } - resultData = string(data) + resultStr, _ := json.Marshal(result) + resultData = string(resultStr) } else { - buff := utils.ImageToBuff(img, imgType) - files := make([]utils.UploadFile, 1) - files[0] = utils.UploadFile{ - Name: "file", - Filepath: "./output.jpg", - File: buff, + for _, val := range node.Properties.NodeData.DynamicValidateForm.Fields { + switch val.Type { + case "fileName": + param[val.Key] = fc.FileName + case "imgBase64": + param[val.Key] = utils.ImageToBase64(img, imgType) + default: + isBody = true + } + } + if !isBody { + data, e := utils.HttpDo(fmt.Sprintf("%s%s", node.Properties.NodeData.Proto, node.Properties.NodeData.Url), + strings.ToUpper(node.Properties.NodeData.MethodType), param, header) + if e != nil { + err = e + goto ReturnPoint + } + resultData = string(data) + } else { + buff := utils.ImageToBuff(img, imgType) + files := make([]utils.UploadFile, 1) + files[0] = utils.UploadFile{ + Name: "image", + Filepath: "./output.jpg", + File: buff, + } + data := utils.PostFile(fmt.Sprintf("%s%s", node.Properties.NodeData.Proto, node.Properties.NodeData.Url), + param, "multipart/form-data", files, header) + resultData = data } - data := utils.PostFile(fmt.Sprintf("%s%s", node.Properties.NodeData.Proto, node.Properties.NodeData.Url), - param, "multipart/form-data", files, header) - resultData = data } - case "model-node": continue case "mq-node": @@ -139,6 +199,8 @@ ReturnPoint: item.TaskCode = payload["taskCode"].(string) item.NodeId = int64(payload["nodeId"].(float64)) item.ModelId = int64(payload["modelId"].(float64)) + item.DatasetArr = payload["datasetArr"].(string) + item.FileId = fc.FileId item.SrcPath = fc.FileName item.StartTime = startTime item.FinishTime = time.Now().Unix() diff --git a/pkg/utils/http.go b/pkg/utils/http.go index 939b427..02a5a7d 100644 --- a/pkg/utils/http.go +++ b/pkg/utils/http.go @@ -14,13 +14,19 @@ import ( func HttpDo(reqUrl, method string, params map[string]string, header map[string]string) (data []byte, err error) { var paramStr string = "" - for k, v := range params { - if len(paramStr) == 0 { - paramStr = fmt.Sprintf("%s=%s", k, url.QueryEscape(v)) - } else { - paramStr = fmt.Sprintf("%s&%s=%s", paramStr, k, url.QueryEscape(v)) + if contentType, ok := header["ContentType"]; ok && strings.Contains(contentType, "json") { + bytesData, _ := json.Marshal(params) + paramStr = string(bytesData) + } else { + for k, v := range params { + if len(paramStr) == 0 { + paramStr = fmt.Sprintf("%s=%s", k, url.QueryEscape(v)) + } else { + paramStr = fmt.Sprintf("%s&%s=%s", paramStr, k, url.QueryEscape(v)) + } } } + client := &http.Client{} req, err := http.NewRequest(strings.ToUpper(method), reqUrl, strings.NewReader(paramStr)) if err != nil { diff --git a/pkg/utils/image.go b/pkg/utils/image.go index 68dce3c..0e9097b 100644 --- a/pkg/utils/image.go +++ b/pkg/utils/image.go @@ -3,12 +3,15 @@ package utils import ( "bytes" "encoding/base64" + "fmt" "golang.org/x/image/bmp" "golang.org/x/image/tiff" "image" "image/color" + "image/draw" "image/jpeg" "image/png" + "math" ) func BuffToImage(in []byte) image.Image { @@ -18,10 +21,12 @@ func BuffToImage(in []byte) image.Image { } // Clip 图片裁剪 -func Clip(in []byte, wi, hi int, equalProportion bool) (out image.Image, imageType string, err error) { - buff := bytes.NewBuffer(in) - m, imgType, _ := image.Decode(buff) - rgbImg := m.(*image.YCbCr) +func Clip(buff *bytes.Buffer, wi, hi int, equalProportion bool) (out image.Image, imageType string, err error) { + //buff := bytes.NewBuffer(in) + m, imgType, err := image.Decode(buff) + if err != nil { + return nil, "", err + } if equalProportion { w := m.Bounds().Max.X h := m.Bounds().Max.Y @@ -29,7 +34,35 @@ func Clip(in []byte, wi, hi int, equalProportion bool) (out image.Image, imageTy wi, hi = fixSize(w, h, wi, hi) } } - return rgbImg.SubImage(image.Rect(0, 0, wi, hi)), imgType, nil + switch imgType { + case "jpeg", "jpg": + rgbImg := m.(*image.YCbCr) + return rgbImg.SubImage(image.Rect(0, 0, wi, hi)), imgType, nil + case "bmp": + img := m.(*image.RGBA) + if equalProportion { + + } + return img.SubImage(image.Rect(0, 0, wi, hi)).(*image.RGBA), imageType, nil + case "png": + switch m.(type) { + case *image.NRGBA: + img := m.(*image.NRGBA) + subImg := img.SubImage(image.Rect(0, 0, wi, hi)).(*image.NRGBA) + + return subImg, imageType, nil + case *image.RGBA: + img := m.(*image.RGBA) + subImg := img.SubImage(image.Rect(0, 0, wi, hi)).(*image.RGBA) + return subImg, imageType, nil + } + case "gif": + img := m.(*image.Paletted) + subImg := img.SubImage(image.Rect(0, 0, wi, hi)).(*image.Paletted) + return subImg, imageType, nil + } + return nil, "", fmt.Errorf("未知的图片格式") + } func fixSize(img1W, img2H, wi, hi int) (new1W, new2W int) { @@ -45,8 +78,8 @@ func fixSize(img1W, img2H, wi, hi int) (new1W, new2W int) { return int(imgWidth * ratio), int(imgHeight * ratio) } -func Gray(in []byte) (out image.Image, err error) { - m := BuffToImage(in) +func Gray(buff *bytes.Buffer) (out image.Image, err error) { + m, _, _ := image.Decode(buff) bounds := m.Bounds() dx := bounds.Dx() dy := bounds.Dy() @@ -64,8 +97,8 @@ func Gray(in []byte) (out image.Image, err error) { return newRgba.SubImage(r), nil } -func Rotate90(in []byte) image.Image { - m := BuffToImage(in) +func Rotate90(buff *bytes.Buffer) image.Image { + m, _, _ := image.Decode(buff) rotate90 := image.NewRGBA(image.Rect(0, 0, m.Bounds().Dy(), m.Bounds().Dx())) // 矩阵旋转 for x := m.Bounds().Min.Y; x < m.Bounds().Max.Y; x++ { @@ -78,8 +111,8 @@ func Rotate90(in []byte) image.Image { } // Rotate180 旋转180度 -func Rotate180(in []byte) image.Image { - m := BuffToImage(in) +func Rotate180(buff *bytes.Buffer) image.Image { + m, _, _ := image.Decode(buff) rotate180 := image.NewRGBA(image.Rect(0, 0, m.Bounds().Dx(), m.Bounds().Dy())) // 矩阵旋转 for x := m.Bounds().Min.X; x < m.Bounds().Max.X; x++ { @@ -92,8 +125,8 @@ func Rotate180(in []byte) image.Image { } // Rotate270 旋转270度 -func Rotate270(in []byte) image.Image { - m := BuffToImage(in) +func Rotate270(buff *bytes.Buffer) image.Image { + m, _, _ := image.Decode(buff) rotate270 := image.NewRGBA(image.Rect(0, 0, m.Bounds().Dy(), m.Bounds().Dx())) // 矩阵旋转 for x := m.Bounds().Min.Y; x < m.Bounds().Max.Y; x++ { @@ -125,7 +158,57 @@ func ImageToBuff(img image.Image, imgType string) *bytes.Buffer { _ = tiff.Encode(buff, img, nil) default: imgType = "jpeg" - _ = jpeg.Encode(buff, img, nil) + _ = jpeg.Encode(buff, img, &jpeg.Options{ + Quality: 100, + }) } return buff } +func SplitImage(buff *bytes.Buffer, w, h int) []image.Image { + img, _, _ := image.Decode(buff) + list := make([]image.Image, 0) + newWidth := int(math.Ceil(float64(img.Bounds().Dx()) / float64(w))) + newHeight := int(math.Ceil(float64(img.Bounds().Dy()) / float64(h))) + rect := image.Rect(0, 0, newWidth*w, newHeight*h) + newImage := image.NewRGBA(rect) + draw.Draw(newImage, img.Bounds(), img, newImage.Bounds().Min, draw.Over) + sp := splitPattern(newImage, w, h) + spLen := len(sp) + var ( + square image.Rectangle + ) + for i := 0; i < spLen; i++ { + square = image.Rect(sp[i][0], sp[i][1], sp[i][2], sp[i][3]) + imgPart := img.(interface { + SubImage(r image.Rectangle) image.Image + }).SubImage(square) + list = append(list, imgPart) + } + return list +} +func splitPattern(img image.Image, w, h int) [][]int { + ret := make([][]int, 0) + vOffSet := width(img) / w + hOffSet := height(img) / h + for r := 0; r < hOffSet; r++ { + for c := 0; c < vOffSet; c++ { + //行偏移,仅应用在x + x1 := w * c + y1 := h * r + + x2 := w * (c + 1) + y2 := (r + 1) * h + el := []int{x1, y1, x2, y2} + ret = append(ret, el) + } + } + return ret +} + +func width(i image.Image) int { + return i.Bounds().Max.X - i.Bounds().Min.X +} + +func height(i image.Image) int { + return i.Bounds().Max.Y - i.Bounds().Min.Y +} diff --git a/proto/mq.go b/proto/mq.go index 30573c2..00479d2 100644 --- a/proto/mq.go +++ b/proto/mq.go @@ -1,6 +1,15 @@ package proto type FileCapture struct { + FileId int64 `json:"fileId"` + FileName string `json:"fileName"` + File string `json:"file"` + DatasetName string `json:"datasetName"` + CaptureTime int64 `json:"captureTime"` +} + +type SingleFileCapture struct { + FileId int64 `json:"fileId"` FileName string `json:"fileName"` File string `json:"file"` DatasetName string `json:"datasetName"`