taskExecute/mq/handler.go

402 lines
12 KiB
Go
Raw Normal View History

2023-03-23 14:35:24 +08:00
package mq
import (
"bufio"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"git.hpds.cc/pavement/hpds_node"
"github.com/emirpasic/gods/lists/arraylist"
"github.com/fsnotify/fsnotify"
"github.com/gammazero/workerpool"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/shirou/gopsutil/v3/host"
"go.uber.org/zap"
"io"
"net/http"
"os"
"os/exec"
"path"
"strconv"
"strings"
"sync"
"taskExecute/config"
"taskExecute/pkg/compress"
"taskExecute/pkg/docker"
"taskExecute/pkg/download"
"taskExecute/pkg/utils"
"taskExecute/proto"
"time"
)
var (
wg sync.WaitGroup
TaskList map[string]docker.ContainerStore
)
func TaskExecuteHandler(data []byte) (byte, []byte) {
fmt.Println("接收数据", string(data))
cmd := new(InstructionReq)
err := json.Unmarshal(data, cmd)
if err != nil {
return 0x0B, []byte(err.Error())
}
switch cmd.Command {
case TaskExecute:
//任务执行
waitWorkerStartFinish(config.WorkPool, cmd.Payload.(map[string]interface{}), ModelTaskExecuteHandler)
case ModelIssueRepeater:
//模型下发
waitWorkerStartFinish(config.WorkPool, cmd.Payload.(map[string]interface{}), ModelIssueRepeaterHandler)
}
return byte(cmd.Command), nil
}
func waitWorkerStartFinish(wp *workerpool.WorkerPool, payload map[string]interface{}, f func(payload map[string]interface{})) {
startStop := make(chan time.Time, 2)
wp.Submit(func() {
startStop <- time.Now()
f(payload)
startStop <- time.Now()
})
fmt.Println("Task started at:", <-startStop)
fmt.Println("Task finished at:", <-startStop)
}
// execCommand 执行命令
func execCommandWait(commandName string, params []string) bool {
cmd := exec.Command(commandName, params...)
//显示运行的命令
fmt.Println(cmd.Args)
stdout, err := cmd.StdoutPipe()
if err != nil {
fmt.Println(err)
return false
}
_ = cmd.Start()
reader := bufio.NewReader(stdout)
//实时循环读取输出流中的一行内容
for {
wg.Add(1)
line, err2 := reader.ReadString('\n')
if err2 != nil || io.EOF == err2 {
break
}
config.Logger.Info("执行命令",
zap.String("execCommandWait", line))
}
_ = cmd.Wait()
wg.Done()
return true
}
func ModelIssueRepeaterHandler(payload map[string]interface{}) {
hi, _ := host.Info()
if payload["nodeGuid"].(string) == hi.HostID {
fileUrl := payload["dockerFile"].(string)
modelVersion := payload["modelVersion"].(string)
downFileName := path.Base(fileUrl)
//判断文件后缀名
fileType := path.Ext(downFileName)
fileNameOnly := strings.TrimSuffix(downFileName, fileType)
dFile := path.Join(config.Cfg.TmpPath, fileNameOnly, downFileName)
//执行文件下载
controller := download.ThreadController{
ThreadCount: download.ThreadCount,
FileUrl: fileUrl,
DownloadFolder: dFile,
DownloadFileName: downFileName,
Logger: config.Logger,
}
controller.Download(download.OneThreadDownloadSize)
if strings.ToLower(fileType) == ".zip" {
err := compress.UnzipFromFile(path.Join(config.Cfg.TmpPath, fileNameOnly), dFile)
if err != nil {
controller.Logger.With(zap.String("文件解压缩", path.Join(config.Cfg.TmpPath, downFileName))).
Error("发生错误", zap.Error(err))
return
}
dFile = path.Join(config.Cfg.TmpPath, fileNameOnly, fileNameOnly+".tar")
}
//docker 导入并运行
imgName := fmt.Sprintf("%s:%s", fileNameOnly, modelVersion)
if strings.ToLower(path.Ext(dFile)) == ".tar" {
dCli := docker.NewDockerClient()
err := dCli.ImportImage(imgName, "latest", dFile)
//err = dCli.LoadImage(dFile)
if err != nil {
controller.Logger.With(zap.String("导入docker的文件", dFile)).
Error("发生错误", zap.Error(err))
}
//设置data目录
dataPath := path.Join(config.Cfg.TmpPath, fileNameOnly, "data")
_ = os.MkdirAll(dataPath, os.ModePerm)
vol := make(map[string]string)
vol[path.Join(dataPath, payload["inPath"].(string))] = payload["inPath"].(string)
vol[path.Join(dataPath, payload["outPath"].(string))] = payload["outPath"].(string)
//docker运行
modelCommand := strings.Split(payload["modelCommand"].(string), " ")
dstPort := dCli.CreateContainer(fileNameOnly, imgName, modelCommand, vol, strconv.Itoa(payload["mappedPort"].(int)))
//保存到本地临时文件
item := docker.ContainerStore{
ModelId: payload["modelId"].(int64),
NodeId: payload["nodeId"].(int64),
Name: fileNameOnly,
ImgName: imgName,
Volumes: vol,
SrcPort: strconv.Itoa(payload["mappedPort"].(int)),
DstPort: dstPort,
Command: modelCommand,
HttpUrl: payload["httpUrl"].(string),
}
docker.ContainerList = append(docker.ContainerList, item)
docker.SaveStore()
cli := GetMqClient("task-response", 1)
ap := cli.EndPoint.(hpds_node.AccessPoint)
res := new(InstructionReq)
res.Command = ModelIssueResponse
res.Payload = item
pData, _ := json.Marshal(res)
_ = GenerateAndSendData(ap, pData)
}
}
}
func ModelTaskExecuteHandler(payload map[string]interface{}) {
hi, _ := host.Info()
if payload["nodeGuid"] == hi.HostID {
if len(payload["subDataset"].(string)) > 0 {
sf := hpds_node.NewStreamFunction(
payload["subDataset"].(string),
hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", config.Cfg.Node.Host, config.Cfg.Node.Port)),
hpds_node.WithObserveDataTags(payload["subDataTag"].(byte)),
hpds_node.WithCredential(config.Cfg.Node.Token),
)
err := sf.Connect()
must(config.Logger, err)
nodeInfo := HpdsMqNode{
MqType: 2,
Topic: payload["subDataset"].(string),
Node: config.Cfg.Node,
EndPoint: sf,
}
_ = sf.SetHandler(func(data []byte) (byte, []byte) {
//查询docker是否已经开启
issue := new(docker.ContainerStore)
_ = json.Unmarshal([]byte(payload["issueResult"].(string)), issue)
dCli := docker.NewDockerClient()
cList, err := dCli.SearchImage(issue.Name)
if err != nil {
}
if len(cList) > 0 {
if len(payload["workflow"].(string)) > 0 {
//是否设置工作流程
wf := new(Workflow)
err = json.Unmarshal([]byte(payload["workflow"].(string)), wf)
if err != nil {
}
if len(payload["datasetPath"].(string)) > 0 {
//数据集处理
opt := &minio.Options{
Creds: credentials.NewStaticV4(config.Cfg.Minio.AccessKeyId, config.Cfg.Minio.SecretAccessKey, ""),
Secure: false,
}
cli, _ := minio.New(config.Cfg.Minio.Endpoint, opt)
doneCh := make(chan struct{})
defer close(doneCh)
objectCh := cli.ListObjects(context.Background(), config.Cfg.Minio.Bucket, minio.ListObjectsOptions{
Prefix: payload["datasetPath"].(string),
Recursive: true,
})
for object := range objectCh {
file, _ := cli.GetObject(context.Background(), config.Cfg.Minio.Bucket, object.Key, minio.GetObjectOptions{})
imgByte, _ := io.ReadAll(file)
f := proto.FileCapture{
FileName: object.Key,
File: base64.StdEncoding.EncodeToString(imgByte),
DatasetName: payload["datasetName"].(string),
CaptureTime: object.LastModified.Unix(),
}
ProcessWorkflow(payload, f, wf)
}
}
} else {
f := new(proto.FileCapture)
err := json.Unmarshal(data, f)
if err != nil {
}
if len(f.File) > 0 {
i := strings.Index(f.File, ",")
dec := base64.NewDecoder(base64.StdEncoding, strings.NewReader(f.File[i+1:]))
if len(payload["httpUrl"].(string)) > 0 {
_ = os.MkdirAll(path.Join(config.Cfg.TmpPath, payload["subDataset"].(string)), os.ModePerm)
tmpFile, _ := os.Create(path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))
_, err = io.Copy(tmpFile, dec)
reqUrl := fmt.Sprintf("http://localhost:%s/%s", issue.DstPort, issue.HttpUrl)
response, err := http.Post(reqUrl, "multipart/form-data", dec)
if err != nil {
config.Logger.With(zap.String("源文件名", f.FileName)).
With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))).
Error("文件提交", zap.Error(err))
}
defer func() {
_ = response.Body.Close()
config.Logger.With(zap.String("源文件名", f.FileName)).
With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))).
Info("模型识别")
}()
body, err := io.ReadAll(response.Body)
if err != nil {
config.Logger.With(zap.String("源文件名", f.FileName)).
With(zap.String("临时文件名", path.Join(config.Cfg.TmpPath, payload["subDataset"].(string), f.FileName))).
Error("模型识别", zap.Error(err))
}
cli := GetMqClient("task-response", 1)
ap := cli.EndPoint.(hpds_node.AccessPoint)
res := new(InstructionReq)
res.Command = TaskResponse
res.Payload = body
pData, _ := json.Marshal(res)
_ = GenerateAndSendData(ap, pData)
}
if len(payload["inPath"].(string)) > 0 {
outPath := ""
for k, v := range issue.Volumes {
if v == payload["outPath"].(string) {
outPath = k
break
}
}
//创建一个监控对象
watch, err := fsnotify.NewWatcher()
if err != nil {
config.Logger.Error("创建文件监控", zap.Error(err))
}
defer func(watch *fsnotify.Watcher) {
_ = watch.Close()
}(watch)
err = watch.Add(outPath)
if err != nil {
config.Logger.With(zap.String("监控目录", outPath)).
Error("创建文件监控", zap.Error(err))
}
for k, v := range issue.Volumes {
if v == payload["inPath"].(string) {
_ = os.MkdirAll(k, os.ModePerm)
tmpFile, _ := os.Create(path.Join(k, f.FileName))
_, err = io.Copy(tmpFile, dec)
break
}
}
list := arraylist.New() // empty
t1 := time.NewTicker(1 * time.Second)
go func() {
for {
select {
case ev := <-watch.Events:
{
//判断事件发生的类型如下5种
// Create 创建
// Write 写入
// Remove 删除
// Rename 重命名
// Chmod 修改权限
if ev.Op&fsnotify.Create == fsnotify.Create {
config.Logger.Info("创建文件", zap.String("文件名", ev.Name))
list.Add(ev.Name)
}
}
case <-t1.C:
{
if list.Size() > 0 {
returnFileHandleResult(list, payload, issue)
}
}
case err = <-watch.Errors:
{
config.Logger.With(zap.String("监控目录", outPath)).
Error("文件监控", zap.Error(err))
return
}
}
}
}()
}
}
}
}
return payload["subDataTag"].(byte), nil
})
MqList = append(MqList, nodeInfo)
}
}
}
func returnFileHandleResult(list *arraylist.List, payload map[string]interface{}, issue *docker.ContainerStore) {
var (
mu sync.RWMutex
wgp sync.WaitGroup
resTime time.Duration
)
mu.Lock()
defer mu.Unlock()
startTime := time.Now()
for i := 0; i < list.Size(); i++ {
if fn, ok := list.Get(0); ok {
if utils.PathExists(fn.(string)) {
wgp.Add(1)
go func() {
mr := new(proto.ModelResult)
src := utils.ReadFile(fn.(string))
if src != nil {
mr.File = base64.StdEncoding.EncodeToString(src)
mr.TaskCode = utils.GetFileName(fn.(string))
mr.TaskId = int64(payload["taskId"].(float64))
mr.FileName = utils.GetFileNameAndExt(fn.(string))
mr.DatasetName = payload["datasetName"].(string)
mr.SubDataset = payload["subDataset"].(string)
mr.FileMd5 = utils.GetFileMd5(src)
mr.ModelId = int64(payload["modelId"].(float64))
mr.NodeId = int64(payload["nodeId"].(float64))
mr.StartTime = startTime.Unix()
mr.FinishTime = time.Now().Unix()
cli := GetMqClient("task-response", 1)
ap := cli.EndPoint.(hpds_node.AccessPoint)
res := new(InstructionReq)
res.Command = TaskResponse
res.Payload = mr
pData, _ := json.Marshal(res)
_ = GenerateAndSendData(ap, pData)
}
wg.Done()
}()
wg.Wait()
resTime = time.Since(startTime)
config.Logger.Info("返回任务完成",
zap.String("文件名", fn.(string)),
zap.Duration("运行时间", resTime),
)
}
}
}
}