hpds_jkw_web/internal/service/task.go

179 lines
5.3 KiB
Go

package service
import (
"context"
"encoding/json"
"fmt"
"git.hpds.cc/Component/logging"
"git.hpds.cc/pavement/hpds_node"
"hpds-iot-web/internal/proto"
"hpds-iot-web/model"
"hpds-iot-web/mq"
"net/http"
"time"
"xorm.io/xorm"
)
// TaskService is the set of task operations this package offers to callers.
// Both methods take a context and a request value and report their outcome as
// a *proto.BaseResponse together with an error.
type TaskService interface {
	// TaskList queries tasks according to the filters and paging fields in req.
	TaskList(ctx context.Context, req proto.TaskRequest) (*proto.BaseResponse, error)

	// AddTask creates a new task from req.
	AddTask(ctx context.Context, req proto.TaskItemRequest) (*proto.BaseResponse, error)

	// EditTask is currently disabled; the commented-out signature is kept for reference:
	// EditTask(ctx context.Context, req proto.ModelItemRequest) (rsp *proto.BaseResponse, err error)
}
// NewTaskService builds a TaskService backed by a repo that uses engine for
// database access and logger for logging.
func NewTaskService(engine *xorm.Engine, logger *logging.Logger) TaskService {
	svc := new(repo)
	svc.engine = engine
	svc.logger = logger
	return svc
}
// TaskList returns one page of tasks matching the filters in req.
//
// Rows are read from the task table (alias t) inner-joined with model (m) on
// model_id and node (n) on node_id, selecting t.*, m.model_name and
// n.node_name into proto.TaskDetail values. Only rows with t.status > 0 are
// returned. The BizType filter is skipped when req.BizType is 0, the TaskName
// filter when req.TaskName is empty, and the upper start-time bound when
// req.FinishTime is 0; req.StartTime is always applied as a lower bound.
//
// Results:
//   - ctx already done: HTTP 500 response with message "超时/取消"; the returned
//     error is ctx.Err() and the query is not executed.
//   - query failure: HTTP 500 response with message "失败"; the query error is
//     stored in rsp.Err and returned.
//   - success: HTTP 200 response passed through FillPaging with the total
//     count, page, size and rows; the returned error is nil.
func (rp *repo) TaskList(ctx context.Context, req proto.TaskRequest) (rsp *proto.BaseResponse, err error) {
	rsp = new(proto.BaseResponse)

	// Non-blocking cancellation check before touching the database.
	if ctxErr := ctx.Err(); ctxErr != nil {
		rsp.Code = http.StatusInternalServerError
		rsp.Status = http.StatusText(http.StatusInternalServerError)
		rsp.Message = "超时/取消"
		rsp.Err = ctxErr
		return rsp, ctxErr
	}

	// A page below 1 would yield a negative OFFSET; treat it as the first page.
	if req.Page < 1 {
		req.Page = 1
	}

	data := make([]proto.TaskDetail, 0)
	// queryErr is deliberately a distinct name: declaring "err" with := here
	// would shadow the named result and hide the failure from the caller.
	count, queryErr := rp.engine.Table("task").Alias("t").
		Join("inner", []string{"model", "m"}, "t.model_id = m.model_id").
		Join("inner", []string{"node", "n"}, "t.node_id = n.node_id").
		Cols("t.*", "m.model_name", "n.node_name").
		Where("(? = 0 or m.biz_type = ?)", req.BizType, req.BizType).
		And("(?='' or task_name like ?)", req.TaskName, "%"+req.TaskName+"%").
		And("t.start_time >= unix_timestamp(?)", req.StartTime).
		And("? = 0 or t.start_time <= unix_timestamp(?)", req.FinishTime, req.FinishTime).
		And("t.status > 0").
		Limit(int(req.Size), int((req.Page-1)*req.Size)).
		FindAndCount(&data)
	if queryErr != nil {
		rsp.Code = http.StatusInternalServerError
		rsp.Status = http.StatusText(http.StatusInternalServerError)
		rsp.Err = queryErr
		rsp.Message = "失败"
		return rsp, queryErr
	}

	rsp.Code = http.StatusOK
	rsp.Status = http.StatusText(http.StatusOK)
	rsp.Message = "成功"
	rsp = FillPaging(count, req.Page, req.Size, data, rsp)
	rsp.Err = nil
	return rsp, nil
}
// AddTask stores a new task built from req and publishes it to the
// "task-request" message queue client.
//
// Steps, in order:
//  1. If ctx is already done, return an HTTP 500 response with message
//     "超时/取消" and ctx.Err(); nothing else is executed.
//  2. Load the model with ID req.ModelId; fail if it does not exist.
//  3. Load a dataset record (see the review note in the body); fail if it does
//     not exist.
//  4. Resolve the start time: req.AppointmentTime parsed with layout
//     "2006-01-02 15:04:05" in time.Local, or the current time when it is empty.
//  5. Insert the task with Status 1.
//  6. Marshal an mq.InstructionReq with command mq.TaskAdd and send it through
//     mq.GenerateAndSendData.
//
// Every failure produces an HTTP 500 response with message "失败", the cause in
// rsp.Err, and the same cause as the returned error. A failure in step 6
// happens after the insert, so the task row already exists; in that case
// rsp.Data still carries the inserted task. On success the response is HTTP
// 200 with message "新增任务成功", rsp.Data set to the inserted task, and a nil
// error.
func (rp *repo) AddTask(ctx context.Context, req proto.TaskItemRequest) (rsp *proto.BaseResponse, err error) {
	rsp = new(proto.BaseResponse)

	// Non-blocking cancellation check before doing any work.
	if ctxErr := ctx.Err(); ctxErr != nil {
		rsp.Code = http.StatusInternalServerError
		rsp.Status = http.StatusText(http.StatusInternalServerError)
		rsp.Message = "超时/取消"
		rsp.Err = ctxErr
		return rsp, ctxErr
	}

	// fail fills rsp with the common failure envelope and returns it with cause.
	fail := func(cause error) (*proto.BaseResponse, error) {
		rsp.Code = http.StatusInternalServerError
		rsp.Status = http.StatusText(http.StatusInternalServerError)
		rsp.Err = cause
		rsp.Message = "失败"
		return rsp, cause
	}

	var found bool

	m := new(model.Model)
	found, err = rp.engine.ID(req.ModelId).Get(m)
	if err != nil {
		return fail(err)
	}
	if !found {
		return fail(fmt.Errorf("未能找到对应的模型"))
	}

	// NOTE(review): the dataset is fetched with req.ModelId, the same key used
	// for the model above — confirm whether a dataset ID was intended here.
	ds := new(model.Dataset)
	found, err = rp.engine.ID(req.ModelId).Get(ds)
	if err != nil {
		return fail(err)
	}
	if !found {
		return fail(fmt.Errorf("未能找到对应的数据集"))
	}

	// Sample the clock once so CreateAt, UpdateAt and the default StartTime agree.
	now := time.Now().Unix()
	startTime := now
	if len(req.AppointmentTime) > 0 {
		appTime, parseErr := time.ParseInLocation("2006-01-02 15:04:05", req.AppointmentTime, time.Local)
		if parseErr != nil {
			return fail(fmt.Errorf("时间格式不匹配"))
		}
		startTime = appTime.Unix()
	}

	item := &model.Task{
		ModelId:         req.ModelId,
		NodeId:          req.NodeId,
		TaskName:        req.TaskName,
		TaskDesc:        req.TaskDesc,
		DatasetArr:      fmt.Sprintf("%d", req.DatasetArr),
		SubDataset:      req.SubDataset,
		SubDataTag:      req.SubDataTag,
		AppointmentTime: req.AppointmentTime,
		StartTime:       startTime,
		Status:          1,
		CreateAt:        now,
		UpdateAt:        now,
	}
	if _, err = rp.engine.Insert(item); err != nil {
		return fail(err)
	}

	// Disabled ResultStorage rewrite, kept for reference:
	//reg, _ := regexp.Compile("\\[.*?\\]")
	//if ok := reg.FindAll([]byte(item.ResultStorage), 2); len(ok) > 0 {
	//	item.ResultStorage = reg.ReplaceAllString(item.ResultStorage, fmt.Sprintf("%d_%d", item.TaskId, item.ModelId))
	//	_, err = rp.engine.ID(item.TaskId).Cols("result_storage").Update(item)
	//	if err != nil {
	//		goto ReturnPoint
	//	}
	//}

	// From here on the task row exists, so failures still expose it in rsp.Data.
	rsp.Data = item

	payload := map[string]interface{}{
		"taskId":       item.TaskId,
		"modelId":      item.ModelId,
		"modelVersion": m.ModelVersion,
		"modelCommand": m.ModelCommand,
		"nodeId":       item.NodeId,
		"inPath":       m.InPath,
		"outPath":      m.OutPath,
		"httpUrl":      m.HttpUrl,
		"datasetArr":   item.DatasetArr,
		"datasetPath":  ds.StoreName,
		"datasetName":  ds.DatasetName,
		"subDataset":   item.SubDataset,
		"subDataTag":   item.SubDataTag,
		"workflow":     m.Workflow,
	}
	instruction := &mq.InstructionReq{
		Command: mq.TaskAdd,
		Payload: payload,
	}
	pData, marshalErr := json.Marshal(instruction)
	if marshalErr != nil {
		return fail(marshalErr)
	}

	mqClient := mq.GetMqClient("task-request", 1)
	// Checked assertion: an endpoint of another type is reported as an error
	// instead of panicking.
	endPoint, ok := mqClient.EndPoint.(hpds_node.AccessPoint)
	if !ok {
		return fail(fmt.Errorf("消息队列端点类型不匹配"))
	}
	if err = mq.GenerateAndSendData(endPoint, pData, rp.logger); err != nil {
		// Do not report success when the task could not be dispatched.
		return fail(err)
	}

	rsp.Code = http.StatusOK
	rsp.Status = http.StatusText(http.StatusOK)
	rsp.Message = "新增任务成功"
	rsp.Err = nil
	return rsp, nil
}