package mq import ( "bufio" "bytes" "context" "encoding/base64" "encoding/json" "fmt" "git.hpds.cc/Component/network/frame" "git.hpds.cc/pavement/hpds_node" "github.com/docker/docker/api/types/registry" "github.com/emirpasic/gods/lists/arraylist" "github.com/fsnotify/fsnotify" "github.com/gammazero/workerpool" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/shirou/gopsutil/v3/host" "go.uber.org/zap" "io" "mime/multipart" "net/http" "os" "os/exec" "path" "strconv" "strings" "sync" "taskExecute/config" "taskExecute/pkg/compress" "taskExecute/pkg/docker" "taskExecute/pkg/download" "taskExecute/pkg/utils" "taskExecute/proto" "time" ) var ( wg sync.WaitGroup TaskList map[string]docker.ContainerStore ) func TaskExecuteHandler(data []byte) (frame.Tag, []byte) { //fmt.Println("接收数据", string(data)) cmd := new(InstructionReq) err := json.Unmarshal(data, cmd) if err != nil { return 0x0B, []byte(err.Error()) } switch cmd.Command { case TaskExecute: //任务执行 waitWorkerStartFinish(config.WorkPool, cmd.Payload.(map[string]interface{}), ModelTaskExecuteHandler) case ModelIssueRepeater: //模型下发 waitWorkerStartFinish(config.WorkPool, cmd.Payload.(map[string]interface{}), ModelIssueRepeaterHandler) } return frame.Tag(cmd.Command), nil } func waitWorkerStartFinish(wp *workerpool.WorkerPool, payload map[string]interface{}, f func(payload map[string]interface{})) { startStop := make(chan time.Time, 2) wp.Submit(func() { startStop <- time.Now() f(payload) startStop <- time.Now() }) fmt.Println("Task started at:", <-startStop) fmt.Println("Task finished at:", <-startStop) } // execCommand 执行命令 func execCommandWait(commandName string, params []string) bool { cmd := exec.Command(commandName, params...) //显示运行的命令 fmt.Println(cmd.Args) stdout, err := cmd.StdoutPipe() if err != nil { fmt.Println(err) return false } _ = cmd.Start() reader := bufio.NewReader(stdout) //实时循环读取输出流中的一行内容 for { wg.Add(1) line, err2 := reader.ReadString('\n') if err2 != nil || io.EOF == err2 { break } config.Logger.Info("执行命令", zap.String("execCommandWait", line)) } _ = cmd.Wait() wg.Done() return true } func ModelIssueRepeaterHandler(payload map[string]interface{}) { hi, _ := host.Info() if payload["nodeGuid"].(string) == hi.HostID { fileUrl := payload["dockerFile"].(string) modelVersion := payload["modelVersion"].(string) downFileName := path.Base(fileUrl) //判断文件后缀名 fileType := path.Ext(downFileName) fileNameOnly := strings.TrimSuffix(downFileName, fileType) dFile := path.Join(config.Cfg.TmpPath, fileNameOnly, downFileName) //执行文件下载 controller := download.ThreadController{ ThreadCount: download.ThreadCount, FileUrl: fileUrl, DownloadFolder: dFile, DownloadFileName: downFileName, Logger: config.Logger, } controller.Download(download.OneThreadDownloadSize) if strings.ToLower(fileType) == ".zip" { err := compress.UnzipFromFile(path.Join(config.Cfg.TmpPath, fileNameOnly), dFile) if err != nil { controller.Logger.With(zap.String("文件解压缩", path.Join(config.Cfg.TmpPath, downFileName))). Error("发生错误", zap.Error(err)) return } dFile = path.Join(config.Cfg.TmpPath, fileNameOnly, fileNameOnly+".tar") } //docker 导入并运行 imgName := fmt.Sprintf("%s:%s", fileNameOnly, modelVersion) if strings.ToLower(path.Ext(dFile)) == ".tar" { dCli := docker.NewDockerClient() err := dCli.ImportImage(imgName, "latest", dFile) //err = dCli.LoadImage(dFile) if err != nil { controller.Logger.With(zap.String("导入docker的文件", dFile)). Error("发生错误", zap.Error(err)) } //设置data目录 dataPath := path.Join(config.Cfg.TmpPath, fileNameOnly, "data") _ = os.MkdirAll(dataPath, os.ModePerm) vol := make(map[string]string) vol[path.Join(dataPath, payload["inPath"].(string))] = payload["inPath"].(string) vol[path.Join(dataPath, payload["outPath"].(string))] = payload["outPath"].(string) //docker运行 modelCommand := strings.Split(payload["modelCommand"].(string), " ") dstPort := dCli.CreateContainer(fileNameOnly, imgName, modelCommand, vol, strconv.Itoa(payload["mappedPort"].(int))) //保存到本地临时文件 item := docker.ContainerStore{ ModelId: payload["modelId"].(int64), NodeId: payload["nodeId"].(int64), Name: fileNameOnly, ImgName: imgName, Volumes: vol, SrcPort: strconv.Itoa(payload["mappedPort"].(int)), DstPort: dstPort, Command: modelCommand, HttpUrl: payload["httpUrl"].(string), } docker.ContainerList = append(docker.ContainerList, item) docker.SaveStore() cli := GetMqClient("task-response", 1) ap := cli.EndPoint.(hpds_node.AccessPoint) res := new(InstructionReq) res.Command = ModelIssueResponse res.Payload = item pData, _ := json.Marshal(res) _ = GenerateAndSendData(ap, pData) } } } func ModelTaskExecuteHandler(payload map[string]interface{}) { hi, _ := host.Info() currTime := time.Now().Unix() //config.Logger.With(zap.Any("收到数据", payload)) if payload["nodeGuid"] == hi.HostID { if len(payload["subDataset"].(string)) > 0 { //如果是订阅数据 sf := hpds_node.NewStreamFunction( payload["subDataset"].(string), hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", config.Cfg.Node.Host, config.Cfg.Node.Port)), hpds_node.WithObserveDataTags(frame.Tag(payload["subDataTag"].(byte))), hpds_node.WithCredential(config.Cfg.Node.Token), ) err := sf.Connect() must(config.Logger, err) nodeInfo := HpdsMqNode{ MqType: 2, Topic: payload["subDataset"].(string), Node: config.Cfg.Node, EndPoint: sf, } _ = sf.SetHandler(func(data []byte) (frame.Tag, []byte) { //查询docker是否已经开启 issue := new(docker.ContainerStore) _ = json.Unmarshal([]byte(payload["issueResult"].(string)), issue) dCli := docker.NewDockerClient() cList, err := dCli.SearchImage(issue.Name) if err != nil { } if len(cList) > 0 { if len(payload["workflow"].(string)) > 0 { //是否设置工作流程 wf := new(Workflow) err = json.Unmarshal([]byte(payload["workflow"].(string)), wf) if err != nil { } if len(payload["datasetPath"].(string)) > 0 { //数据集处理 opt := &minio.Options{ Creds: credentials.NewStaticV4(config.Cfg.Minio.AccessKeyId, config.Cfg.Minio.SecretAccessKey, ""), Secure: false, } cli, _ := minio.New(config.Cfg.Minio.Endpoint, opt) doneCh := make(chan struct{}) defer close(doneCh) objectCh := cli.ListObjects(context.Background(), config.Cfg.Minio.Bucket, minio.ListObjectsOptions{ Prefix: payload["datasetPath"].(string), Recursive: true, }) for object := range objectCh { file, _ := cli.GetObject(context.Background(), config.Cfg.Minio.Bucket, object.Key, minio.GetObjectOptions{}) imgByte, _ := io.ReadAll(file) f := proto.FileCapture{ FileName: object.Key, File: base64.StdEncoding.EncodeToString(imgByte), DatasetName: payload["datasetName"].(string), CaptureTime: object.LastModified.Unix(), } ProcessWorkflow(payload, f, wf) } } } else { f := new(proto.FileCapture) err := json.Unmarshal(data, f) if err != nil { } if len(f.File) > 0 { i := strings.Index(f.File, ",") dec := base64.NewDecoder(base64.StdEncoding, strings.NewReader(f.File[i+1:])) if len(payload["httpUrl"].(string)) > 0 { _ = os.MkdirAll(path.Join(config.Cfg.TmpPath, payload["subDataset"].(string)), os.ModePerm) tmpFile, _ := os.Create(path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName)) _, err = io.Copy(tmpFile, dec) reqUrl := fmt.Sprintf("http://localhost:%s/%s", issue.DstPort, issue.HttpUrl) response, err := http.Post(reqUrl, "multipart/form-data", dec) if err != nil { config.Logger.With(zap.String("源文件名", f.FileName)). With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))). Error("文件提交", zap.Error(err)) } defer func() { _ = response.Body.Close() config.Logger.With(zap.String("源文件名", f.FileName)). With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))). Info("模型识别") }() body, err := io.ReadAll(response.Body) if err != nil { config.Logger.With(zap.String("源文件名", f.FileName)). With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))). Error("模型识别", zap.Error(err)) } cli := GetMqClient("task-response", 1) ap := cli.EndPoint.(hpds_node.AccessPoint) res := new(InstructionReq) res.Command = TaskResponse res.Payload = body pData, _ := json.Marshal(res) _ = GenerateAndSendData(ap, pData) } if len(payload["inPath"].(string)) > 0 { outPath := "" for k, v := range issue.Volumes { if v == payload["outPath"].(string) { outPath = k break } } //创建一个监控对象 watch, err := fsnotify.NewWatcher() if err != nil { config.Logger.Error("创建文件监控", zap.Error(err)) } defer func(watch *fsnotify.Watcher) { _ = watch.Close() }(watch) err = watch.Add(outPath) if err != nil { config.Logger.With(zap.String("监控目录", outPath)). Error("创建文件监控", zap.Error(err)) } for k, v := range issue.Volumes { if v == payload["inPath"].(string) { _ = os.MkdirAll(k, os.ModePerm) tmpFile, _ := os.Create(path.Join(k, f.FileName)) _, err = io.Copy(tmpFile, dec) break } } list := arraylist.New() // empty t1 := time.NewTicker(1 * time.Second) go func() { for { select { case ev := <-watch.Events: { //判断事件发生的类型,如下5种 // Create 创建 // Write 写入 // Remove 删除 // Rename 重命名 // Chmod 修改权限 if ev.Op&fsnotify.Create == fsnotify.Create { config.Logger.Info("创建文件", zap.String("文件名", ev.Name)) list.Add(ev.Name) } } case <-t1.C: { if list.Size() > 0 { returnFileHandleResult(list, payload, issue) } } case err = <-watch.Errors: { config.Logger.With(zap.String("监控目录", outPath)). Error("文件监控", zap.Error(err)) return } } } }() } } } } return frame.Tag(payload["subDataTag"].(byte)), nil }) MqList = append(MqList, nodeInfo) } else { //查询docker是否已经开启 issue := new(docker.ContainerStore) cList := make([]registry.SearchResult, 0) var err error issueResult, ok := payload["issueResult"] if ok { if len(issueResult.(string)) > 0 { _ = json.Unmarshal([]byte(payload["issueResult"].(string)), issue) dCli := docker.NewDockerClient() cList, err = dCli.SearchImage(issue.Name) if err != nil { } if len(cList) < 1 { //TODO: 启动容器 } } } else { issue.DstPort = "5000" issue.HttpUrl = "" } //如果是数据集任务 if _, ok := payload["single"]; ok { f := payload["single"].(map[string]interface{}) if len(payload["workflow"].(string)) > 0 { //是否设置工作流程 wf := new(Workflow) err = json.Unmarshal([]byte(payload["workflow"].(string)), wf) if err != nil { } fn := proto.FileCapture{ FileId: int64(f["fileId"].(float64)), FileName: f["fileName"].(string), File: f["file"].(string), DatasetName: payload["datasetName"].(string), CaptureTime: int64(f["captureTime"].(float64)), } ProcessWorkflow(payload, fn, wf) //if len(payload["datasetPath"].(string)) > 0 { // //数据集处理 // opt := &minio.Options{ // Creds: credentials.NewStaticV4(config.Cfg.Minio.AccessKeyId, config.Cfg.Minio.SecretAccessKey, ""), // Secure: false, // } // cli, _ := minio.New(config.Cfg.Minio.Endpoint, opt) // doneCh := make(chan struct{}) // defer close(doneCh) // objectCh := cli.ListObjects(context.Background(), config.Cfg.Minio.Bucket, minio.ListObjectsOptions{ // Prefix: payload["datasetPath"].(string), // Recursive: true, // }) // for object := range objectCh { // file, _ := cli.GetObject(context.Background(), config.Cfg.Minio.Bucket, object.Key, minio.GetObjectOptions{}) // imgByte, _ := io.ReadAll(file) // // f := proto.FileCapture{ // FileName: object.Key, // File: base64.StdEncoding.EncodeToString(imgByte), // DatasetName: payload["datasetName"].(string), // CaptureTime: object.LastModified.Unix(), // } // ProcessWorkflow(payload, f, wf) // } // //} } else { reqUrl := fmt.Sprintf("http://127.0.0.1:%s/%s", issue.DstPort, issue.HttpUrl) if len(payload["httpUrl"].(string)) > 0 { reqUrl = payload["httpUrl"].(string) } fc := f["file"].(string) dec := base64.NewDecoder(base64.StdEncoding, strings.NewReader(fc)) buff := new(bytes.Buffer) writer := multipart.NewWriter(buff) formFile, _ := writer.CreateFormFile("image", f["fileName"].(string)) _, _ = io.Copy(formFile, dec) _ = writer.Close() response, err := http.Post(reqUrl, writer.FormDataContentType(), buff) if err != nil { config.Logger.With(zap.String("源文件名", f["fileName"].(string))). With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f["fileName"].(string)))). Error("文件提交", zap.Error(err)) } defer func() { _ = response.Body.Close() }() body, err := io.ReadAll(response.Body) if err != nil { config.Logger.With(zap.String("源文件名", f["fileName"].(string))). With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f["fileName"].(string)))). Error("模型识别", zap.Error(err)) } cli := GetMqClient("task-response", 1) pay := make(map[string]interface{}) pay["taskId"] = payload["taskId"] pay["fileId"] = f["fileId"] pay["taskCode"] = payload["taskCode"] pay["nodeId"] = payload["nodeId"] pay["modelId"] = payload["modelId"] pay["startTime"] = currTime pay["finishTime"] = time.Now().Unix() pay["subDataset"] = payload["subDataset"] pay["datasetArr"] = payload["datasetArr"] pay["srcPath"] = f["fileName"] pay["body"] = string(body) ap := cli.EndPoint.(hpds_node.AccessPoint) res := new(InstructionReq) res.Command = TaskResponse res.Payload = pay pData, _ := json.Marshal(res) _ = GenerateAndSendData(ap, pData) } } //if len(cList) > 0 { //if len(payload["workflow"].(string)) > 0 { // //是否设置工作流程 // wf := new(Workflow) // err = json.Unmarshal([]byte(payload["workflow"].(string)), wf) // if err != nil { // // } // if len(payload["datasetPath"].(string)) > 0 { // //数据集处理 // opt := &minio.Options{ // Creds: credentials.NewStaticV4(config.Cfg.Minio.AccessKeyId, config.Cfg.Minio.SecretAccessKey, ""), // Secure: false, // } // cli, _ := minio.New(config.Cfg.Minio.Endpoint, opt) // doneCh := make(chan struct{}) // defer close(doneCh) // objectCh := cli.ListObjects(context.Background(), config.Cfg.Minio.Bucket, minio.ListObjectsOptions{ // Prefix: payload["datasetPath"].(string), // Recursive: true, // }) // for object := range objectCh { // file, _ := cli.GetObject(context.Background(), config.Cfg.Minio.Bucket, object.Key, minio.GetObjectOptions{}) // imgByte, _ := io.ReadAll(file) // // f := proto.FileCapture{ // FileName: object.Key, // File: base64.StdEncoding.EncodeToString(imgByte), // DatasetName: payload["datasetName"].(string), // CaptureTime: object.LastModified.Unix(), // } // ProcessWorkflow(payload, f, wf) // } // // } //} else { // if len(payload["datasetPath"].(string)) > 0 { // //数据集处理 // opt := &minio.Options{ // Creds: credentials.NewStaticV4(config.Cfg.Minio.AccessKeyId, config.Cfg.Minio.SecretAccessKey, ""), // Secure: false, // } // cli, _ := minio.New(config.Cfg.Minio.Endpoint, opt) // doneCh := make(chan struct{}) // defer close(doneCh) // objectCh := cli.ListObjects(context.Background(), config.Cfg.Minio.Bucket, minio.ListObjectsOptions{ // Prefix: payload["datasetPath"].(string), // Recursive: true, // }) // for object := range objectCh { // file, _ := cli.GetObject(context.Background(), config.Cfg.Minio.Bucket, object.Key, minio.GetObjectOptions{}) // imgByte, _ := io.ReadAll(file) // // f := proto.FileCapture{ // FileName: object.Key, // File: base64.StdEncoding.EncodeToString(imgByte), // DatasetName: payload["datasetName"].(string), // CaptureTime: object.LastModified.Unix(), // } // // if len(payload["httpUrl"].(string)) > 0 { // dec := base64.NewDecoder(base64.StdEncoding, strings.NewReader(f.File)) // reqUrl := fmt.Sprintf("http://localhost:%s/%s", issue.DstPort, issue.HttpUrl) // response, err := http.Post(reqUrl, "multipart/form-data", dec) // if err != nil { // config.Logger.With(zap.String("源文件名", f.FileName)). // With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))). // Error("文件提交", zap.Error(err)) // } // defer func() { // _ = response.Body.Close() // config.Logger.With(zap.String("源文件名", f.FileName)). // With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))). // Info("模型识别") // }() // body, err := io.ReadAll(response.Body) // if err != nil { // config.Logger.With(zap.String("源文件名", f.FileName)). // With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))). // Error("模型识别", zap.Error(err)) // } // cli := GetMqClient("task-response", 1) // ap := cli.EndPoint.(hpds_node.AccessPoint) // res := new(InstructionReq) // res.Command = TaskResponse // res.Payload = body // pData, _ := json.Marshal(res) // _ = GenerateAndSendData(ap, pData) // } // } // } //f := new(proto.FileCapture) //err := json.Unmarshal(data, f) //if err != nil { // //} //if len(f.File) > 0 { // // i := strings.Index(f.File, ",") // dec := base64.NewDecoder(base64.StdEncoding, strings.NewReader(f.File[i+1:])) // if len(payload["httpUrl"].(string)) > 0 { // _ = os.MkdirAll(path.Join(config.Cfg.TmpPath, payload["subDataset"].(string)), os.ModePerm) // tmpFile, _ := os.Create(path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName)) // _, err = io.Copy(tmpFile, dec) // // reqUrl := fmt.Sprintf("http://localhost:%s/%s", issue.DstPort, issue.HttpUrl) // response, err := http.Post(reqUrl, "multipart/form-data", dec) // if err != nil { // config.Logger.With(zap.String("源文件名", f.FileName)). // With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))). // Error("文件提交", zap.Error(err)) // } // defer func() { // _ = response.Body.Close() // config.Logger.With(zap.String("源文件名", f.FileName)). // With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))). // Info("模型识别") // }() // body, err := io.ReadAll(response.Body) // if err != nil { // config.Logger.With(zap.String("源文件名", f.FileName)). // With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))). // Error("模型识别", zap.Error(err)) // } // cli := GetMqClient("task-response", 1) // ap := cli.EndPoint.(hpds_node.AccessPoint) // res := new(InstructionReq) // res.Command = TaskResponse // res.Payload = body // pData, _ := json.Marshal(res) // _ = GenerateAndSendData(ap, pData) // } // if len(payload["inPath"].(string)) > 0 { // outPath := "" // for k, v := range issue.Volumes { // if v == payload["outPath"].(string) { // outPath = k // break // } // } // //创建一个监控对象 // watch, err := fsnotify.NewWatcher() // if err != nil { // config.Logger.Error("创建文件监控", zap.Error(err)) // } // defer func(watch *fsnotify.Watcher) { // _ = watch.Close() // }(watch) // // err = watch.Add(outPath) // if err != nil { // config.Logger.With(zap.String("监控目录", outPath)). // Error("创建文件监控", zap.Error(err)) // } // for k, v := range issue.Volumes { // if v == payload["inPath"].(string) { // _ = os.MkdirAll(k, os.ModePerm) // tmpFile, _ := os.Create(path.Join(k, f.FileName)) // _, err = io.Copy(tmpFile, dec) // break // } // } // list := arraylist.New() // empty // t1 := time.NewTicker(1 * time.Second) // go func() { // for { // select { // case ev := <-watch.Events: // { // //判断事件发生的类型,如下5种 // // Create 创建 // // Write 写入 // // Remove 删除 // // Rename 重命名 // // Chmod 修改权限 // if ev.Op&fsnotify.Create == fsnotify.Create { // config.Logger.Info("创建文件", zap.String("文件名", ev.Name)) // list.Add(ev.Name) // } // } // case <-t1.C: // { // if list.Size() > 0 { // returnFileHandleResult(list, payload, issue) // } // } // case err = <-watch.Errors: // { // config.Logger.With(zap.String("监控目录", outPath)). // Error("文件监控", zap.Error(err)) // return // } // } // } // }() // } //} } //} } } func returnFileHandleResult(list *arraylist.List, payload map[string]interface{}, issue *docker.ContainerStore) { var ( mu sync.RWMutex wgp sync.WaitGroup resTime time.Duration ) mu.Lock() defer mu.Unlock() startTime := time.Now() for i := 0; i < list.Size(); i++ { if fn, ok := list.Get(0); ok { if utils.PathExists(fn.(string)) { wgp.Add(1) go func() { mr := new(proto.ModelResult) src := utils.ReadFile(fn.(string)) if src != nil { mr.File = base64.StdEncoding.EncodeToString(src) mr.TaskCode = utils.GetFileName(fn.(string)) mr.TaskId = int64(payload["taskId"].(float64)) mr.FileName = utils.GetFileNameAndExt(fn.(string)) mr.DatasetName = payload["datasetName"].(string) mr.SubDataset = payload["subDataset"].(string) mr.FileMd5 = utils.GetFileMd5(src) mr.ModelId = int64(payload["modelId"].(float64)) mr.NodeId = int64(payload["nodeId"].(float64)) mr.StartTime = startTime.Unix() mr.FinishTime = time.Now().Unix() cli := GetMqClient("task-response", 1) ap := cli.EndPoint.(hpds_node.AccessPoint) res := new(InstructionReq) res.Command = TaskResponse res.Payload = mr pData, _ := json.Marshal(res) _ = GenerateAndSendData(ap, pData) } wg.Done() }() wg.Wait() resTime = time.Since(startTime) config.Logger.Info("返回任务完成", zap.String("文件名", fn.(string)), zap.Duration("运行时间", resTime), ) } } } }