package service

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	"git.hpds.cc/Component/logging"
	"git.hpds.cc/pavement/hpds_node"
	"hpds-iot-web/internal/proto"
	"hpds-iot-web/model"
	"hpds-iot-web/mq"
	"xorm.io/xorm"
)

// TaskService groups the task operations offered by the service layer.
type TaskService interface {
	// TaskList returns one page of tasks that match the filters in req.
	TaskList(ctx context.Context, req proto.TaskRequest) (rsp *proto.BaseResponse, err error)
	// AddTask stores a new task and publishes it on the "task-request" queue.
	AddTask(ctx context.Context, req proto.TaskItemRequest) (rsp *proto.BaseResponse, err error)
	//EditTask(ctx context.Context, req proto.ModelItemRequest) (rsp *proto.BaseResponse, err error)
}

// NewTaskService builds a TaskService backed by the given xorm engine and
// logger.
func NewTaskService(engine *xorm.Engine, logger *logging.Logger) TaskService {
	return &repo{
		engine: engine,
		logger: logger,
	}
}

// TaskList queries the "task" table joined with "model" and "node" and returns
// one page of results wrapped in a BaseResponse.
//
// Filters taken from req: BizType (skipped when 0), TaskName (substring match,
// skipped when empty), StartTime as a lower bound and FinishTime as an upper
// bound (skipped when 0) on t.start_time. Only rows with status > 0 are
// returned.
//
// If ctx is already done the query is not executed and ctx.Err() is returned.
// On a query failure the response carries HTTP 500 and the same error is
// returned to the caller.
func (rp *repo) TaskList(ctx context.Context, req proto.TaskRequest) (rsp *proto.BaseResponse, err error) {
	rsp = new(proto.BaseResponse)

	// Bail out early when the caller has already cancelled or timed out.
	select {
	case <-ctx.Done():
		rsp.Code = http.StatusInternalServerError
		rsp.Status = http.StatusText(http.StatusInternalServerError)
		rsp.Message = "超时/取消"
		rsp.Err = ctx.Err()
		return rsp, ctx.Err()
	default:
	}

	// NOTE(review): a Page below 1 produces a negative offset here; confirm
	// that callers validate paging parameters before this point.
	data := make([]proto.TaskDetail, 0)
	total, findErr := rp.engine.Table("task").Alias("t").
		Join("inner", []string{"model", "m"}, "t.model_id = m.model_id").
		Join("inner", []string{"node", "n"}, "t.node_id = n.node_id").
		Cols("t.*", "m.model_name", "n.node_name").
		Where("(? = 0 or m.biz_type = ?)", req.BizType, req.BizType).
		And("(?='' or task_name like ?)", req.TaskName, "%"+req.TaskName+"%").
		And("t.start_time >= unix_timestamp(?)", req.StartTime).
		And("? = 0 or t.start_time <= unix_timestamp(?)", req.FinishTime, req.FinishTime).
		And("t.status > 0").
		Limit(int(req.Size), int(((req.Page)-1)*req.Size)).
		FindAndCount(&data)
	if findErr != nil {
		// The query error is kept in its own variable so it cannot be hidden
		// by shadowing the named result; it is reported to the caller as-is.
		rsp.Code = http.StatusInternalServerError
		rsp.Status = http.StatusText(http.StatusInternalServerError)
		rsp.Err = findErr
		rsp.Message = "失败"
		return rsp, findErr
	}

	rsp.Code = http.StatusOK
	rsp.Status = http.StatusText(http.StatusOK)
	rsp.Message = "成功"
	rsp = FillPaging(total, req.Page, req.Size, data, rsp)
	rsp.Err = nil
	return rsp, nil
}

