From 58b0f2c4c6de372aa53681b7c29b62a85c360340 Mon Sep 17 00:00:00 2001 From: wangjian Date: Thu, 23 Mar 2023 18:03:09 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E5=A2=9E=E5=8A=A0=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E6=8C=87=E4=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/server.go | 17 +-- config/config-dev.yaml | 2 +- config/config-prod.yaml | 38 +++++++ config/config.go | 27 +++-- config/config.yaml | 8 +- go.mod | 15 +++ internal/handler/disease.go | 24 +++++ internal/handler/model.go | 37 ++++++- internal/handler/system.go | 84 +++++++++++++++ internal/handler/task.go | 40 +++++++ internal/handler/user.go | 16 +++ internal/proto/request.go | 176 +++++++++++++++++++++++++++++-- internal/proto/response.go | 57 ++++++++++ internal/router/router.go | 24 +++++ internal/service/dataset.go | 2 + internal/service/disease.go | 37 +++++++ internal/service/index.go | 40 ++++++- internal/service/model.go | 168 +++++++++++++++++++++++++++++- internal/service/system.go | 247 ++++++++++++++++++++++++++++++++++++++++++++ internal/service/task.go | 178 +++++++++++++++++++++++++++++++ model/dataset.go | 1 + model/disease.go | 1 + model/index.go | 14 ++- model/model.go | 9 ++ model/modelIssue.go | 11 ++ model/node.go | 28 +++-- model/nodeState.go | 21 ++++ model/systemUser.go | 1 + model/task.go | 18 ++++ model/taskResult.go | 15 +++ mq/index.go | 93 +++++++++++++++++ mq/instruction.go | 11 ++ 32 files changed, 1419 insertions(+), 41 deletions(-) create mode 100644 config/config-prod.yaml create mode 100644 internal/handler/task.go create mode 100644 internal/service/task.go create mode 100644 model/modelIssue.go create mode 100644 model/nodeState.go create mode 100644 model/task.go create mode 100644 model/taskResult.go create mode 100644 mq/index.go create mode 100644 mq/instruction.go diff --git a/cmd/server.go b/cmd/server.go index 2e9e6fc..b6cc915 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -8,6 +8,7 @@ import ( "hpds-iot-web/config" router2 "hpds-iot-web/internal/router" "hpds-iot-web/model" + "hpds-iot-web/mq" discover "hpds-iot-web/pkg/discover/consul" "net/http" "os" @@ -27,7 +28,7 @@ var ( func must(err error) { if err != nil { - fmt.Fprint(os.Stderr, err) + _, _ = fmt.Fprint(os.Stderr, err) os.Exit(1) } } @@ -79,24 +80,28 @@ func NewStartCmd() *cobra.Command { //创建注册对象 tags := make([]string, 1) tags[0] = "web" - consulCfg, err := discover.NewConsulConfig(cfg.Consul.Host, cfg.Name, cfg.Name, cfg.Consul.Host, cfg.Consul.Port, + consulCfg, err := discover.NewConsulConfig(fmt.Sprintf("%s:%d", cfg.Consul.Host, cfg.Consul.Port), cfg.Name, cfg.Name, cfg.Consul.Host, cfg.Consul.Port, tags, 300, 300, 300) must(err) //连接数据库 - model.New(cfg.Db.DriveName, cfg.Db.Conn) + model.New(cfg.Db.DriveName, cfg.Db.Conn, cfg.Mode == "dev") //连接redis model.NewCache(cfg.Cache) logger := LoadLoggerConfig(cfg.Logging) + //创建消息连接点 + mq.MqList, err = mq.NewMqClient(cfg.Funcs, cfg.Node, logger) + must(err) + // 退出channel exitChannel := make(chan os.Signal) defer close(exitChannel) // 退出信号监听 go func(c chan os.Signal) { - consulCfg.ServiceDeregister() + _ = consulCfg.ServiceDeregister() signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) }(exitChannel) router := router2.InitRouter(cfg, logger, model.DB) @@ -116,13 +121,13 @@ func NewStartCmd() *cobra.Command { //zap.L().Error("发生错误", zap.Error(err)) select { case <-ctx.Done(): - consulCfg.ServiceDeregister() + _ = consulCfg.ServiceDeregister() logger.With( zap.String("web", "exit"), ).Error(ctx.Err().Error()) return case errs := <-exitChannel: - consulCfg.ServiceDeregister() + _ = consulCfg.ServiceDeregister() logger.With( zap.String("web", "服务退出"), ).Info(errs.String()) diff --git a/config/config-dev.yaml b/config/config-dev.yaml index def5d1f..53802df 100644 --- a/config/config-dev.yaml +++ b/config/config-dev.yaml @@ -26,7 +26,7 @@ consul: timeout: 5 deregister: 1 db: - conn: root:OIxv7QptYBO3@tcp(192.168.0.200:3306)/hpds_jky?charset=utf8mb4 + conn: root:OIxv7QptYBO3@tcp(114.55.236.153:27136)/hpds_jky?charset=utf8mb4 drive_name: mysql cache: host: 192.168.0.200 diff --git a/config/config-prod.yaml b/config/config-prod.yaml new file mode 100644 index 0000000..32779ba --- /dev/null +++ b/config/config-prod.yaml @@ -0,0 +1,38 @@ +name: web +host: 0.0.0.0 +port: 8088 +mode: dev +logging: + path: ./logs + prefix: hpds-iot-web + errorFileSuffix: error.log + warnFileSuffix: warn.log + infoFileSuffix: info.log + debugFileSuffix: debug.log + maxSize: 100 + maxBackups: 3000 + maxAge: 30 + development: true +mineData: + accessKey: f0bda738033e47ffbfbd5d3f865c19e1 +minio: + protocol: http + endpoint: 127.0.0.1:9000 + accessKeyId: root + secretAccessKey: OIxv7QptYBO3 +consul: + host: http://consul.hpds.cc + port: 80 + interval: 300 + timeout: 5 + deregister: 1 +db: + conn: root:OIxv7QptYBO3@tcp(114.55.236.153:23306)/diagnostic_platform?charset=utf8mb4 + drive_name: mysql +cache: + host: 127.0.0.1 + port: 6379 + db: 0 + pool_size: 10 +functions: + - name: web-sf \ No newline at end of file diff --git a/config/config.go b/config/config.go index e47f9b9..8be8716 100644 --- a/config/config.go +++ b/config/config.go @@ -1,7 +1,6 @@ package config import ( - "bytes" "fmt" "github.com/spf13/viper" "os" @@ -22,6 +21,8 @@ type WebConfig struct { Logging LogOptions `yaml:"logging"` Minio MinioConfig `yaml:"minio"` MineData MineDataConfig `yaml:"mineData"` + Node HpdsNode `yaml:"node,omitempty"` + Funcs []FuncConfig `yaml:"functions,omitempty"` } type ConsulConfig struct { Host string `yaml:"host,omitempty"` @@ -69,6 +70,18 @@ type MineDataConfig struct { AccessKey string `yaml:"accessKey"` } +type HpdsNode struct { + Host string `yaml:"host"` + Port int `yaml:"port"` + Token string `yaml:"token,omitempty"` +} + +type FuncConfig struct { + Name string `yaml:"name"` + DataTag uint8 `yaml:"dataTag"` + MqType uint `yaml:"mqType"` //消息类型, 发布,1;订阅;2 +} + func ParseConfigByFile(path string) (cfg *WebConfig, err error) { buffer, err := os.ReadFile(path) if err != nil { @@ -81,11 +94,13 @@ func load(buf []byte) (cfg *WebConfig, err error) { cViper := viper.New() cViper.SetConfigType("yaml") cfg = new(WebConfig) - cViper.ReadConfig(bytes.NewBuffer(buf)) - err = cViper.Unmarshal(cfg) - if err != nil { - return nil, err - } + cfg.Funcs = make([]FuncConfig, 0) + //cViper.ReadConfig(bytes.NewBuffer(buf)) + err = yaml.Unmarshal(buf, cfg) + //err = cViper.Unmarshal(cfg) + //if err != nil { + // return nil, err + //} return } diff --git a/config/config.yaml b/config/config.yaml index bc8e98c..02cd19b 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -34,5 +34,11 @@ cache: port: 6379 db: 0 pool_size: 10 +node: + host: 127.0.0.1 + port: 27188 + token: 06d36c6f5705507dae778fdce90d0767 functions: - - name: web-sf \ No newline at end of file + - name: task-request + dataTag : 12 + mqType: 1 \ No newline at end of file diff --git a/go.mod b/go.mod index 352d0ac..1ae3bc1 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.18 require ( git.hpds.cc/Component/gin_valid v0.0.0-20230104142509-f956bce255b6 git.hpds.cc/Component/logging v0.0.0-20230106105738-e378e873921b + git.hpds.cc/pavement/hpds_node v0.0.0-20230307094826-753c4fe9c877 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/gin-contrib/zap v0.1.0 github.com/gin-gonic/gin v1.8.2 @@ -25,6 +26,8 @@ require ( cloud.google.com/go/compute v1.12.1 // indirect cloud.google.com/go/compute/metadata v0.2.1 // indirect cloud.google.com/go/firestore v1.8.0 // indirect + git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect + git.hpds.cc/Component/network v0.0.0-20221012021659-2433c68452d5 // indirect github.com/armon/go-metrics v0.4.0 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect @@ -35,8 +38,10 @@ require ( github.com/go-playground/locales v0.14.0 // indirect github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-playground/validator/v10 v10.11.1 // indirect + github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/go-cmp v0.5.9 // indirect @@ -55,7 +60,11 @@ require ( github.com/klauspost/compress v1.15.9 // indirect github.com/klauspost/cpuid/v2 v2.1.0 // indirect github.com/leodido/go-urn v1.2.1 // indirect + github.com/lucas-clemente/quic-go v0.29.1 // indirect github.com/magiconair/properties v1.8.6 // indirect + github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect + github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect + github.com/matoous/go-nanoid/v2 v2.0.0 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/minio/md5-simd v1.1.2 // indirect @@ -64,6 +73,8 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/nxadm/tail v1.4.8 // indirect + github.com/onsi/ginkgo v1.16.4 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.0.6 // indirect github.com/pkg/errors v0.9.1 // indirect @@ -87,12 +98,15 @@ require ( go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect + golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect + golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.4.0 // indirect golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.3.0 // indirect golang.org/x/text v0.5.0 // indirect golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect + golang.org/x/tools v0.1.12 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/api v0.102.0 // indirect google.golang.org/appengine v1.6.7 // indirect @@ -101,6 +115,7 @@ require ( google.golang.org/protobuf v1.28.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect xorm.io/builder v0.3.11-0.20220531020008-1bd24a7dc978 // indirect ) diff --git a/internal/handler/disease.go b/internal/handler/disease.go index ad28698..23f06e7 100644 --- a/internal/handler/disease.go +++ b/internal/handler/disease.go @@ -9,6 +9,30 @@ import ( e "hpds-iot-web/pkg/err" ) +func (s HandlerService) DiseaseList(c *gin.Context) (data interface{}, err error) { + repo := service.NewDiseaseService(s.AppConfig, s.Engine, s.Logger) + us, _ := c.Get("operatorUser") + userInfo := us.(*model.SystemUser) + var req proto.DiseaseRequest + err = c.ShouldBindJSON(&req) + if err != nil { + go s.SaveLog("DiseaseList", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return nil, e.NewValidErr(err) + } + if req.Size < 1 { + req.Size = 20 + } + if req.Size > 1000 { + req.Size = 1000 + } + if req.Page < 1 { + req.Page = 1 + } + data, err = repo.DiseaseList(c, req) + go s.SaveLog("获取病害库列表", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return +} + func (s HandlerService) DiseaseTypeList(c *gin.Context) (data interface{}, err error) { repo := service.NewDiseaseService(s.AppConfig, s.Engine, s.Logger) us, _ := c.Get("operatorUser") diff --git a/internal/handler/model.go b/internal/handler/model.go index f8a1c8d..ba02097 100644 --- a/internal/handler/model.go +++ b/internal/handler/model.go @@ -10,7 +10,7 @@ import ( ) func (s HandlerService) ModelList(c *gin.Context) (data interface{}, err error) { - repo := service.NewModelService(s.Engine, s.Logger) + repo := service.NewModelService(s.AppConfig, s.Engine, s.Logger) us, _ := c.Get("operatorUser") userInfo := us.(*model.SystemUser) var req proto.ModelRequest @@ -33,7 +33,7 @@ func (s HandlerService) ModelList(c *gin.Context) (data interface{}, err error) return } func (s HandlerService) AddModel(c *gin.Context) (data interface{}, err error) { - repo := service.NewModelService(s.Engine, s.Logger) + repo := service.NewModelService(s.AppConfig, s.Engine, s.Logger) us, _ := c.Get("operatorUser") userInfo := us.(*model.SystemUser) var req proto.ModelItemRequest @@ -47,7 +47,7 @@ func (s HandlerService) AddModel(c *gin.Context) (data interface{}, err error) { return } func (s HandlerService) EditModel(c *gin.Context) (data interface{}, err error) { - repo := service.NewModelService(s.Engine, s.Logger) + repo := service.NewModelService(s.AppConfig, s.Engine, s.Logger) us, _ := c.Get("operatorUser") userInfo := us.(*model.SystemUser) var req proto.ModelItemRequest @@ -61,7 +61,7 @@ func (s HandlerService) EditModel(c *gin.Context) (data interface{}, err error) return } func (s HandlerService) DelModel(c *gin.Context) (data interface{}, err error) { - repo := service.NewModelService(s.Engine, s.Logger) + repo := service.NewModelService(s.AppConfig, s.Engine, s.Logger) us, _ := c.Get("operatorUser") userInfo := us.(*model.SystemUser) var req proto.ModelItemRequest @@ -74,3 +74,32 @@ func (s HandlerService) DelModel(c *gin.Context) (data interface{}, err error) { go s.SaveLog("删除模型", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") return } + +func (s HandlerService) ModelIssue(c *gin.Context) (data interface{}, err error) { + repo := service.NewModelService(s.AppConfig, s.Engine, s.Logger) + us, _ := c.Get("operatorUser") + userInfo := us.(*model.SystemUser) + var req proto.ModelIssueRequest + err = c.ShouldBindJSON(&req) + if err != nil { + go s.SaveLog("ModelIssue", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return nil, e.NewValidErr(err) + } + data, err = repo.ModelIssue(c, req) + go s.SaveLog("模型下发", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return +} +func (s HandlerService) GetModelWorkflow(c *gin.Context) (data interface{}, err error) { + repo := service.NewModelService(s.AppConfig, s.Engine, s.Logger) + us, _ := c.Get("operatorUser") + userInfo := us.(*model.SystemUser) + var req proto.ModelItemRequest + err = c.ShouldBindJSON(&req) + if err != nil { + go s.SaveLog("ModelIssue", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return nil, e.NewValidErr(err) + } + data, err = repo.GetModelWorkflow(c, req) + go s.SaveLog("获取模型工作流", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return +} diff --git a/internal/handler/system.go b/internal/handler/system.go index b50e902..4031bff 100644 --- a/internal/handler/system.go +++ b/internal/handler/system.go @@ -92,3 +92,87 @@ func (s HandlerService) DeleteBrand(c *gin.Context) (data interface{}, err error go s.SaveLog("删除品牌", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") return } + +func (s HandlerService) NodeList(c *gin.Context) (data interface{}, err error) { + repo := service.NewSystemService(s.Engine, s.Logger) + us, _ := c.Get("operatorUser") + userInfo := us.(*model.SystemUser) + var req proto.NodeRequest + err = c.ShouldBindJSON(&req) + if err != nil { + go s.SaveLog("NodeList", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return nil, e.NewValidErr(err) + } + if req.Size < 1 { + req.Size = 20 + } + if req.Size > 100 { + req.Size = 100 + } + if req.Page < 1 { + req.Page = 1 + } + data, err = repo.NodeList(c, req) + go s.SaveLog("获取节点列表", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return +} + +func (s HandlerService) NodeInfo(c *gin.Context) (data interface{}, err error) { + repo := service.NewSystemService(s.Engine, s.Logger) + us, _ := c.Get("operatorUser") + userInfo := us.(*model.SystemUser) + var req proto.NodeItemRequest + err = c.ShouldBindJSON(&req) + if err != nil { + go s.SaveLog("NodeInfo", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return nil, e.NewValidErr(err) + } + data, err = repo.NodeInfo(c, req) + go s.SaveLog("获取节点信息", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return +} + +func (s HandlerService) EditNode(c *gin.Context) (data interface{}, err error) { + repo := service.NewSystemService(s.Engine, s.Logger) + us, _ := c.Get("operatorUser") + userInfo := us.(*model.SystemUser) + var req proto.NodeItemRequest + err = c.ShouldBindJSON(&req) + if err != nil { + go s.SaveLog("EditNode", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return nil, e.NewValidErr(err) + } + data, err = repo.EditNode(c, req) + go s.SaveLog("修改节点", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return +} + +func (s HandlerService) NodeState(c *gin.Context) (data interface{}, err error) { + repo := service.NewSystemService(s.Engine, s.Logger) + us, _ := c.Get("operatorUser") + userInfo := us.(*model.SystemUser) + var req proto.NodeInfoRequest + err = c.ShouldBindJSON(&req) + if err != nil { + go s.SaveLog("NodeState", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return nil, e.NewValidErr(err) + } + data, err = repo.NodeState(c, req) + go s.SaveLog("获取节点运行信息", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return +} + +func (s HandlerService) NodeLastState(c *gin.Context) (data interface{}, err error) { + repo := service.NewSystemService(s.Engine, s.Logger) + us, _ := c.Get("operatorUser") + userInfo := us.(*model.SystemUser) + var req proto.NodeInfoRequest + err = c.ShouldBindJSON(&req) + if err != nil { + go s.SaveLog("NodeLastState", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return nil, e.NewValidErr(err) + } + data, err = repo.NodeLastState(c, req) + go s.SaveLog("获取节点最后运行信息", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return +} diff --git a/internal/handler/task.go b/internal/handler/task.go new file mode 100644 index 0000000..5e73a09 --- /dev/null +++ b/internal/handler/task.go @@ -0,0 +1,40 @@ +package handler + +import ( + "fmt" + "github.com/gin-gonic/gin" + "hpds-iot-web/internal/proto" + "hpds-iot-web/internal/service" + "hpds-iot-web/model" + e "hpds-iot-web/pkg/err" +) + +func (s HandlerService) TaskList(c *gin.Context) (data interface{}, err error) { + repo := service.NewTaskService(s.Engine, s.Logger) + us, _ := c.Get("operatorUser") + userInfo := us.(*model.SystemUser) + var req proto.TaskRequest + err = c.ShouldBindJSON(&req) + if err != nil { + go s.SaveLog("TaskList", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return nil, e.NewValidErr(err) + } + data, err = repo.TaskList(c, req) + go s.SaveLog("任务列表", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return +} + +func (s HandlerService) AddTask(c *gin.Context) (data interface{}, err error) { + repo := service.NewTaskService(s.Engine, s.Logger) + us, _ := c.Get("operatorUser") + userInfo := us.(*model.SystemUser) + var req proto.TaskItemRequest + err = c.ShouldBindJSON(&req) + if err != nil { + go s.SaveLog("AddTask", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return nil, e.NewValidErr(err) + } + data, err = repo.AddTask(c, req) + go s.SaveLog("新增任务", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return +} diff --git a/internal/handler/user.go b/internal/handler/user.go index 50f0217..c6a6553 100644 --- a/internal/handler/user.go +++ b/internal/handler/user.go @@ -1,6 +1,7 @@ package handler import ( + "fmt" "github.com/gin-gonic/gin" "hpds-iot-web/internal/proto" "hpds-iot-web/internal/service" @@ -28,6 +29,21 @@ func (s HandlerService) GetUserInfo(c *gin.Context) (data interface{}, err error return } +func (s HandlerService) GetUserList(c *gin.Context) (data interface{}, err error) { + repo := service.NewUserService(s.Engine, s.Logger) + us, _ := c.Get("operatorUser") + userInfo := us.(*model.SystemUser) + var req proto.UserRequest + err = c.ShouldBindJSON(&req) + if err != nil { + go s.SaveLog("GetUserList", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return nil, e.NewValidErr(err) + } + data, err = repo.GetUserList(c, req) + go s.SaveLog("获取用户列表", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "") + return +} + func (s HandlerService) MenuList(c *gin.Context) (data interface{}, err error) { repo := service.NewUserService(s.Engine, s.Logger) us, _ := c.Get("operatorUser") diff --git a/internal/proto/request.go b/internal/proto/request.go index b7e86ba..e2268a6 100644 --- a/internal/proto/request.go +++ b/internal/proto/request.go @@ -23,6 +23,20 @@ func (us UserLogin) ToString() string { return string(data) } +type UserRequest struct { + RealName string `json:"realName"` + Phone string `json:"phone"` + BasePageList +} + +func (us UserRequest) ToString() string { + data, err := json.Marshal(us) + if err != nil { + return "" + } + return string(data) +} + type OwnerRequest struct { OwnerName string `json:"ownerName"` BasePageList @@ -301,7 +315,8 @@ func (p ServiceParamItem) ToString() string { } type ModelRequest struct { - ModelName string `json:"modelName"` + ModelName string `json:"modelName,omitempty"` + BizType int `json:"bizType,omitempty"` BasePageList } @@ -314,13 +329,22 @@ func (p ModelRequest) ToString() string { } type ModelItemRequest struct { - ModelId int `json:"modelId"` - ModelName string `json:"modelName"` - ModelVersion string `json:"modelVersion"` - ModelDesc string `json:"modelDesc"` - ModelFiles string `json:"modelFiles"` - ModelParamsFiles string `json:"modelParamsFiles"` - ModelExecScript string `json:"modelExecScript"` + ModelId int64 `json:"modelId"` + BizType int `json:"bizType"` + ModelName string `json:"modelName"` + ModelVersion string `json:"modelVersion"` + ModelDesc string `json:"modelDesc"` + ModelFiles string `json:"modelFiles"` + ModelParamsFiles string `json:"modelParamsFiles"` + ModelExecScript string `json:"modelExecScript"` + DockerFile []string `json:"dockerFile"` + MappedPort int `json:"mappedPort"` + ModelCommand string `json:"modelCommand"` + InPath string `json:"inPath"` + OutPath string `json:"outPath"` + HttpUrl string `json:"httpUrl"` + IsLightWeight bool `json:"isLightWeight"` + Workflow string `json:"workflow"` } func (p ModelItemRequest) ToString() string { @@ -331,6 +355,19 @@ func (p ModelItemRequest) ToString() string { return string(data) } +type ModelIssueRequest struct { + NodeId int64 `json:"nodeId"` + ModelId int64 `json:"modelId"` +} + +func (p ModelIssueRequest) ToString() string { + data, err := json.Marshal(p) + if err != nil { + return "" + } + return string(data) +} + type DeviceRequest struct { Key string `json:"key"` ProductId int64 `json:"productId"` @@ -404,6 +441,20 @@ func (p BrandItemRequest) ToString() string { return string(data) } +type DiseaseRequest struct { + BizType int `json:"bizType"` + Key string `json:"key"` + BasePageList +} + +func (p DiseaseRequest) ToString() string { + data, err := json.Marshal(p) + if err != nil { + return "" + } + return string(data) +} + type DiseaseTypeRequest struct { CategoryId int `json:"categoryId"` Key string `json:"key"` @@ -467,6 +518,7 @@ type ImportDatasetRequest struct { DatasetName string `json:"datasetName"` DatasetDesc string `json:"datasetDesc"` ProjectId int64 `json:"projectId"` + StoreName string `json:"storeName"` OwnerId int64 `json:"ownerId"` Creator int64 `json:"creator"` } @@ -478,3 +530,111 @@ func (p ImportDatasetRequest) ToString() string { } return string(data) } + +type NodeRequest struct { + NodeId int64 `json:"nodeId"` + NodeName string `json:"nodeName"` + NodeGuid string `json:"nodeGuid"` + NodeType int `json:"nodeType"` + Platform string `json:"platform"` + PlatformVersion string `json:"platformVersion"` + CPU string `json:"cpu"` + MemTotal uint64 `json:"memTotal"` + DiskTotal uint64 `json:"diskTotal"` + SwapTotal uint64 `json:"swapTotal"` + Arch string `json:"arch"` + Virtualization string `json:"virtualization"` + BootTime uint64 `json:"bootTime"` + IP string `json:"ip"` + CountryCode string `json:"countryCode"` + Version string `json:"version"` + CreateAt int64 `json:"createAt"` + UpdateAt int64 `json:"updateAt"` + BasePageList +} + +func (p NodeRequest) ToString() string { + data, err := json.Marshal(p) + if err != nil { + return "" + } + return string(data) +} + +type NodeItemRequest struct { + NodeId int64 `json:"nodeId"` + NodeName string `json:"nodeName"` + NodeGuid string `json:"nodeGuid"` + NodeType int `json:"nodeType"` + Platform string `json:"platform"` + PlatformVersion string `json:"platformVersion"` + CPU string `json:"cpu"` + MemTotal uint64 `json:"memTotal"` + DiskTotal uint64 `json:"diskTotal"` + SwapTotal uint64 `json:"swapTotal"` + Arch string `json:"arch"` + Virtualization string `json:"virtualization"` + BootTime uint64 `json:"bootTime"` + IP string `json:"ip"` + CountryCode string `json:"countryCode"` + Version string `json:"version"` + CreateAt int64 `json:"createAt"` + UpdateAt int64 `json:"updateAt"` +} + +func (p NodeItemRequest) ToString() string { + data, err := json.Marshal(p) + if err != nil { + return "" + } + return string(data) +} + +type NodeInfoRequest struct { + NodeGuid string `json:"nodeGuid"` + Uptime int64 `json:"uptime"` +} + +func (p NodeInfoRequest) ToString() string { + data, err := json.Marshal(p) + if err != nil { + return "" + } + return string(data) +} + +type TaskRequest struct { + BizType int `json:"bizType"` + TaskName string `json:"taskName"` + StartTime string `json:"startTime"` + FinishTime string `json:"finishTime"` + BasePageList +} + +func (p TaskRequest) ToString() string { + data, err := json.Marshal(p) + if err != nil { + return "" + } + return string(data) +} + +type TaskItemRequest struct { + TaskId int64 `json:"taskId"` + ModelId int64 `json:"modelId"` + NodeId int64 `json:"nodeId"` + TaskName string `json:"taskName"` + TaskDesc string `json:"taskDesc"` + DatasetArr int64 `json:"datasetArr"` + SubDataset string `json:"subDataset"` + SubDataTag int `json:"subDataTag"` + AppointmentTime string `json:"appointmentTime"` +} + +func (p TaskItemRequest) ToString() string { + data, err := json.Marshal(p) + if err != nil { + return "" + } + return string(data) +} diff --git a/internal/proto/response.go b/internal/proto/response.go index 8cbd9c7..d262930 100644 --- a/internal/proto/response.go +++ b/internal/proto/response.go @@ -1,5 +1,7 @@ package proto +import "hpds-iot-web/model" + // BaseResponse 基础返回结构 type BaseResponse struct { Code int `json:"code"` @@ -56,6 +58,7 @@ type DatasetItem struct { DatasetId int64 `json:"datasetId"` DatasetName string `json:"datasetName"` DatasetDesc string `json:"datasetDesc"` + StoreName string `json:"storeName"` CategoryId int `json:"categoryId"` ProjectId int64 `json:"projectId"` OwnerId int64 `json:"ownerId"` @@ -63,3 +66,57 @@ type DatasetItem struct { CreateAt int64 `json:"createAt"` DatasetCount int64 `json:"datasetCount"` } + +type NodeState struct { + List []model.NodeState `json:"list"` +} + +type NodeLastState struct { + List []NodeLastStateItem `json:"list"` +} +type NodeLastStateItem struct { + NodeId int64 `json:"nodeId"` + NodeGuid string `json:"nodeGuid"` + NodeName string `json:"nodeName"` + NodeType int `json:"nodeType"` + NodeStatus int `json:"nodeStatus"` + Platform string `json:"platform"` + PlatformVersion string `json:"platformVersion"` + CPU string `json:"CPU"` + MemTotal uint64 `json:"memTotal"` + DiskTotal uint64 `json:"diskTotal"` + SwapTotal uint64 `json:"swapTotal"` + CpuUsed float64 `json:"cpuUsed"` + MemUsed uint64 `json:"memUsed"` + SwapUsed uint64 `json:"swapUsed"` + DiskUsed uint64 `json:"diskUsed"` + NetInTransfer uint64 `json:"netInTransfer"` + NetOutTransfer uint64 `json:"netOutTransfer"` + NetInSpeed uint64 `json:"netInSpeed"` + NetOutSpeed uint64 `json:"netOutSpeed"` + Load1 float64 `json:"load1"` + Load5 float64 `json:"load5"` + Load15 float64 `json:"load15"` + TcpConnCount uint64 `json:"tcpConnCount"` + UdpConnCount uint64 `json:"udpConnCount"` + ProcessCount uint64 `json:"processCount"` + ExecTask string `json:"execTask"` +} + +type TaskDetail struct { + TaskId int64 `json:"taskId"` + ModelId int64 `json:"modelId"` + ModelName string `json:"modelName"` + NodeId int64 `json:"nodeId"` + NodeName string `json:"nodeName"` + TaskName string `json:"taskName"` + TaskDesc string `json:"taskDesc"` + DatasetArr string `json:"datasetArr"` + ResultStorage string `json:"resultStorage"` + AppointmentTime string `json:"appointmentTime"` + StartTime int64 `json:"startTime"` + FinishTime int64 `json:"finishTime"` + Status int `json:"status"` + CreateAt int64 `xorm:"created" json:"createAt"` + UpdateAt int64 `xorm:"updated" json:"updateAt"` +} diff --git a/internal/router/router.go b/internal/router/router.go index 573cc88..ed150a5 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -24,6 +24,7 @@ func InitRouter(cfg *config.WebConfig, logger *logging.Logger, engine *xorm.Engi { user.Use(middleware.JwtAuthMiddleware(logger.Logger)) user.POST("/login", e.ErrorWrapper(hs.Login)) + user.POST("/list", e.ErrorWrapper(hs.GetUserList)) user.GET("/getUserInfo", e.ErrorWrapper(hs.GetUserInfo)) menu := user.Group("/menu") @@ -113,6 +114,10 @@ func InitRouter(cfg *config.WebConfig, logger *logging.Logger, engine *xorm.Engi device.POST("/delete", e.ErrorWrapper(hs.DeleteDevice)) } } + //detection := manage.Group("/detection") + //{ + // + //} } model := r.Group("/model") { @@ -121,6 +126,8 @@ func InitRouter(cfg *config.WebConfig, logger *logging.Logger, engine *xorm.Engi model.POST("/add", e.ErrorWrapper(hs.AddModel)) model.POST("/edit", e.ErrorWrapper(hs.EditModel)) model.POST("/delete", e.ErrorWrapper(hs.DelModel)) + model.POST("/issue", e.ErrorWrapper(hs.ModelIssue)) + model.POST("/workflow", e.ErrorWrapper(hs.GetModelWorkflow)) } file := r.Group("/file") { @@ -138,10 +145,27 @@ func InitRouter(cfg *config.WebConfig, logger *logging.Logger, engine *xorm.Engi brand.POST("/edit", e.ErrorWrapper(hs.EditBrand)) brand.POST("/delete", e.ErrorWrapper(hs.DeleteBrand)) } + node := system.Group("/node") + { + node.POST("/list", e.ErrorWrapper(hs.NodeList)) + node.POST("/info", e.ErrorWrapper(hs.NodeInfo)) + node.POST("/edit", e.ErrorWrapper(hs.EditNode)) + + node.POST("/state", e.ErrorWrapper(hs.NodeState)) + node.POST("/last", e.ErrorWrapper(hs.NodeLastState)) + + } + } + task := r.Group("/task") + { + task.Use(middleware.JwtAuthMiddleware(logger.Logger)) + task.POST("/list", e.ErrorWrapper(hs.TaskList)) + task.POST("/add", e.ErrorWrapper(hs.AddTask)) } disease := r.Group("/disease") { disease.Use(middleware.JwtAuthMiddleware(logger.Logger)) + disease.POST("/list", e.ErrorWrapper(hs.DiseaseList)) diseaseType := disease.Group("/type") { diseaseType.POST("/list", e.ErrorWrapper(hs.DiseaseTypeList)) diff --git a/internal/service/dataset.go b/internal/service/dataset.go index 6e78714..d7a5b0c 100644 --- a/internal/service/dataset.go +++ b/internal/service/dataset.go @@ -106,6 +106,7 @@ func (rp *repo) DatasetList(ctx context.Context, req proto.DatasetRequest) (rsp DatasetId: v.DatasetId, DatasetName: v.DatasetName, DatasetDesc: v.DatasetDesc, + StoreName: v.StoreName, CategoryId: v.CategoryId, ProjectId: v.ProjectId, OwnerId: v.OwnerId, @@ -148,6 +149,7 @@ func (rp *repo) ImportDataset(ctx context.Context, req proto.ImportDatasetReques CategoryId: req.CategoryId, ProjectId: req.ProjectId, OwnerId: req.OwnerId, + StoreName: req.StoreName, Creator: req.Creator, Status: 1, CreateAt: time.Now().Unix(), diff --git a/internal/service/disease.go b/internal/service/disease.go index 0c634a3..873148f 100644 --- a/internal/service/disease.go +++ b/internal/service/disease.go @@ -13,6 +13,7 @@ import ( ) type DiseaseService interface { + DiseaseList(ctx context.Context, req proto.DiseaseRequest) (rsp *proto.BaseResponse, err error) DiseaseTypeList(ctx context.Context, req proto.DiseaseTypeRequest) (rsp *proto.BaseResponse, err error) AddDiseaseType(ctx context.Context, req proto.DiseaseTypeItemRequest) (rsp *proto.BaseResponse, err error) EditDiseaseType(ctx context.Context, req proto.DiseaseTypeItemRequest) (rsp *proto.BaseResponse, err error) @@ -27,6 +28,42 @@ func NewDiseaseService(cfg *config.WebConfig, engine *xorm.Engine, logger *loggi } } +func (rp *repo) DiseaseList(ctx context.Context, req proto.DiseaseRequest) (rsp *proto.BaseResponse, err error) { + rsp = new(proto.BaseResponse) + select { + case <-ctx.Done(): + err = fmt.Errorf("超时/取消") + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Message = "超时/取消" + rsp.Err = ctx.Err() + return rsp, ctx.Err() + default: + data := make([]model.Disease, 0) + count, err := rp.engine.Where("(? = '' or disease_name like ?)", req.Key, "%"+req.Key+"%"). + And("(? = 0 or category_id = ?)", req.BizType, req.BizType). + Limit(int(req.Size), int(((req.Page)-1)*req.Size)). + FindAndCount(&data) + if err != nil { + goto ReturnPoint + } + rsp.Code = http.StatusOK + rsp.Status = http.StatusText(http.StatusOK) + rsp.Message = "成功" + rsp = FillPaging(count, req.Page, req.Size, data, rsp) + rsp.Err = err + return rsp, err + } +ReturnPoint: + if err != nil { + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Err = err + rsp.Message = "失败" + } + return rsp, err +} + func (rp *repo) DiseaseTypeList(ctx context.Context, req proto.DiseaseTypeRequest) (rsp *proto.BaseResponse, err error) { rsp = new(proto.BaseResponse) select { diff --git a/internal/service/index.go b/internal/service/index.go index f9d92d8..c4a0e7a 100644 --- a/internal/service/index.go +++ b/internal/service/index.go @@ -21,13 +21,14 @@ type PagingStruct struct { } // FillPaging 填充分页数据 -func FillPaging(count int64, pageNum int64, pageSize int64, list interface{}, data *proto.BaseResponse) *proto.BaseResponse { +func FillPaging(count, pageNum, pageSize int64, list interface{}, data *proto.BaseResponse) *proto.BaseResponse { //var tp int64 //if count%pageSize > 0 { // tp = count/pageSize + 1 //} else { // tp = count / pageSize //} + _ = fmt.Sprintf("%d, %d", pageNum, pageSize) ps := new(PagingStruct) ps.List = list ps.Total = count @@ -50,6 +51,7 @@ type UserService interface { Login(ctx context.Context, userName, pass string) (rsp *proto.BaseResponse, err error) GetUserInfo(ctx context.Context, userId int64) (rsp *proto.BaseResponse, err error) MenuList(ctx context.Context, userId int64) (rsp *proto.BaseResponse, err error) + GetUserList(ctx context.Context, req proto.UserRequest) (rsp *proto.BaseResponse, err error) } func NewUserService(engine *xorm.Engine, logger *logging.Logger) UserService { @@ -245,3 +247,39 @@ ReturnPoint: } return rsp, err } + +func (rp *repo) GetUserList(ctx context.Context, req proto.UserRequest) (rsp *proto.BaseResponse, err error) { + rsp = new(proto.BaseResponse) + select { + case <-ctx.Done(): + err = fmt.Errorf("超时/取消") + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Message = "超时/取消" + rsp.Err = ctx.Err() + return rsp, ctx.Err() + default: + data := make([]model.SystemUser, 0) + count, err := rp.engine.Where("(? = '' or phone like ?)", req.Phone, "%"+req.Phone+"%"). + And("(? = '' or real_name like ?)", req.RealName, req.RealName). + And("status = 1").Limit(int(req.Size), int(((req.Page)-1)*req.Size)). + FindAndCount(&data) + if err != nil { + goto ReturnPoint + } + rsp.Code = http.StatusOK + rsp.Status = http.StatusText(http.StatusOK) + rsp.Message = "成功" + rsp = FillPaging(count, req.Page, req.Size, data, rsp) + rsp.Err = err + return rsp, err + } +ReturnPoint: + if err != nil { + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Err = err + rsp.Message = "失败" + } + return rsp, err +} diff --git a/internal/service/model.go b/internal/service/model.go index 73afa72..4282d86 100644 --- a/internal/service/model.go +++ b/internal/service/model.go @@ -2,11 +2,16 @@ package service import ( "context" + "encoding/json" "fmt" "git.hpds.cc/Component/logging" + "git.hpds.cc/pavement/hpds_node" + "hpds-iot-web/config" "hpds-iot-web/internal/proto" "hpds-iot-web/model" + "hpds-iot-web/mq" "net/http" + "strings" "time" "xorm.io/xorm" ) @@ -16,12 +21,15 @@ type ModelService interface { AddModel(ctx context.Context, req proto.ModelItemRequest) (rsp *proto.BaseResponse, err error) EditModel(ctx context.Context, req proto.ModelItemRequest) (rsp *proto.BaseResponse, err error) DelModel(ctx context.Context, req proto.ModelItemRequest) (rsp *proto.BaseResponse, err error) + ModelIssue(ctx context.Context, req proto.ModelIssueRequest) (rsp *proto.BaseResponse, err error) + GetModelWorkflow(ctx context.Context, req proto.ModelItemRequest) (rsp *proto.BaseResponse, err error) } -func NewModelService(engine *xorm.Engine, logger *logging.Logger) ModelService { +func NewModelService(cfg *config.WebConfig, engine *xorm.Engine, logger *logging.Logger) ModelService { return &repo{ - engine: engine, - logger: logger, + AppConfig: cfg, + engine: engine, + logger: logger, } } @@ -73,11 +81,19 @@ func (rp *repo) AddModel(ctx context.Context, req proto.ModelItemRequest) (rsp * default: item := &model.Model{ ModelName: req.ModelName, + BizType: req.BizType, ModelVersion: req.ModelVersion, ModelDesc: req.ModelDesc, ModelFiles: req.ModelFiles, ModelParamsFiles: req.ModelParamsFiles, ModelExecScript: req.ModelExecScript, + DockerFile: strings.Join(req.DockerFile, "|"), + MappedPort: req.MappedPort, + ModelCommand: req.ModelCommand, + InPath: req.InPath, + OutPath: req.OutPath, + HttpUrl: req.HttpUrl, + IsLightWeight: req.IsLightWeight, Status: 1, CreateAt: time.Now().Unix(), UpdateAt: time.Now().Unix(), @@ -142,6 +158,32 @@ func (rp *repo) EditModel(ctx context.Context, req proto.ModelItemRequest) (rsp if len(req.ModelExecScript) > 0 { item.ModelExecScript = req.ModelExecScript } + if req.BizType > 0 { + item.BizType = req.BizType + } + + if len(req.DockerFile) > 0 { + item.DockerFile = strings.Join(req.DockerFile, "|") + } + if req.MappedPort > 0 { + item.MappedPort = req.MappedPort + } + if len(req.ModelCommand) > 0 { + item.ModelCommand = req.ModelCommand + } + if len(req.InPath) > 0 { + item.InPath = req.InPath + } + if len(req.OutPath) > 0 { + item.OutPath = req.OutPath + } + if len(req.HttpUrl) > 0 { + item.HttpUrl = req.HttpUrl + } + if len(req.Workflow) > 0 { + item.Workflow = req.Workflow + } + item.IsLightWeight = req.IsLightWeight item.UpdateAt = time.Now().Unix() _, err = rp.engine.ID(req.ModelId).AllCols().Update(item) if err != nil { @@ -206,3 +248,123 @@ ReturnPoint: } return rsp, err } + +func (rp *repo) ModelIssue(ctx context.Context, req proto.ModelIssueRequest) (rsp *proto.BaseResponse, err error) { + rsp = new(proto.BaseResponse) + select { + case <-ctx.Done(): + err = fmt.Errorf("超时/取消") + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Message = "超时/取消" + rsp.Err = ctx.Err() + return rsp, ctx.Err() + default: + var h bool + m := new(model.Model) + h, err = rp.engine.ID(req.ModelId).Get(m) + if err != nil { + goto ReturnPoint + } + if !h { + err = fmt.Errorf("未能找到对应的模型") + goto ReturnPoint + } + node := new(model.Node) + h, err = rp.engine.ID(req.NodeId).Get(node) + if err != nil { + goto ReturnPoint + } + if !h { + err = fmt.Errorf("未能找到对应的节点") + goto ReturnPoint + } + item := new(model.IssueModel) + h, err = rp.engine.Where("model_id = ? and node_id= ?", req.ModelId, req.NodeId).Get(item) + if err != nil { + goto ReturnPoint + } + if h { + err = fmt.Errorf("已经有该模型") + goto ReturnPoint + } + item.ModelId = req.ModelId + item.NodeId = req.NodeId + item.CreateAt = time.Now().Unix() + item.UpdateAt = time.Now().Unix() + _, err = rp.engine.Insert(item) + if err != nil { + goto ReturnPoint + } + mqClient := mq.GetMqClient("task-request", 1) + payload := make(map[string]interface{}) + payload["modelId"] = item.ModelId + payload["modelVersion"] = m.ModelVersion + payload["modelCommand"] = m.ModelCommand + payload["nodeId"] = item.NodeId + payload["dockerFile"] = m.DockerFile + payload["mappedPort"] = m.MappedPort + payload["inPath"] = m.InPath + payload["outPath"] = m.OutPath + payload["httpUrl"] = m.HttpUrl + payload["nodeGuid"] = node.NodeGuid + mqPayload := &mq.InstructionReq{ + Command: mq.ModelIssue, + Payload: payload, + } + b, _ := json.Marshal(mqPayload) + err = mq.GenerateAndSendData(mqClient.EndPoint.(hpds_node.AccessPoint), b, rp.logger) + rsp.Code = http.StatusOK + rsp.Status = http.StatusText(http.StatusOK) + rsp.Message = "模型下发成功" + rsp.Err = ctx.Err() + rsp.Data = item + return rsp, err + } +ReturnPoint: + if err != nil { + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Err = err + rsp.Message = "失败" + } + return rsp, err +} + +func (rp *repo) GetModelWorkflow(ctx context.Context, req proto.ModelItemRequest) (rsp *proto.BaseResponse, err error) { + rsp = new(proto.BaseResponse) + select { + case <-ctx.Done(): + err = fmt.Errorf("超时/取消") + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Message = "超时/取消" + rsp.Err = ctx.Err() + return rsp, ctx.Err() + default: + var h bool + item := new(model.Model) + h, err = rp.engine.ID(req.ModelId).Get(item) + if err != nil { + goto ReturnPoint + } + if !h { + err = fmt.Errorf("未能找到对应的模型") + goto ReturnPoint + } + rsp.Code = http.StatusOK + rsp.Status = http.StatusText(http.StatusOK) + rsp.Message = "删除模型成功" + rsp.Err = ctx.Err() + rsp.Data = item.Workflow + return rsp, err + } +ReturnPoint: + if err != nil { + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Err = err + rsp.Message = "失败" + } + return rsp, err +} diff --git a/internal/service/system.go b/internal/service/system.go index 319c9fd..af9b305 100644 --- a/internal/service/system.go +++ b/internal/service/system.go @@ -17,6 +17,12 @@ type SystemService interface { AddBrand(ctx context.Context, req proto.BrandItemRequest) (rsp *proto.BaseResponse, err error) EditBrand(ctx context.Context, req proto.BrandItemRequest) (rsp *proto.BaseResponse, err error) DeleteBrand(ctx context.Context, req proto.BrandItemRequest) (rsp *proto.BaseResponse, err error) + + NodeList(ctx context.Context, req proto.NodeRequest) (rsp *proto.BaseResponse, err error) + NodeInfo(ctx context.Context, req proto.NodeItemRequest) (rsp *proto.BaseResponse, err error) + EditNode(ctx context.Context, req proto.NodeItemRequest) (rsp *proto.BaseResponse, err error) + NodeState(ctx context.Context, req proto.NodeInfoRequest) (rsp *proto.BaseResponse, err error) + NodeLastState(ctx context.Context, req proto.NodeInfoRequest) (rsp *proto.BaseResponse, err error) } func NewSystemService(engine *xorm.Engine, logger *logging.Logger) SystemService { @@ -231,3 +237,244 @@ ReturnPoint: } return rsp, err } + +func (rp *repo) NodeList(ctx context.Context, req proto.NodeRequest) (rsp *proto.BaseResponse, err error) { + rsp = new(proto.BaseResponse) + select { + case <-ctx.Done(): + err = fmt.Errorf("超时/取消") + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Message = "超时/取消" + rsp.Err = ctx.Err() + return rsp, ctx.Err() + default: + data := make([]model.Node, 0) + var count int64 + count, err = rp.engine.Where("(? = '' or node_name like ?)", req.NodeName, "%"+req.NodeName+"%"). + And("(? = '' or node_guid like ?)", req.NodeGuid, "%"+req.NodeGuid+"%"). + And("(? = 0 or node_type = ?)", req.NodeType, req.NodeType). + And("node_status = 1").Limit(int(req.Size), int(((req.Page)-1)*req.Size)). + FindAndCount(&data) + if err != nil { + goto ReturnPoint + } + rsp.Code = http.StatusOK + rsp.Status = http.StatusText(http.StatusOK) + rsp.Message = "成功" + rsp = FillPaging(count, req.Page, req.Size, data, rsp) + rsp.Err = err + return rsp, err + } +ReturnPoint: + if err != nil { + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Err = err + rsp.Message = "失败" + } + return rsp, err +} + +func (rp *repo) NodeInfo(ctx context.Context, req proto.NodeItemRequest) (rsp *proto.BaseResponse, err error) { + rsp = new(proto.BaseResponse) + select { + case <-ctx.Done(): + err = fmt.Errorf("超时/取消") + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Message = "超时/取消" + rsp.Err = ctx.Err() + return rsp, ctx.Err() + default: + var h bool + item := new(model.Node) + h, err = rp.engine.ID(req.NodeId).Get(item) + if err != nil { + goto ReturnPoint + } + if !h { + err = fmt.Errorf("未能找到对应的节点") + goto ReturnPoint + } + rsp.Code = http.StatusOK + rsp.Status = http.StatusText(http.StatusOK) + rsp.Message = "获取节点成功" + rsp.Err = ctx.Err() + rsp.Data = item + return rsp, err + } +ReturnPoint: + if err != nil { + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Err = err + rsp.Message = "失败" + } + return rsp, err +} + +func (rp *repo) EditNode(ctx context.Context, req proto.NodeItemRequest) (rsp *proto.BaseResponse, err error) { + rsp = new(proto.BaseResponse) + select { + case <-ctx.Done(): + err = fmt.Errorf("超时/取消") + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Message = "超时/取消" + rsp.Err = ctx.Err() + return rsp, ctx.Err() + default: + var h bool + item := new(model.Node) + h, err = rp.engine.ID(req.NodeId).Get(item) + if err != nil { + goto ReturnPoint + } + if !h { + err = fmt.Errorf("未能找到对应的节点") + goto ReturnPoint + } + if len(req.NodeName) > 0 { + item.NodeName = req.NodeName + } + if req.NodeType > 0 { + item.NodeType = req.NodeType + } + if len(req.Platform) > 0 { + item.Platform = req.Platform + } + if len(req.PlatformVersion) > 0 { + item.PlatformVersion = req.PlatformVersion + } + if len(req.CPU) > 0 { + item.CPU = req.CPU + } + if req.MemTotal > 0 { + item.MemTotal = req.MemTotal + } + if req.DiskTotal > 0 { + item.DiskTotal = req.DiskTotal + } + if req.SwapTotal > 0 { + item.SwapTotal = req.SwapTotal + } + if len(req.Arch) > 0 { + item.Arch = req.Arch + } + if len(req.Virtualization) > 0 { + item.Virtualization = req.Virtualization + } + if req.BootTime > 0 { + item.BootTime = req.BootTime + } + if len(req.IP) > 0 { + item.IP = req.IP + } + if len(req.CountryCode) > 0 { + item.CountryCode = req.CountryCode + } + if len(req.Version) > 0 { + item.Version = req.Version + } + item.UpdateAt = time.Now().Unix() + _, err = rp.engine.ID(req.NodeId).AllCols().Update(item) + if err != nil { + goto ReturnPoint + } + rsp.Code = http.StatusOK + rsp.Status = http.StatusText(http.StatusOK) + rsp.Message = "修改节点成功" + rsp.Err = ctx.Err() + rsp.Data = item + return rsp, err + } +ReturnPoint: + if err != nil { + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Err = err + rsp.Message = "失败" + } + return rsp, err +} + +func (rp *repo) NodeState(ctx context.Context, req proto.NodeInfoRequest) (rsp *proto.BaseResponse, err error) { + rsp = new(proto.BaseResponse) + select { + case <-ctx.Done(): + err = fmt.Errorf("超时/取消") + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Message = "超时/取消" + rsp.Err = ctx.Err() + return rsp, ctx.Err() + default: + list := make([]model.NodeState, 0) + err = rp.engine.Where("node_name = ?", req.NodeGuid). + And(" uptime > UNIX_TIMESTAMP(DATE_ADD(NOW(),INTERVAL -24 HOUR))"). + Find(&list) + if err != nil { + goto ReturnPoint + } + state := new(proto.NodeState) + state.List = list + rsp.Code = http.StatusOK + rsp.Status = http.StatusText(http.StatusOK) + rsp.Message = "获取节点状态信息成功" + rsp.Err = ctx.Err() + rsp.Data = state + return rsp, err + } +ReturnPoint: + if err != nil { + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Err = err + rsp.Message = "失败" + } + return rsp, err +} + +func (rp *repo) NodeLastState(ctx context.Context, req proto.NodeInfoRequest) (rsp *proto.BaseResponse, err error) { + rsp = new(proto.BaseResponse) + select { + case <-ctx.Done(): + err = fmt.Errorf("超时/取消") + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Message = "超时/取消" + rsp.Err = ctx.Err() + return rsp, ctx.Err() + default: + list := make([]proto.NodeLastStateItem, 0) + //err = rp.engine.Where("node_name = ?", req.NodeGuid). + // And("? + uptime > UNIX_TIMESTAMP(DATE_ADD(NOW(),INTERVAL -24 HOUR))", req.Uptime).Desc("uptime"). + // Find(&list) + err = rp.engine.SQL(`select c.node_id,c.node_name,c.node_guid,c.node_type,c.node_type,c.platform,c.platform_version,c.c_p_u,c.mem_total, +c.disk_total,c.swap_total,a.c_p_u cpu_used,c.node_status, a.mem_used,a.swap_used, a.disk_used, a.net_in_transfer, a.net_in_speed, +a.net_out_speed, a.net_out_transfer, a.load1, a.load5, a.load15, a.tcp_conn_count, a.udp_conn_count, a.process_count, +d.task_name exec_task from node_state a , (select node_name, max(uptime) uptime from node_state group by node_name) b, node c +left join (select t2.node_id, t2.task_name from task t2, (select node_id, max(start_time) start from task group by node_id) t1 where t2.node_id = t1.node_id and t2.start_time = t1.start and t2.status = 1) d on c.node_id = d.node_id + where a.node_name = b.node_name and a.uptime = b.uptime and a.node_name = c.node_guid and c.node_status > 0 and (? = '' or a.node_name = ?) `, req.NodeGuid, req.NodeGuid).Find(&list) + if err != nil { + goto ReturnPoint + } + state := new(proto.NodeLastState) + state.List = list + rsp.Code = http.StatusOK + rsp.Status = http.StatusText(http.StatusOK) + rsp.Message = "获取节点状态信息成功" + rsp.Err = ctx.Err() + rsp.Data = state + return rsp, err + } +ReturnPoint: + if err != nil { + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Err = err + rsp.Message = "失败" + } + return rsp, err +} diff --git a/internal/service/task.go b/internal/service/task.go new file mode 100644 index 0000000..118c2be --- /dev/null +++ b/internal/service/task.go @@ -0,0 +1,178 @@ +package service + +import ( + "context" + "encoding/json" + "fmt" + "git.hpds.cc/Component/logging" + "git.hpds.cc/pavement/hpds_node" + "hpds-iot-web/internal/proto" + "hpds-iot-web/model" + "hpds-iot-web/mq" + "net/http" + "time" + "xorm.io/xorm" +) + +type TaskService interface { + TaskList(ctx context.Context, req proto.TaskRequest) (rsp *proto.BaseResponse, err error) + AddTask(ctx context.Context, req proto.TaskItemRequest) (rsp *proto.BaseResponse, err error) + //EditTask(ctx context.Context, req proto.ModelItemRequest) (rsp *proto.BaseResponse, err error) +} + +func NewTaskService(engine *xorm.Engine, logger *logging.Logger) TaskService { + return &repo{ + engine: engine, + logger: logger, + } +} + +func (rp *repo) TaskList(ctx context.Context, req proto.TaskRequest) (rsp *proto.BaseResponse, err error) { + rsp = new(proto.BaseResponse) + select { + case <-ctx.Done(): + err = fmt.Errorf("超时/取消") + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Message = "超时/取消" + rsp.Err = ctx.Err() + return rsp, ctx.Err() + default: + data := make([]proto.TaskDetail, 0) + count, err := rp.engine.Table("task").Alias("t"). + Join("inner", []string{"model", "m"}, "t.model_id = m.model_id"). + Join("inner", []string{"node", "n"}, "t.node_id = n.node_id"). + Cols("t.*", "m.model_name", "n.node_name"). + Where("(? = 0 or m.biz_type = ?)", req.BizType, req.BizType). + And("(?='' or task_name like ?)", req.TaskName, "%"+req.TaskName+"%"). + And("t.start_time >= unix_timestamp(?)", req.StartTime). + And("? = 0 or t.start_time <= unix_timestamp(?)", req.FinishTime, req.FinishTime). + And("t.status > 0").Limit(int(req.Size), int(((req.Page)-1)*req.Size)). + FindAndCount(&data) + if err != nil { + goto ReturnPoint + } + rsp.Code = http.StatusOK + rsp.Status = http.StatusText(http.StatusOK) + rsp.Message = "成功" + rsp = FillPaging(count, req.Page, req.Size, data, rsp) + rsp.Err = err + return rsp, err + } +ReturnPoint: + if err != nil { + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Err = err + rsp.Message = "失败" + } + return rsp, err +} + +func (rp *repo) AddTask(ctx context.Context, req proto.TaskItemRequest) (rsp *proto.BaseResponse, err error) { + rsp = new(proto.BaseResponse) + select { + case <-ctx.Done(): + err = fmt.Errorf("超时/取消") + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Message = "超时/取消" + rsp.Err = ctx.Err() + return rsp, ctx.Err() + default: + var h bool + m := new(model.Model) + h, err = rp.engine.ID(req.ModelId).Get(m) + if err != nil { + goto ReturnPoint + } + if !h { + err = fmt.Errorf("未能找到对应的模型") + goto ReturnPoint + } + + ds := new(model.Dataset) + h, err = rp.engine.ID(req.ModelId).Get(ds) + if err != nil { + goto ReturnPoint + } + if !h { + err = fmt.Errorf("未能找到对应的数据集") + goto ReturnPoint + } + + item := &model.Task{ + ModelId: req.ModelId, + NodeId: req.NodeId, + TaskName: req.TaskName, + TaskDesc: req.TaskDesc, + DatasetArr: fmt.Sprintf("%d", req.DatasetArr), + SubDataset: req.SubDataset, + SubDataTag: req.SubDataTag, + AppointmentTime: req.AppointmentTime, + Status: 1, + CreateAt: time.Now().Unix(), + UpdateAt: time.Now().Unix(), + } + if len(req.AppointmentTime) > 0 { + var appTime time.Time + appTime, err = time.ParseInLocation("2006-01-02 15:04:05", req.AppointmentTime, time.Local) + if err != nil { + err = fmt.Errorf("时间格式不匹配") + goto ReturnPoint + } + item.StartTime = appTime.Unix() + } else { + item.StartTime = time.Now().Unix() + } + _, err = rp.engine.Insert(item) + if err != nil { + goto ReturnPoint + } + //reg, _ := regexp.Compile("\\[.*?\\]") + //if ok := reg.FindAll([]byte(item.ResultStorage), 2); len(ok) > 0 { + // item.ResultStorage = reg.ReplaceAllString(item.ResultStorage, fmt.Sprintf("%d_%d", item.TaskId, item.ModelId)) + // _, err = rp.engine.ID(item.TaskId).Cols("result_storage").Update(item) + // if err != nil { + // goto ReturnPoint + // } + //} + payload := make(map[string]interface{}) + payload["taskId"] = item.TaskId + payload["modelId"] = item.ModelId + payload["modelVersion"] = m.ModelVersion + payload["modelCommand"] = m.ModelCommand + payload["nodeId"] = item.NodeId + payload["inPath"] = m.InPath + payload["outPath"] = m.OutPath + payload["httpUrl"] = m.HttpUrl + payload["datasetArr"] = item.DatasetArr + payload["datasetPath"] = ds.StoreName + payload["datasetName"] = ds.DatasetName + payload["subDataset"] = item.SubDataset + payload["subDataTag"] = item.SubDataTag + payload["workflow"] = m.Workflow + + mqClient := mq.GetMqClient("task-request", 1) + mqPayload := &mq.InstructionReq{ + Command: mq.TaskAdd, + Payload: payload, + } + pData, _ := json.Marshal(mqPayload) + err = mq.GenerateAndSendData(mqClient.EndPoint.(hpds_node.AccessPoint), pData, rp.logger) + rsp.Code = http.StatusOK + rsp.Status = http.StatusText(http.StatusOK) + rsp.Message = "新增任务成功" + rsp.Err = ctx.Err() + rsp.Data = item + return rsp, err + } +ReturnPoint: + if err != nil { + rsp.Code = http.StatusInternalServerError + rsp.Status = http.StatusText(http.StatusInternalServerError) + rsp.Err = err + rsp.Message = "失败" + } + return rsp, err +} diff --git a/model/dataset.go b/model/dataset.go index b9b21ee..b032cb2 100644 --- a/model/dataset.go +++ b/model/dataset.go @@ -4,6 +4,7 @@ type Dataset struct { DatasetId int64 `xorm:"not null pk autoincr INT(11)" json:"datasetId"` DatasetName string `xorm:"varchar(200) not null" json:"datasetName"` DatasetDesc string `xorm:"varchar(200)" json:"datasetDesc"` + StoreName string `xorm:"varchar(200)" json:"storeName"` CategoryId int `xorm:"not null SMALLINT default 1" json:"categoryId"` //业务分类, 1:道路 2:桥梁 3:隧道 4:边坡 ProjectId int64 `xorm:"INT(11) not null default 0 index" json:"projectId"` OwnerId int64 `xorm:"INT(11) not null default 0 index" json:"ownerId"` diff --git a/model/disease.go b/model/disease.go index 8983058..ecb4c7e 100644 --- a/model/disease.go +++ b/model/disease.go @@ -10,6 +10,7 @@ type Disease struct { DetectionMethod string `xorm:"varchar(200) not null " json:"detectionMethod"` DiseaseDesc string `xorm:"TEXT" json:"diseaseDesc"` Reference string `xorm:"varchar(200) not null " json:"reference"` //参照标准 + StoreName string `xorm:"varchar(200)" json:"storeName"` //存储路径 CreateAt int64 `xorm:"created" json:"createAt"` UpdateAt int64 `xorm:"updated" json:"updateAt"` } diff --git a/model/index.go b/model/index.go index 7b175be..609a6b7 100644 --- a/model/index.go +++ b/model/index.go @@ -17,9 +17,9 @@ var ( Redis *redis.Client ) -func New(driveName, dsn string) { - DB, _ = NewDbConnection(dsn) - DB.ShowSQL(true) +func New(driveName, dsn string, showSql bool) { + DB, _ = NewDbConnection(driveName, dsn) + DB.ShowSQL(showSql) DB.Dialect().SetQuotePolicy(dialects.QuotePolicyReserved) err := DB.Sync2( &Brand{}, @@ -42,6 +42,8 @@ func New(driveName, dsn string) { &Model{}, &ModelVersion{}, &Node{}, + &NodeState{}, + &IssueModel{}, &OriginalData{}, &Owner{}, &Project{}, @@ -52,6 +54,8 @@ func New(driveName, dsn string) { &SystemRole{}, &SystemUser{}, &SystemUserRole{}, + &Task{}, + &TaskResult{}, ) if err != nil { zap.L().Error("同步数据库表结构", zap.Error(err)) @@ -59,8 +63,8 @@ func New(driveName, dsn string) { } } -func NewDbConnection(dsn string) (db *xorm.Engine, err error) { - db, err = xorm.NewEngine("mysql", dsn) +func NewDbConnection(driveName, dsn string) (db *xorm.Engine, err error) { + db, err = xorm.NewEngine(driveName, dsn) if err != nil { zap.L().Error("创建数据库连接", zap.Error(err)) os.Exit(-1) diff --git a/model/model.go b/model/model.go index 162e6c2..663a859 100644 --- a/model/model.go +++ b/model/model.go @@ -3,11 +3,20 @@ package model type Model struct { ModelId int `xorm:"not null pk autoincr INT(11)" json:"modelId"` ModelName string `xorm:"varchar(200) not null" json:"modelName"` + BizType int `xorm:"int not null default 1" json:"bizType"` ModelVersion string `xorm:"varchar(50) not null" json:"modelVersion"` ModelDesc string `xorm:"varchar(200) not null" json:"modelDesc"` ModelFiles string `xorm:"varchar(200) not null" json:"modelFiles"` ModelParamsFiles string `xorm:"varchar(200)" json:"modelParamsFiles"` ModelExecScript string `xorm:"varchar(200)" json:"modelExecScript"` + DockerFile string `xorm:"varchar(200)" json:"dockerFile"` + MappedPort int `xorm:"int" json:"mappedPort"` + ModelCommand string `xorm:"varchar(200)" json:"modelCommand"` + InPath string `xorm:"varchar(200)" json:"inPath"` + OutPath string `xorm:"varchar(200)" json:"outPath"` + HttpUrl string `xorm:"varchar(200)" json:"httpUrl"` + IsLightWeight bool `xorm:"TINYINT(1) default 0" json:"isLightWeight"` + Workflow string `xorm:"TEXT" json:"workflow"` Status int `xorm:"not null SMALLINT default 0" json:"status"` CreateAt int64 `xorm:"created" json:"createAt"` UpdateAt int64 `xorm:"updated" json:"updateAt"` diff --git a/model/modelIssue.go b/model/modelIssue.go new file mode 100644 index 0000000..e5505d8 --- /dev/null +++ b/model/modelIssue.go @@ -0,0 +1,11 @@ +package model + +type IssueModel struct { + Id int64 `xorm:"not null pk autoincr INT(11)" json:"id"` + ModelId int64 `xorm:"INT(11) index" json:"modelId"` + NodeId int64 `xorm:"INT(11) index" json:"nodeId"` + Status int `xorm:"not null SMALLINT default 0" json:"status"` + IssueResult string `xorm:"TEXT" json:"issueResult"` + CreateAt int64 `xorm:"created" json:"createAt"` + UpdateAt int64 `xorm:"updated" json:"updateAt"` +} diff --git a/model/node.go b/model/node.go index f553191..453fcbf 100644 --- a/model/node.go +++ b/model/node.go @@ -1,10 +1,26 @@ package model +// Node 节点信息 type Node struct { - NodeId int `xorm:"not null pk autoincr INT(11)" json:"nodeId"` - NodeName string `xorm:"varchar(50) not null" json:"nodeName"` - NodeType int `xorm:"not null SMALLINT default 0" json:"nodeType"` - NodeStatus int `xorm:"not null SMALLINT default 0" json:"nodeStatus"` - CreateAt int64 `xorm:"created" json:"createAt"` - UpdateAt int64 `xorm:"updated" json:"updateAt"` + NodeId int64 `xorm:"not null pk autoincr INT(11)" json:"nodeId"` + NodeGuid string `xorm:"varchar(100) index" json:"nodeGuid"` + NodeName string `xorm:"varchar(100)" json:"nodeName"` + NodeType int `xorm:"not null SMALLINT default 0" json:"nodeType"` + NodeStatus int `xorm:"not null SMALLINT default 0" json:"nodeStatus"` + Platform string `xorm:"varchar(100)" json:"platform,omitempty"` + PlatformVersion string `xorm:"varchar(100)" json:"platformVersion,omitempty"` + CPU string `xorm:"varchar(1000)" json:"cpu,omitempty"` + MemTotal uint64 `xorm:"BIGINT" json:"memTotal,omitempty"` + DiskTotal uint64 `xorm:"BIGINT" json:"diskTotal,omitempty"` + SwapTotal uint64 `xorm:"BIGINT" json:"swapTotal,omitempty"` + Arch string `xorm:"varchar(1000)" json:"arch,omitempty"` + Virtualization string `xorm:"varchar(1000)" json:"virtualization,omitempty"` + BootTime uint64 `xorm:"BIGINT" json:"bootTime,omitempty"` + IP string `xorm:"varchar(100)" json:"ip"` + IsGpu bool `xorm:"TINYINT(1) default 0" json:"isGpu"` + IsTaskExecute bool `xorm:"TINYINT(1) default 0" json:"isTaskExecute"` + CountryCode string `xorm:"varchar(100)" json:"countryCode,omitempty"` + Version string `xorm:"varchar(100)" json:"version,omitempty"` + CreateAt int64 `xorm:"created" json:"createAt"` + UpdateAt int64 `xorm:"updated" json:"updateAt"` } diff --git a/model/nodeState.go b/model/nodeState.go new file mode 100644 index 0000000..b23b5b5 --- /dev/null +++ b/model/nodeState.go @@ -0,0 +1,21 @@ +package model + +// NodeState 节点状态信息 +type NodeState struct { + Uptime uint64 `xorm:"BIGINT pk" json:"uptime,omitempty"` + NodeName string `xorm:"varchar(100) pk" json:"nodeName"` + CPU float64 `xorm:"DECIMAL(18,4)" json:"cpu,omitempty"` + MemUsed uint64 `xorm:"BIGINT" json:"memUsed,omitempty"` + SwapUsed uint64 `xorm:"BIGINT" json:"swapUsed,omitempty"` + DiskUsed uint64 `xorm:"BIGINT" json:"diskUsed,omitempty"` + NetInTransfer uint64 `xorm:"BIGINT" json:"netInTransfer,omitempty"` + NetOutTransfer uint64 `xorm:"BIGINT" json:"netOutTransfer,omitempty"` + NetInSpeed uint64 `xorm:"BIGINT" json:"netInSpeed,omitempty"` + NetOutSpeed uint64 `xorm:"BIGINT" json:"netOutSpeed,omitempty"` + Load1 float64 `xorm:"DECIMAL(18,4)" json:"load1,omitempty"` + Load5 float64 `xorm:"DECIMAL(18,4)" json:"load5,omitempty"` + Load15 float64 `xorm:"DECIMAL(18,4)" json:"load15,omitempty"` + TcpConnCount uint64 `xorm:"BIGINT" json:"tcpConnCount,omitempty"` + UdpConnCount uint64 `xorm:"BIGINT" json:"udpConnCount,omitempty"` + ProcessCount uint64 `xorm:"BIGINT" json:"processCount,omitempty"` +} diff --git a/model/systemUser.go b/model/systemUser.go index 6531366..7e1d417 100644 --- a/model/systemUser.go +++ b/model/systemUser.go @@ -14,6 +14,7 @@ type SystemUser struct { Pass string `xorm:"VARCHAR(128) not null" json:"pass"` //密码 Salt string `xorm:"VARCHAR(32) not null" json:"salt"` //盐 RealName string `xorm:"VARCHAR(50)" json:"realName"` //真实姓名 + Status int `xorm:"not null SMALLINT default 1" json:"status"` //是否禁用 CreateAt int64 `xorm:"created" json:"createAt"` //创建时间 UpdateAt int64 `xorm:"updated" json:"updateAt"` //更新时间 } diff --git a/model/task.go b/model/task.go new file mode 100644 index 0000000..b7edb83 --- /dev/null +++ b/model/task.go @@ -0,0 +1,18 @@ +package model + +type Task struct { + TaskId int64 `xorm:"not null pk autoincr INT(11)" json:"taskId"` + ModelId int64 `xorm:"INT(11) index" json:"modelId"` + NodeId int64 `xorm:"INT(11) index" json:"nodeId"` + TaskName string `xorm:"VARCHAR(200)" json:"taskName"` + TaskDesc string `xorm:"VARCHAR(500)" json:"taskDesc"` + DatasetArr string `xorm:"TEXT" json:"datasetArr"` + SubDataset string `xorm:"varchar(100)" json:"subDataset"` + SubDataTag int `xorm:"tinyint" json:"subDataTag"` + AppointmentTime string `xorm:"VARCHAR(30)" json:"appointmentTime"` + StartTime int64 `xorm:"BIGINT" json:"startTime"` + FinishTime int64 `xorm:"BIGINT" json:"finishTime"` + Status int `xorm:"not null SMALLINT default 0" json:"status"` // 1:等待执行; 2:执行中; 3:执行完成; 4:任务分配失败; 5:任务执行失败 + CreateAt int64 `xorm:"created" json:"createAt"` + UpdateAt int64 `xorm:"updated" json:"updateAt"` +} diff --git a/model/taskResult.go b/model/taskResult.go new file mode 100644 index 0000000..b055467 --- /dev/null +++ b/model/taskResult.go @@ -0,0 +1,15 @@ +package model + +type TaskResult struct { + ResultId int64 `xorm:"not null pk autoincr INT(11)" json:"resultId"` + TaskId int64 `xorm:"INT(11) index" json:"taskId"` + TaskCode string `xorm:"varchar(200)" json:"taskCode"` + ModelId int64 `xorm:"INT(11)" json:"modelId"` + NodeId int64 `xorm:"INT(11)" json:"nodeId"` + StartTime int64 `xorm:"BIGINT" json:"startTime"` + FinishTime int64 `xorm:"BIGINT" json:"finishTime"` + SubDataset string `xorm:"varchar(200)" json:"subDataset"` + DatasetId int64 `xorm:"INT(11)" json:"datasetId"` + SrcPath string `xorm:"varchar(500)" json:"srcPath"` + Result string `xorm:"TEXT" json:"result"` +} diff --git a/mq/index.go b/mq/index.go new file mode 100644 index 0000000..5d54ff6 --- /dev/null +++ b/mq/index.go @@ -0,0 +1,93 @@ +package mq + +import ( + "fmt" + "git.hpds.cc/Component/logging" + "go.uber.org/zap" + "hpds-iot-web/config" + "os" + "time" + + "git.hpds.cc/pavement/hpds_node" +) + +var MqList []HpdsMqNode + +type HpdsMqNode struct { + MqType uint + Topic string + Node config.HpdsNode + EndPoint interface{} +} + +func must(logger *logging.Logger, err error) { + if err != nil { + if logger != nil { + logger.With(zap.String("web节点", "错误信息")).Error("启动错误", zap.Error(err)) + } else { + _, _ = fmt.Fprint(os.Stderr, err) + } + os.Exit(1) + } +} + +func NewMqClient(funcs []config.FuncConfig, node config.HpdsNode, logger *logging.Logger) (mqList []HpdsMqNode, err error) { + mqList = make([]HpdsMqNode, 0) + for _, v := range funcs { + switch v.MqType { + case 2: + sf := hpds_node.NewStreamFunction( + v.Name, + hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)), + hpds_node.WithObserveDataTags(v.DataTag), + hpds_node.WithCredential(node.Token), + ) + err = sf.Connect() + nodeInfo := HpdsMqNode{ + MqType: 2, + Topic: v.Name, + Node: node, + EndPoint: sf, + } + must(logger, err) + mqList = append(mqList, nodeInfo) + default: + ap := hpds_node.NewAccessPoint( + v.Name, + hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)), + hpds_node.WithCredential(node.Token), + ) + err = ap.Connect() + nodeInfo := HpdsMqNode{ + MqType: 1, + Topic: v.Name, + Node: node, + EndPoint: ap, + } + must(logger, err) + ap.SetDataTag(v.DataTag) + mqList = append(mqList, nodeInfo) + } + + } + return mqList, err +} + +func GetMqClient(topic string, mqType uint) *HpdsMqNode { + for _, v := range MqList { + if v.Topic == topic && v.MqType == mqType { + return &v + } + } + return nil +} + +func GenerateAndSendData(stream hpds_node.AccessPoint, data []byte, logger *logging.Logger) error { + logger.With(zap.String("web节点", "发送消息")).Info("数据", zap.String("发送的数据", string(data))) + _, err := stream.Write(data) + if err != nil { + return err + } + time.Sleep(1000 * time.Millisecond) + return nil +} diff --git a/mq/instruction.go b/mq/instruction.go new file mode 100644 index 0000000..4bfe37e --- /dev/null +++ b/mq/instruction.go @@ -0,0 +1,11 @@ +package mq + +const ( + TaskAdd = iota + 1 + ModelIssue +) + +type InstructionReq struct { + Command int `json:"command"` + Payload interface{} `json:"payload"` +}