// AddTask validates the referenced model and dataset, inserts a new task row
// with status 1, and publishes a TaskAdd instruction on the "task-request"
// queue.
//
// When req.AppointmentTime is non-empty it must use the layout
// "2006-01-02 15:04:05" (parsed in the local time zone) and becomes the task's
// start time; otherwise the task starts at the current time.
//
// Every failure (lookup, time parsing, insert, payload encoding, queue
// delivery) yields an HTTP 500 response together with the error. Note that a
// failure after the insert leaves the task row in the database.
func (rp *repo) AddTask(ctx context.Context, req proto.TaskItemRequest) (rsp *proto.BaseResponse, err error) {
	rsp = new(proto.BaseResponse)

	// Bail out early when the caller has already cancelled or timed out.
	select {
	case <-ctx.Done():
		rsp.Code = http.StatusInternalServerError
		rsp.Status = http.StatusText(http.StatusInternalServerError)
		rsp.Message = "超时/取消"
		rsp.Err = ctx.Err()
		return rsp, ctx.Err()
	default:
	}

	// fail fills rsp with the generic failure envelope and returns it with the
	// causing error, so every error path reports the same shape.
	fail := func(cause error) (*proto.BaseResponse, error) {
		rsp.Code = http.StatusInternalServerError
		rsp.Status = http.StatusText(http.StatusInternalServerError)
		rsp.Err = cause
		rsp.Message = "失败"
		return rsp, cause
	}

	var found bool

	m := new(model.Model)
	if found, err = rp.engine.ID(req.ModelId).Get(m); err != nil {
		return fail(err)
	}
	if !found {
		return fail(fmt.Errorf("未能找到对应的模型"))
	}

	// NOTE(review): the dataset is looked up with req.ModelId as its primary
	// key; confirm this is intended rather than a dataset identifier.
	ds := new(model.Dataset)
	if found, err = rp.engine.ID(req.ModelId).Get(ds); err != nil {
		return fail(err)
	}
	if !found {
		return fail(fmt.Errorf("未能找到对应的数据集"))
	}

	// Sample the clock once so CreateAt, UpdateAt and the default StartTime
	// always agree.
	now := time.Now().Unix()
	startTime := now
	if len(req.AppointmentTime) > 0 {
		appTime, parseErr := time.ParseInLocation("2006-01-02 15:04:05", req.AppointmentTime, time.Local)
		if parseErr != nil {
			return fail(fmt.Errorf("时间格式不匹配"))
		}
		startTime = appTime.Unix()
	}

	item := &model.Task{
		ModelId:         req.ModelId,
		NodeId:          req.NodeId,
		TaskName:        req.TaskName,
		TaskDesc:        req.TaskDesc,
		DatasetArr:      fmt.Sprintf("%d", req.DatasetArr),
		SubDataset:      req.SubDataset,
		SubDataTag:      req.SubDataTag,
		AppointmentTime: req.AppointmentTime,
		StartTime:       startTime,
		Status:          1,
		CreateAt:        now,
		UpdateAt:        now,
	}
	if _, err = rp.engine.Insert(item); err != nil {
		return fail(err)
	}

	// Disabled: rewrite of the result-storage placeholder after insert.
	//reg, _ := regexp.Compile("\\[.*?\\]")
	//if ok := reg.FindAll([]byte(item.ResultStorage), 2); len(ok) > 0 {
	//	item.ResultStorage = reg.ReplaceAllString(item.ResultStorage, fmt.Sprintf("%d_%d", item.TaskId, item.ModelId))
	//	_, err = rp.engine.ID(item.TaskId).Cols("result_storage").Update(item)
	//	if err != nil {
	//		goto ReturnPoint
	//	}
	//}

	payload := map[string]interface{}{
		"taskId":       item.TaskId,
		"modelId":      item.ModelId,
		"modelVersion": m.ModelVersion,
		"modelCommand": m.ModelCommand,
		"nodeId":       item.NodeId,
		"inPath":       m.InPath,
		"outPath":      m.OutPath,
		"httpUrl":      m.HttpUrl,
		"datasetArr":   item.DatasetArr,
		"datasetPath":  ds.StoreName,
		"datasetName":  ds.DatasetName,
		"subDataset":   item.SubDataset,
		"subDataTag":   item.SubDataTag,
		"workflow":     m.Workflow,
	}

	pData, err := json.Marshal(&mq.InstructionReq{
		Command: mq.TaskAdd,
		Payload: payload,
	})
	if err != nil {
		return fail(err)
	}

	// NOTE(review): mq.GetMqClient is defined elsewhere; confirm whether it can
	// return a nil client that needs guarding before EndPoint is read.
	mqClient := mq.GetMqClient("task-request", 1)
	accessPoint, ok := mqClient.EndPoint.(hpds_node.AccessPoint)
	if !ok {
		// A checked assertion turns a would-be panic into a reported failure.
		return fail(fmt.Errorf("任务消息队列接入点不可用"))
	}
	if err = mq.GenerateAndSendData(accessPoint, pData, rp.logger); err != nil {
		// Do not report success when the instruction never reached the queue.
		return fail(err)
	}

	rsp.Code = http.StatusOK
	rsp.Status = http.StatusText(http.StatusOK)
	rsp.Message = "新增任务成功"
	// Kept from the original: stores ctx.Err(), which is non-nil only if the
	// context was cancelled while the task was being created.
	// NOTE(review): confirm a success response should carry this value.
	rsp.Err = ctx.Err()
	rsp.Data = item
	return rsp, nil
}