1、增加任务指令

This commit is contained in:
wangjian 2023-03-23 18:03:09 +08:00
parent a2c95fe43f
commit 58b0f2c4c6
32 changed files with 1419 additions and 41 deletions

View File

@ -8,6 +8,7 @@ import (
"hpds-iot-web/config"
router2 "hpds-iot-web/internal/router"
"hpds-iot-web/model"
"hpds-iot-web/mq"
discover "hpds-iot-web/pkg/discover/consul"
"net/http"
"os"
@ -27,7 +28,7 @@ var (
func must(err error) {
if err != nil {
fmt.Fprint(os.Stderr, err)
_, _ = fmt.Fprint(os.Stderr, err)
os.Exit(1)
}
}
@ -79,24 +80,28 @@ func NewStartCmd() *cobra.Command {
//创建注册对象
tags := make([]string, 1)
tags[0] = "web"
consulCfg, err := discover.NewConsulConfig(cfg.Consul.Host, cfg.Name, cfg.Name, cfg.Consul.Host, cfg.Consul.Port,
consulCfg, err := discover.NewConsulConfig(fmt.Sprintf("%s:%d", cfg.Consul.Host, cfg.Consul.Port), cfg.Name, cfg.Name, cfg.Consul.Host, cfg.Consul.Port,
tags, 300, 300, 300)
must(err)
//连接数据库
model.New(cfg.Db.DriveName, cfg.Db.Conn)
model.New(cfg.Db.DriveName, cfg.Db.Conn, cfg.Mode == "dev")
//连接redis
model.NewCache(cfg.Cache)
logger := LoadLoggerConfig(cfg.Logging)
//创建消息连接点
mq.MqList, err = mq.NewMqClient(cfg.Funcs, cfg.Node, logger)
must(err)
// 退出channel
exitChannel := make(chan os.Signal)
defer close(exitChannel)
// 退出信号监听
go func(c chan os.Signal) {
consulCfg.ServiceDeregister()
_ = consulCfg.ServiceDeregister()
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
}(exitChannel)
router := router2.InitRouter(cfg, logger, model.DB)
@ -116,13 +121,13 @@ func NewStartCmd() *cobra.Command {
//zap.L().Error("发生错误", zap.Error(err))
select {
case <-ctx.Done():
consulCfg.ServiceDeregister()
_ = consulCfg.ServiceDeregister()
logger.With(
zap.String("web", "exit"),
).Error(ctx.Err().Error())
return
case errs := <-exitChannel:
consulCfg.ServiceDeregister()
_ = consulCfg.ServiceDeregister()
logger.With(
zap.String("web", "服务退出"),
).Info(errs.String())

View File

@ -26,7 +26,7 @@ consul:
timeout: 5
deregister: 1
db:
conn: root:OIxv7QptYBO3@tcp(192.168.0.200:3306)/hpds_jky?charset=utf8mb4
conn: root:OIxv7QptYBO3@tcp(114.55.236.153:27136)/hpds_jky?charset=utf8mb4
drive_name: mysql
cache:
host: 192.168.0.200

38
config/config-prod.yaml Normal file
View File

@ -0,0 +1,38 @@
name: web
host: 0.0.0.0
port: 8088
mode: dev
logging:
path: ./logs
prefix: hpds-iot-web
errorFileSuffix: error.log
warnFileSuffix: warn.log
infoFileSuffix: info.log
debugFileSuffix: debug.log
maxSize: 100
maxBackups: 3000
maxAge: 30
development: true
mineData:
accessKey: f0bda738033e47ffbfbd5d3f865c19e1
minio:
protocol: http
endpoint: 127.0.0.1:9000
accessKeyId: root
secretAccessKey: OIxv7QptYBO3
consul:
host: http://consul.hpds.cc
port: 80
interval: 300
timeout: 5
deregister: 1
db:
conn: root:OIxv7QptYBO3@tcp(114.55.236.153:23306)/diagnostic_platform?charset=utf8mb4
drive_name: mysql
cache:
host: 127.0.0.1
port: 6379
db: 0
pool_size: 10
functions:
- name: web-sf

View File

@ -1,7 +1,6 @@
package config
import (
"bytes"
"fmt"
"github.com/spf13/viper"
"os"
@ -22,6 +21,8 @@ type WebConfig struct {
Logging LogOptions `yaml:"logging"`
Minio MinioConfig `yaml:"minio"`
MineData MineDataConfig `yaml:"mineData"`
Node HpdsNode `yaml:"node,omitempty"`
Funcs []FuncConfig `yaml:"functions,omitempty"`
}
type ConsulConfig struct {
Host string `yaml:"host,omitempty"`
@ -69,6 +70,18 @@ type MineDataConfig struct {
AccessKey string `yaml:"accessKey"`
}
type HpdsNode struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Token string `yaml:"token,omitempty"`
}
type FuncConfig struct {
Name string `yaml:"name"`
DataTag uint8 `yaml:"dataTag"`
MqType uint `yaml:"mqType"` //消息类型, 发布1订阅2
}
func ParseConfigByFile(path string) (cfg *WebConfig, err error) {
buffer, err := os.ReadFile(path)
if err != nil {
@ -81,11 +94,13 @@ func load(buf []byte) (cfg *WebConfig, err error) {
cViper := viper.New()
cViper.SetConfigType("yaml")
cfg = new(WebConfig)
cViper.ReadConfig(bytes.NewBuffer(buf))
err = cViper.Unmarshal(cfg)
if err != nil {
return nil, err
}
cfg.Funcs = make([]FuncConfig, 0)
//cViper.ReadConfig(bytes.NewBuffer(buf))
err = yaml.Unmarshal(buf, cfg)
//err = cViper.Unmarshal(cfg)
//if err != nil {
// return nil, err
//}
return
}

View File

@ -34,5 +34,11 @@ cache:
port: 6379
db: 0
pool_size: 10
node:
host: 127.0.0.1
port: 27188
token: 06d36c6f5705507dae778fdce90d0767
functions:
- name: web-sf
- name: task-request
dataTag : 12
mqType: 1

15
go.mod
View File

@ -5,6 +5,7 @@ go 1.18
require (
git.hpds.cc/Component/gin_valid v0.0.0-20230104142509-f956bce255b6
git.hpds.cc/Component/logging v0.0.0-20230106105738-e378e873921b
git.hpds.cc/pavement/hpds_node v0.0.0-20230307094826-753c4fe9c877
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/gin-contrib/zap v0.1.0
github.com/gin-gonic/gin v1.8.2
@ -25,6 +26,8 @@ require (
cloud.google.com/go/compute v1.12.1 // indirect
cloud.google.com/go/compute/metadata v0.2.1 // indirect
cloud.google.com/go/firestore v1.8.0 // indirect
git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect
git.hpds.cc/Component/network v0.0.0-20221012021659-2433c68452d5 // indirect
github.com/armon/go-metrics v0.4.0 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
@ -35,8 +38,10 @@ require (
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.11.1 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
@ -55,7 +60,11 @@ require (
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/cpuid/v2 v2.1.0 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lucas-clemente/quic-go v0.29.1 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect
github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect
github.com/matoous/go-nanoid/v2 v2.0.0 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
@ -64,6 +73,8 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.4 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/pkg/errors v0.9.1 // indirect
@ -87,12 +98,15 @@ require (
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/text v0.5.0 // indirect
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect
golang.org/x/tools v0.1.12 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.102.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
@ -101,6 +115,7 @@ require (
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
xorm.io/builder v0.3.11-0.20220531020008-1bd24a7dc978 // indirect
)

View File

@ -9,6 +9,30 @@ import (
e "hpds-iot-web/pkg/err"
)
func (s HandlerService) DiseaseList(c *gin.Context) (data interface{}, err error) {
repo := service.NewDiseaseService(s.AppConfig, s.Engine, s.Logger)
us, _ := c.Get("operatorUser")
userInfo := us.(*model.SystemUser)
var req proto.DiseaseRequest
err = c.ShouldBindJSON(&req)
if err != nil {
go s.SaveLog("DiseaseList", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return nil, e.NewValidErr(err)
}
if req.Size < 1 {
req.Size = 20
}
if req.Size > 1000 {
req.Size = 1000
}
if req.Page < 1 {
req.Page = 1
}
data, err = repo.DiseaseList(c, req)
go s.SaveLog("获取病害库列表", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return
}
func (s HandlerService) DiseaseTypeList(c *gin.Context) (data interface{}, err error) {
repo := service.NewDiseaseService(s.AppConfig, s.Engine, s.Logger)
us, _ := c.Get("operatorUser")

View File

@ -10,7 +10,7 @@ import (
)
func (s HandlerService) ModelList(c *gin.Context) (data interface{}, err error) {
repo := service.NewModelService(s.Engine, s.Logger)
repo := service.NewModelService(s.AppConfig, s.Engine, s.Logger)
us, _ := c.Get("operatorUser")
userInfo := us.(*model.SystemUser)
var req proto.ModelRequest
@ -33,7 +33,7 @@ func (s HandlerService) ModelList(c *gin.Context) (data interface{}, err error)
return
}
func (s HandlerService) AddModel(c *gin.Context) (data interface{}, err error) {
repo := service.NewModelService(s.Engine, s.Logger)
repo := service.NewModelService(s.AppConfig, s.Engine, s.Logger)
us, _ := c.Get("operatorUser")
userInfo := us.(*model.SystemUser)
var req proto.ModelItemRequest
@ -47,7 +47,7 @@ func (s HandlerService) AddModel(c *gin.Context) (data interface{}, err error) {
return
}
func (s HandlerService) EditModel(c *gin.Context) (data interface{}, err error) {
repo := service.NewModelService(s.Engine, s.Logger)
repo := service.NewModelService(s.AppConfig, s.Engine, s.Logger)
us, _ := c.Get("operatorUser")
userInfo := us.(*model.SystemUser)
var req proto.ModelItemRequest
@ -61,7 +61,7 @@ func (s HandlerService) EditModel(c *gin.Context) (data interface{}, err error)
return
}
func (s HandlerService) DelModel(c *gin.Context) (data interface{}, err error) {
repo := service.NewModelService(s.Engine, s.Logger)
repo := service.NewModelService(s.AppConfig, s.Engine, s.Logger)
us, _ := c.Get("operatorUser")
userInfo := us.(*model.SystemUser)
var req proto.ModelItemRequest
@ -74,3 +74,32 @@ func (s HandlerService) DelModel(c *gin.Context) (data interface{}, err error) {
go s.SaveLog("删除模型", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return
}
func (s HandlerService) ModelIssue(c *gin.Context) (data interface{}, err error) {
repo := service.NewModelService(s.AppConfig, s.Engine, s.Logger)
us, _ := c.Get("operatorUser")
userInfo := us.(*model.SystemUser)
var req proto.ModelIssueRequest
err = c.ShouldBindJSON(&req)
if err != nil {
go s.SaveLog("ModelIssue", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return nil, e.NewValidErr(err)
}
data, err = repo.ModelIssue(c, req)
go s.SaveLog("模型下发", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return
}
func (s HandlerService) GetModelWorkflow(c *gin.Context) (data interface{}, err error) {
repo := service.NewModelService(s.AppConfig, s.Engine, s.Logger)
us, _ := c.Get("operatorUser")
userInfo := us.(*model.SystemUser)
var req proto.ModelItemRequest
err = c.ShouldBindJSON(&req)
if err != nil {
go s.SaveLog("ModelIssue", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return nil, e.NewValidErr(err)
}
data, err = repo.GetModelWorkflow(c, req)
go s.SaveLog("获取模型工作流", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return
}

View File

@ -92,3 +92,87 @@ func (s HandlerService) DeleteBrand(c *gin.Context) (data interface{}, err error
go s.SaveLog("删除品牌", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return
}
func (s HandlerService) NodeList(c *gin.Context) (data interface{}, err error) {
repo := service.NewSystemService(s.Engine, s.Logger)
us, _ := c.Get("operatorUser")
userInfo := us.(*model.SystemUser)
var req proto.NodeRequest
err = c.ShouldBindJSON(&req)
if err != nil {
go s.SaveLog("NodeList", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return nil, e.NewValidErr(err)
}
if req.Size < 1 {
req.Size = 20
}
if req.Size > 100 {
req.Size = 100
}
if req.Page < 1 {
req.Page = 1
}
data, err = repo.NodeList(c, req)
go s.SaveLog("获取节点列表", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return
}
func (s HandlerService) NodeInfo(c *gin.Context) (data interface{}, err error) {
repo := service.NewSystemService(s.Engine, s.Logger)
us, _ := c.Get("operatorUser")
userInfo := us.(*model.SystemUser)
var req proto.NodeItemRequest
err = c.ShouldBindJSON(&req)
if err != nil {
go s.SaveLog("NodeInfo", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return nil, e.NewValidErr(err)
}
data, err = repo.NodeInfo(c, req)
go s.SaveLog("获取节点信息", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return
}
func (s HandlerService) EditNode(c *gin.Context) (data interface{}, err error) {
repo := service.NewSystemService(s.Engine, s.Logger)
us, _ := c.Get("operatorUser")
userInfo := us.(*model.SystemUser)
var req proto.NodeItemRequest
err = c.ShouldBindJSON(&req)
if err != nil {
go s.SaveLog("EditNode", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return nil, e.NewValidErr(err)
}
data, err = repo.EditNode(c, req)
go s.SaveLog("修改节点", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return
}
func (s HandlerService) NodeState(c *gin.Context) (data interface{}, err error) {
repo := service.NewSystemService(s.Engine, s.Logger)
us, _ := c.Get("operatorUser")
userInfo := us.(*model.SystemUser)
var req proto.NodeInfoRequest
err = c.ShouldBindJSON(&req)
if err != nil {
go s.SaveLog("NodeState", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return nil, e.NewValidErr(err)
}
data, err = repo.NodeState(c, req)
go s.SaveLog("获取节点运行信息", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return
}
func (s HandlerService) NodeLastState(c *gin.Context) (data interface{}, err error) {
repo := service.NewSystemService(s.Engine, s.Logger)
us, _ := c.Get("operatorUser")
userInfo := us.(*model.SystemUser)
var req proto.NodeInfoRequest
err = c.ShouldBindJSON(&req)
if err != nil {
go s.SaveLog("NodeLastState", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return nil, e.NewValidErr(err)
}
data, err = repo.NodeLastState(c, req)
go s.SaveLog("获取节点最后运行信息", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return
}

40
internal/handler/task.go Normal file
View File

@ -0,0 +1,40 @@
package handler
import (
"fmt"
"github.com/gin-gonic/gin"
"hpds-iot-web/internal/proto"
"hpds-iot-web/internal/service"
"hpds-iot-web/model"
e "hpds-iot-web/pkg/err"
)
func (s HandlerService) TaskList(c *gin.Context) (data interface{}, err error) {
repo := service.NewTaskService(s.Engine, s.Logger)
us, _ := c.Get("operatorUser")
userInfo := us.(*model.SystemUser)
var req proto.TaskRequest
err = c.ShouldBindJSON(&req)
if err != nil {
go s.SaveLog("TaskList", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return nil, e.NewValidErr(err)
}
data, err = repo.TaskList(c, req)
go s.SaveLog("任务列表", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return
}
func (s HandlerService) AddTask(c *gin.Context) (data interface{}, err error) {
repo := service.NewTaskService(s.Engine, s.Logger)
us, _ := c.Get("operatorUser")
userInfo := us.(*model.SystemUser)
var req proto.TaskItemRequest
err = c.ShouldBindJSON(&req)
if err != nil {
go s.SaveLog("AddTask", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return nil, e.NewValidErr(err)
}
data, err = repo.AddTask(c, req)
go s.SaveLog("新增任务", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return
}

View File

@ -1,6 +1,7 @@
package handler
import (
"fmt"
"github.com/gin-gonic/gin"
"hpds-iot-web/internal/proto"
"hpds-iot-web/internal/service"
@ -28,6 +29,21 @@ func (s HandlerService) GetUserInfo(c *gin.Context) (data interface{}, err error
return
}
func (s HandlerService) GetUserList(c *gin.Context) (data interface{}, err error) {
repo := service.NewUserService(s.Engine, s.Logger)
us, _ := c.Get("operatorUser")
userInfo := us.(*model.SystemUser)
var req proto.UserRequest
err = c.ShouldBindJSON(&req)
if err != nil {
go s.SaveLog("GetUserList", "Manage", "", "", req.ToString(), fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return nil, e.NewValidErr(err)
}
data, err = repo.GetUserList(c, req)
go s.SaveLog("获取用户列表", "Manage", "", "", "", fmt.Sprintf("%d", userInfo.UserId), c.Request.RemoteAddr, "")
return
}
func (s HandlerService) MenuList(c *gin.Context) (data interface{}, err error) {
repo := service.NewUserService(s.Engine, s.Logger)
us, _ := c.Get("operatorUser")

View File

@ -23,6 +23,20 @@ func (us UserLogin) ToString() string {
return string(data)
}
type UserRequest struct {
RealName string `json:"realName"`
Phone string `json:"phone"`
BasePageList
}
func (us UserRequest) ToString() string {
data, err := json.Marshal(us)
if err != nil {
return ""
}
return string(data)
}
type OwnerRequest struct {
OwnerName string `json:"ownerName"`
BasePageList
@ -301,7 +315,8 @@ func (p ServiceParamItem) ToString() string {
}
type ModelRequest struct {
ModelName string `json:"modelName"`
ModelName string `json:"modelName,omitempty"`
BizType int `json:"bizType,omitempty"`
BasePageList
}
@ -314,13 +329,22 @@ func (p ModelRequest) ToString() string {
}
type ModelItemRequest struct {
ModelId int `json:"modelId"`
ModelId int64 `json:"modelId"`
BizType int `json:"bizType"`
ModelName string `json:"modelName"`
ModelVersion string `json:"modelVersion"`
ModelDesc string `json:"modelDesc"`
ModelFiles string `json:"modelFiles"`
ModelParamsFiles string `json:"modelParamsFiles"`
ModelExecScript string `json:"modelExecScript"`
DockerFile []string `json:"dockerFile"`
MappedPort int `json:"mappedPort"`
ModelCommand string `json:"modelCommand"`
InPath string `json:"inPath"`
OutPath string `json:"outPath"`
HttpUrl string `json:"httpUrl"`
IsLightWeight bool `json:"isLightWeight"`
Workflow string `json:"workflow"`
}
func (p ModelItemRequest) ToString() string {
@ -331,6 +355,19 @@ func (p ModelItemRequest) ToString() string {
return string(data)
}
type ModelIssueRequest struct {
NodeId int64 `json:"nodeId"`
ModelId int64 `json:"modelId"`
}
func (p ModelIssueRequest) ToString() string {
data, err := json.Marshal(p)
if err != nil {
return ""
}
return string(data)
}
type DeviceRequest struct {
Key string `json:"key"`
ProductId int64 `json:"productId"`
@ -404,6 +441,20 @@ func (p BrandItemRequest) ToString() string {
return string(data)
}
type DiseaseRequest struct {
BizType int `json:"bizType"`
Key string `json:"key"`
BasePageList
}
func (p DiseaseRequest) ToString() string {
data, err := json.Marshal(p)
if err != nil {
return ""
}
return string(data)
}
type DiseaseTypeRequest struct {
CategoryId int `json:"categoryId"`
Key string `json:"key"`
@ -467,6 +518,7 @@ type ImportDatasetRequest struct {
DatasetName string `json:"datasetName"`
DatasetDesc string `json:"datasetDesc"`
ProjectId int64 `json:"projectId"`
StoreName string `json:"storeName"`
OwnerId int64 `json:"ownerId"`
Creator int64 `json:"creator"`
}
@ -478,3 +530,111 @@ func (p ImportDatasetRequest) ToString() string {
}
return string(data)
}
type NodeRequest struct {
NodeId int64 `json:"nodeId"`
NodeName string `json:"nodeName"`
NodeGuid string `json:"nodeGuid"`
NodeType int `json:"nodeType"`
Platform string `json:"platform"`
PlatformVersion string `json:"platformVersion"`
CPU string `json:"cpu"`
MemTotal uint64 `json:"memTotal"`
DiskTotal uint64 `json:"diskTotal"`
SwapTotal uint64 `json:"swapTotal"`
Arch string `json:"arch"`
Virtualization string `json:"virtualization"`
BootTime uint64 `json:"bootTime"`
IP string `json:"ip"`
CountryCode string `json:"countryCode"`
Version string `json:"version"`
CreateAt int64 `json:"createAt"`
UpdateAt int64 `json:"updateAt"`
BasePageList
}
func (p NodeRequest) ToString() string {
data, err := json.Marshal(p)
if err != nil {
return ""
}
return string(data)
}
type NodeItemRequest struct {
NodeId int64 `json:"nodeId"`
NodeName string `json:"nodeName"`
NodeGuid string `json:"nodeGuid"`
NodeType int `json:"nodeType"`
Platform string `json:"platform"`
PlatformVersion string `json:"platformVersion"`
CPU string `json:"cpu"`
MemTotal uint64 `json:"memTotal"`
DiskTotal uint64 `json:"diskTotal"`
SwapTotal uint64 `json:"swapTotal"`
Arch string `json:"arch"`
Virtualization string `json:"virtualization"`
BootTime uint64 `json:"bootTime"`
IP string `json:"ip"`
CountryCode string `json:"countryCode"`
Version string `json:"version"`
CreateAt int64 `json:"createAt"`
UpdateAt int64 `json:"updateAt"`
}
func (p NodeItemRequest) ToString() string {
data, err := json.Marshal(p)
if err != nil {
return ""
}
return string(data)
}
type NodeInfoRequest struct {
NodeGuid string `json:"nodeGuid"`
Uptime int64 `json:"uptime"`
}
func (p NodeInfoRequest) ToString() string {
data, err := json.Marshal(p)
if err != nil {
return ""
}
return string(data)
}
type TaskRequest struct {
BizType int `json:"bizType"`
TaskName string `json:"taskName"`
StartTime string `json:"startTime"`
FinishTime string `json:"finishTime"`
BasePageList
}
func (p TaskRequest) ToString() string {
data, err := json.Marshal(p)
if err != nil {
return ""
}
return string(data)
}
type TaskItemRequest struct {
TaskId int64 `json:"taskId"`
ModelId int64 `json:"modelId"`
NodeId int64 `json:"nodeId"`
TaskName string `json:"taskName"`
TaskDesc string `json:"taskDesc"`
DatasetArr int64 `json:"datasetArr"`
SubDataset string `json:"subDataset"`
SubDataTag int `json:"subDataTag"`
AppointmentTime string `json:"appointmentTime"`
}
func (p TaskItemRequest) ToString() string {
data, err := json.Marshal(p)
if err != nil {
return ""
}
return string(data)
}

View File

@ -1,5 +1,7 @@
package proto
import "hpds-iot-web/model"
// BaseResponse 基础返回结构
type BaseResponse struct {
Code int `json:"code"`
@ -56,6 +58,7 @@ type DatasetItem struct {
DatasetId int64 `json:"datasetId"`
DatasetName string `json:"datasetName"`
DatasetDesc string `json:"datasetDesc"`
StoreName string `json:"storeName"`
CategoryId int `json:"categoryId"`
ProjectId int64 `json:"projectId"`
OwnerId int64 `json:"ownerId"`
@ -63,3 +66,57 @@ type DatasetItem struct {
CreateAt int64 `json:"createAt"`
DatasetCount int64 `json:"datasetCount"`
}
type NodeState struct {
List []model.NodeState `json:"list"`
}
type NodeLastState struct {
List []NodeLastStateItem `json:"list"`
}
type NodeLastStateItem struct {
NodeId int64 `json:"nodeId"`
NodeGuid string `json:"nodeGuid"`
NodeName string `json:"nodeName"`
NodeType int `json:"nodeType"`
NodeStatus int `json:"nodeStatus"`
Platform string `json:"platform"`
PlatformVersion string `json:"platformVersion"`
CPU string `json:"CPU"`
MemTotal uint64 `json:"memTotal"`
DiskTotal uint64 `json:"diskTotal"`
SwapTotal uint64 `json:"swapTotal"`
CpuUsed float64 `json:"cpuUsed"`
MemUsed uint64 `json:"memUsed"`
SwapUsed uint64 `json:"swapUsed"`
DiskUsed uint64 `json:"diskUsed"`
NetInTransfer uint64 `json:"netInTransfer"`
NetOutTransfer uint64 `json:"netOutTransfer"`
NetInSpeed uint64 `json:"netInSpeed"`
NetOutSpeed uint64 `json:"netOutSpeed"`
Load1 float64 `json:"load1"`
Load5 float64 `json:"load5"`
Load15 float64 `json:"load15"`
TcpConnCount uint64 `json:"tcpConnCount"`
UdpConnCount uint64 `json:"udpConnCount"`
ProcessCount uint64 `json:"processCount"`
ExecTask string `json:"execTask"`
}
type TaskDetail struct {
TaskId int64 `json:"taskId"`
ModelId int64 `json:"modelId"`
ModelName string `json:"modelName"`
NodeId int64 `json:"nodeId"`
NodeName string `json:"nodeName"`
TaskName string `json:"taskName"`
TaskDesc string `json:"taskDesc"`
DatasetArr string `json:"datasetArr"`
ResultStorage string `json:"resultStorage"`
AppointmentTime string `json:"appointmentTime"`
StartTime int64 `json:"startTime"`
FinishTime int64 `json:"finishTime"`
Status int `json:"status"`
CreateAt int64 `xorm:"created" json:"createAt"`
UpdateAt int64 `xorm:"updated" json:"updateAt"`
}

View File

@ -24,6 +24,7 @@ func InitRouter(cfg *config.WebConfig, logger *logging.Logger, engine *xorm.Engi
{
user.Use(middleware.JwtAuthMiddleware(logger.Logger))
user.POST("/login", e.ErrorWrapper(hs.Login))
user.POST("/list", e.ErrorWrapper(hs.GetUserList))
user.GET("/getUserInfo", e.ErrorWrapper(hs.GetUserInfo))
menu := user.Group("/menu")
@ -113,6 +114,10 @@ func InitRouter(cfg *config.WebConfig, logger *logging.Logger, engine *xorm.Engi
device.POST("/delete", e.ErrorWrapper(hs.DeleteDevice))
}
}
//detection := manage.Group("/detection")
//{
//
//}
}
model := r.Group("/model")
{
@ -121,6 +126,8 @@ func InitRouter(cfg *config.WebConfig, logger *logging.Logger, engine *xorm.Engi
model.POST("/add", e.ErrorWrapper(hs.AddModel))
model.POST("/edit", e.ErrorWrapper(hs.EditModel))
model.POST("/delete", e.ErrorWrapper(hs.DelModel))
model.POST("/issue", e.ErrorWrapper(hs.ModelIssue))
model.POST("/workflow", e.ErrorWrapper(hs.GetModelWorkflow))
}
file := r.Group("/file")
{
@ -138,10 +145,27 @@ func InitRouter(cfg *config.WebConfig, logger *logging.Logger, engine *xorm.Engi
brand.POST("/edit", e.ErrorWrapper(hs.EditBrand))
brand.POST("/delete", e.ErrorWrapper(hs.DeleteBrand))
}
node := system.Group("/node")
{
node.POST("/list", e.ErrorWrapper(hs.NodeList))
node.POST("/info", e.ErrorWrapper(hs.NodeInfo))
node.POST("/edit", e.ErrorWrapper(hs.EditNode))
node.POST("/state", e.ErrorWrapper(hs.NodeState))
node.POST("/last", e.ErrorWrapper(hs.NodeLastState))
}
}
task := r.Group("/task")
{
task.Use(middleware.JwtAuthMiddleware(logger.Logger))
task.POST("/list", e.ErrorWrapper(hs.TaskList))
task.POST("/add", e.ErrorWrapper(hs.AddTask))
}
disease := r.Group("/disease")
{
disease.Use(middleware.JwtAuthMiddleware(logger.Logger))
disease.POST("/list", e.ErrorWrapper(hs.DiseaseList))
diseaseType := disease.Group("/type")
{
diseaseType.POST("/list", e.ErrorWrapper(hs.DiseaseTypeList))

View File

@ -106,6 +106,7 @@ func (rp *repo) DatasetList(ctx context.Context, req proto.DatasetRequest) (rsp
DatasetId: v.DatasetId,
DatasetName: v.DatasetName,
DatasetDesc: v.DatasetDesc,
StoreName: v.StoreName,
CategoryId: v.CategoryId,
ProjectId: v.ProjectId,
OwnerId: v.OwnerId,
@ -148,6 +149,7 @@ func (rp *repo) ImportDataset(ctx context.Context, req proto.ImportDatasetReques
CategoryId: req.CategoryId,
ProjectId: req.ProjectId,
OwnerId: req.OwnerId,
StoreName: req.StoreName,
Creator: req.Creator,
Status: 1,
CreateAt: time.Now().Unix(),

View File

@ -13,6 +13,7 @@ import (
)
type DiseaseService interface {
DiseaseList(ctx context.Context, req proto.DiseaseRequest) (rsp *proto.BaseResponse, err error)
DiseaseTypeList(ctx context.Context, req proto.DiseaseTypeRequest) (rsp *proto.BaseResponse, err error)
AddDiseaseType(ctx context.Context, req proto.DiseaseTypeItemRequest) (rsp *proto.BaseResponse, err error)
EditDiseaseType(ctx context.Context, req proto.DiseaseTypeItemRequest) (rsp *proto.BaseResponse, err error)
@ -27,6 +28,42 @@ func NewDiseaseService(cfg *config.WebConfig, engine *xorm.Engine, logger *loggi
}
}
func (rp *repo) DiseaseList(ctx context.Context, req proto.DiseaseRequest) (rsp *proto.BaseResponse, err error) {
rsp = new(proto.BaseResponse)
select {
case <-ctx.Done():
err = fmt.Errorf("超时/取消")
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Message = "超时/取消"
rsp.Err = ctx.Err()
return rsp, ctx.Err()
default:
data := make([]model.Disease, 0)
count, err := rp.engine.Where("(? = '' or disease_name like ?)", req.Key, "%"+req.Key+"%").
And("(? = 0 or category_id = ?)", req.BizType, req.BizType).
Limit(int(req.Size), int(((req.Page)-1)*req.Size)).
FindAndCount(&data)
if err != nil {
goto ReturnPoint
}
rsp.Code = http.StatusOK
rsp.Status = http.StatusText(http.StatusOK)
rsp.Message = "成功"
rsp = FillPaging(count, req.Page, req.Size, data, rsp)
rsp.Err = err
return rsp, err
}
ReturnPoint:
if err != nil {
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Err = err
rsp.Message = "失败"
}
return rsp, err
}
func (rp *repo) DiseaseTypeList(ctx context.Context, req proto.DiseaseTypeRequest) (rsp *proto.BaseResponse, err error) {
rsp = new(proto.BaseResponse)
select {

View File

@ -21,13 +21,14 @@ type PagingStruct struct {
}
// FillPaging 填充分页数据
func FillPaging(count int64, pageNum int64, pageSize int64, list interface{}, data *proto.BaseResponse) *proto.BaseResponse {
func FillPaging(count, pageNum, pageSize int64, list interface{}, data *proto.BaseResponse) *proto.BaseResponse {
//var tp int64
//if count%pageSize > 0 {
// tp = count/pageSize + 1
//} else {
// tp = count / pageSize
//}
_ = fmt.Sprintf("%d, %d", pageNum, pageSize)
ps := new(PagingStruct)
ps.List = list
ps.Total = count
@ -50,6 +51,7 @@ type UserService interface {
Login(ctx context.Context, userName, pass string) (rsp *proto.BaseResponse, err error)
GetUserInfo(ctx context.Context, userId int64) (rsp *proto.BaseResponse, err error)
MenuList(ctx context.Context, userId int64) (rsp *proto.BaseResponse, err error)
GetUserList(ctx context.Context, req proto.UserRequest) (rsp *proto.BaseResponse, err error)
}
func NewUserService(engine *xorm.Engine, logger *logging.Logger) UserService {
@ -245,3 +247,39 @@ ReturnPoint:
}
return rsp, err
}
func (rp *repo) GetUserList(ctx context.Context, req proto.UserRequest) (rsp *proto.BaseResponse, err error) {
rsp = new(proto.BaseResponse)
select {
case <-ctx.Done():
err = fmt.Errorf("超时/取消")
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Message = "超时/取消"
rsp.Err = ctx.Err()
return rsp, ctx.Err()
default:
data := make([]model.SystemUser, 0)
count, err := rp.engine.Where("(? = '' or phone like ?)", req.Phone, "%"+req.Phone+"%").
And("(? = '' or real_name like ?)", req.RealName, req.RealName).
And("status = 1").Limit(int(req.Size), int(((req.Page)-1)*req.Size)).
FindAndCount(&data)
if err != nil {
goto ReturnPoint
}
rsp.Code = http.StatusOK
rsp.Status = http.StatusText(http.StatusOK)
rsp.Message = "成功"
rsp = FillPaging(count, req.Page, req.Size, data, rsp)
rsp.Err = err
return rsp, err
}
ReturnPoint:
if err != nil {
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Err = err
rsp.Message = "失败"
}
return rsp, err
}

View File

@ -2,11 +2,16 @@ package service
import (
"context"
"encoding/json"
"fmt"
"git.hpds.cc/Component/logging"
"git.hpds.cc/pavement/hpds_node"
"hpds-iot-web/config"
"hpds-iot-web/internal/proto"
"hpds-iot-web/model"
"hpds-iot-web/mq"
"net/http"
"strings"
"time"
"xorm.io/xorm"
)
@ -16,10 +21,13 @@ type ModelService interface {
AddModel(ctx context.Context, req proto.ModelItemRequest) (rsp *proto.BaseResponse, err error)
EditModel(ctx context.Context, req proto.ModelItemRequest) (rsp *proto.BaseResponse, err error)
DelModel(ctx context.Context, req proto.ModelItemRequest) (rsp *proto.BaseResponse, err error)
ModelIssue(ctx context.Context, req proto.ModelIssueRequest) (rsp *proto.BaseResponse, err error)
GetModelWorkflow(ctx context.Context, req proto.ModelItemRequest) (rsp *proto.BaseResponse, err error)
}
func NewModelService(engine *xorm.Engine, logger *logging.Logger) ModelService {
func NewModelService(cfg *config.WebConfig, engine *xorm.Engine, logger *logging.Logger) ModelService {
return &repo{
AppConfig: cfg,
engine: engine,
logger: logger,
}
@ -73,11 +81,19 @@ func (rp *repo) AddModel(ctx context.Context, req proto.ModelItemRequest) (rsp *
default:
item := &model.Model{
ModelName: req.ModelName,
BizType: req.BizType,
ModelVersion: req.ModelVersion,
ModelDesc: req.ModelDesc,
ModelFiles: req.ModelFiles,
ModelParamsFiles: req.ModelParamsFiles,
ModelExecScript: req.ModelExecScript,
DockerFile: strings.Join(req.DockerFile, "|"),
MappedPort: req.MappedPort,
ModelCommand: req.ModelCommand,
InPath: req.InPath,
OutPath: req.OutPath,
HttpUrl: req.HttpUrl,
IsLightWeight: req.IsLightWeight,
Status: 1,
CreateAt: time.Now().Unix(),
UpdateAt: time.Now().Unix(),
@ -142,6 +158,32 @@ func (rp *repo) EditModel(ctx context.Context, req proto.ModelItemRequest) (rsp
if len(req.ModelExecScript) > 0 {
item.ModelExecScript = req.ModelExecScript
}
if req.BizType > 0 {
item.BizType = req.BizType
}
if len(req.DockerFile) > 0 {
item.DockerFile = strings.Join(req.DockerFile, "|")
}
if req.MappedPort > 0 {
item.MappedPort = req.MappedPort
}
if len(req.ModelCommand) > 0 {
item.ModelCommand = req.ModelCommand
}
if len(req.InPath) > 0 {
item.InPath = req.InPath
}
if len(req.OutPath) > 0 {
item.OutPath = req.OutPath
}
if len(req.HttpUrl) > 0 {
item.HttpUrl = req.HttpUrl
}
if len(req.Workflow) > 0 {
item.Workflow = req.Workflow
}
item.IsLightWeight = req.IsLightWeight
item.UpdateAt = time.Now().Unix()
_, err = rp.engine.ID(req.ModelId).AllCols().Update(item)
if err != nil {
@ -206,3 +248,123 @@ ReturnPoint:
}
return rsp, err
}
func (rp *repo) ModelIssue(ctx context.Context, req proto.ModelIssueRequest) (rsp *proto.BaseResponse, err error) {
rsp = new(proto.BaseResponse)
select {
case <-ctx.Done():
err = fmt.Errorf("超时/取消")
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Message = "超时/取消"
rsp.Err = ctx.Err()
return rsp, ctx.Err()
default:
var h bool
m := new(model.Model)
h, err = rp.engine.ID(req.ModelId).Get(m)
if err != nil {
goto ReturnPoint
}
if !h {
err = fmt.Errorf("未能找到对应的模型")
goto ReturnPoint
}
node := new(model.Node)
h, err = rp.engine.ID(req.NodeId).Get(node)
if err != nil {
goto ReturnPoint
}
if !h {
err = fmt.Errorf("未能找到对应的节点")
goto ReturnPoint
}
item := new(model.IssueModel)
h, err = rp.engine.Where("model_id = ? and node_id= ?", req.ModelId, req.NodeId).Get(item)
if err != nil {
goto ReturnPoint
}
if h {
err = fmt.Errorf("已经有该模型")
goto ReturnPoint
}
item.ModelId = req.ModelId
item.NodeId = req.NodeId
item.CreateAt = time.Now().Unix()
item.UpdateAt = time.Now().Unix()
_, err = rp.engine.Insert(item)
if err != nil {
goto ReturnPoint
}
mqClient := mq.GetMqClient("task-request", 1)
payload := make(map[string]interface{})
payload["modelId"] = item.ModelId
payload["modelVersion"] = m.ModelVersion
payload["modelCommand"] = m.ModelCommand
payload["nodeId"] = item.NodeId
payload["dockerFile"] = m.DockerFile
payload["mappedPort"] = m.MappedPort
payload["inPath"] = m.InPath
payload["outPath"] = m.OutPath
payload["httpUrl"] = m.HttpUrl
payload["nodeGuid"] = node.NodeGuid
mqPayload := &mq.InstructionReq{
Command: mq.ModelIssue,
Payload: payload,
}
b, _ := json.Marshal(mqPayload)
err = mq.GenerateAndSendData(mqClient.EndPoint.(hpds_node.AccessPoint), b, rp.logger)
rsp.Code = http.StatusOK
rsp.Status = http.StatusText(http.StatusOK)
rsp.Message = "模型下发成功"
rsp.Err = ctx.Err()
rsp.Data = item
return rsp, err
}
ReturnPoint:
if err != nil {
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Err = err
rsp.Message = "失败"
}
return rsp, err
}
func (rp *repo) GetModelWorkflow(ctx context.Context, req proto.ModelItemRequest) (rsp *proto.BaseResponse, err error) {
rsp = new(proto.BaseResponse)
select {
case <-ctx.Done():
err = fmt.Errorf("超时/取消")
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Message = "超时/取消"
rsp.Err = ctx.Err()
return rsp, ctx.Err()
default:
var h bool
item := new(model.Model)
h, err = rp.engine.ID(req.ModelId).Get(item)
if err != nil {
goto ReturnPoint
}
if !h {
err = fmt.Errorf("未能找到对应的模型")
goto ReturnPoint
}
rsp.Code = http.StatusOK
rsp.Status = http.StatusText(http.StatusOK)
rsp.Message = "删除模型成功"
rsp.Err = ctx.Err()
rsp.Data = item.Workflow
return rsp, err
}
ReturnPoint:
if err != nil {
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Err = err
rsp.Message = "失败"
}
return rsp, err
}

View File

@ -17,6 +17,12 @@ type SystemService interface {
AddBrand(ctx context.Context, req proto.BrandItemRequest) (rsp *proto.BaseResponse, err error)
EditBrand(ctx context.Context, req proto.BrandItemRequest) (rsp *proto.BaseResponse, err error)
DeleteBrand(ctx context.Context, req proto.BrandItemRequest) (rsp *proto.BaseResponse, err error)
NodeList(ctx context.Context, req proto.NodeRequest) (rsp *proto.BaseResponse, err error)
NodeInfo(ctx context.Context, req proto.NodeItemRequest) (rsp *proto.BaseResponse, err error)
EditNode(ctx context.Context, req proto.NodeItemRequest) (rsp *proto.BaseResponse, err error)
NodeState(ctx context.Context, req proto.NodeInfoRequest) (rsp *proto.BaseResponse, err error)
NodeLastState(ctx context.Context, req proto.NodeInfoRequest) (rsp *proto.BaseResponse, err error)
}
func NewSystemService(engine *xorm.Engine, logger *logging.Logger) SystemService {
@ -231,3 +237,244 @@ ReturnPoint:
}
return rsp, err
}
func (rp *repo) NodeList(ctx context.Context, req proto.NodeRequest) (rsp *proto.BaseResponse, err error) {
rsp = new(proto.BaseResponse)
select {
case <-ctx.Done():
err = fmt.Errorf("超时/取消")
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Message = "超时/取消"
rsp.Err = ctx.Err()
return rsp, ctx.Err()
default:
data := make([]model.Node, 0)
var count int64
count, err = rp.engine.Where("(? = '' or node_name like ?)", req.NodeName, "%"+req.NodeName+"%").
And("(? = '' or node_guid like ?)", req.NodeGuid, "%"+req.NodeGuid+"%").
And("(? = 0 or node_type = ?)", req.NodeType, req.NodeType).
And("node_status = 1").Limit(int(req.Size), int(((req.Page)-1)*req.Size)).
FindAndCount(&data)
if err != nil {
goto ReturnPoint
}
rsp.Code = http.StatusOK
rsp.Status = http.StatusText(http.StatusOK)
rsp.Message = "成功"
rsp = FillPaging(count, req.Page, req.Size, data, rsp)
rsp.Err = err
return rsp, err
}
ReturnPoint:
if err != nil {
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Err = err
rsp.Message = "失败"
}
return rsp, err
}
func (rp *repo) NodeInfo(ctx context.Context, req proto.NodeItemRequest) (rsp *proto.BaseResponse, err error) {
rsp = new(proto.BaseResponse)
select {
case <-ctx.Done():
err = fmt.Errorf("超时/取消")
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Message = "超时/取消"
rsp.Err = ctx.Err()
return rsp, ctx.Err()
default:
var h bool
item := new(model.Node)
h, err = rp.engine.ID(req.NodeId).Get(item)
if err != nil {
goto ReturnPoint
}
if !h {
err = fmt.Errorf("未能找到对应的节点")
goto ReturnPoint
}
rsp.Code = http.StatusOK
rsp.Status = http.StatusText(http.StatusOK)
rsp.Message = "获取节点成功"
rsp.Err = ctx.Err()
rsp.Data = item
return rsp, err
}
ReturnPoint:
if err != nil {
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Err = err
rsp.Message = "失败"
}
return rsp, err
}
func (rp *repo) EditNode(ctx context.Context, req proto.NodeItemRequest) (rsp *proto.BaseResponse, err error) {
rsp = new(proto.BaseResponse)
select {
case <-ctx.Done():
err = fmt.Errorf("超时/取消")
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Message = "超时/取消"
rsp.Err = ctx.Err()
return rsp, ctx.Err()
default:
var h bool
item := new(model.Node)
h, err = rp.engine.ID(req.NodeId).Get(item)
if err != nil {
goto ReturnPoint
}
if !h {
err = fmt.Errorf("未能找到对应的节点")
goto ReturnPoint
}
if len(req.NodeName) > 0 {
item.NodeName = req.NodeName
}
if req.NodeType > 0 {
item.NodeType = req.NodeType
}
if len(req.Platform) > 0 {
item.Platform = req.Platform
}
if len(req.PlatformVersion) > 0 {
item.PlatformVersion = req.PlatformVersion
}
if len(req.CPU) > 0 {
item.CPU = req.CPU
}
if req.MemTotal > 0 {
item.MemTotal = req.MemTotal
}
if req.DiskTotal > 0 {
item.DiskTotal = req.DiskTotal
}
if req.SwapTotal > 0 {
item.SwapTotal = req.SwapTotal
}
if len(req.Arch) > 0 {
item.Arch = req.Arch
}
if len(req.Virtualization) > 0 {
item.Virtualization = req.Virtualization
}
if req.BootTime > 0 {
item.BootTime = req.BootTime
}
if len(req.IP) > 0 {
item.IP = req.IP
}
if len(req.CountryCode) > 0 {
item.CountryCode = req.CountryCode
}
if len(req.Version) > 0 {
item.Version = req.Version
}
item.UpdateAt = time.Now().Unix()
_, err = rp.engine.ID(req.NodeId).AllCols().Update(item)
if err != nil {
goto ReturnPoint
}
rsp.Code = http.StatusOK
rsp.Status = http.StatusText(http.StatusOK)
rsp.Message = "修改节点成功"
rsp.Err = ctx.Err()
rsp.Data = item
return rsp, err
}
ReturnPoint:
if err != nil {
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Err = err
rsp.Message = "失败"
}
return rsp, err
}
func (rp *repo) NodeState(ctx context.Context, req proto.NodeInfoRequest) (rsp *proto.BaseResponse, err error) {
rsp = new(proto.BaseResponse)
select {
case <-ctx.Done():
err = fmt.Errorf("超时/取消")
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Message = "超时/取消"
rsp.Err = ctx.Err()
return rsp, ctx.Err()
default:
list := make([]model.NodeState, 0)
err = rp.engine.Where("node_name = ?", req.NodeGuid).
And(" uptime > UNIX_TIMESTAMP(DATE_ADD(NOW(),INTERVAL -24 HOUR))").
Find(&list)
if err != nil {
goto ReturnPoint
}
state := new(proto.NodeState)
state.List = list
rsp.Code = http.StatusOK
rsp.Status = http.StatusText(http.StatusOK)
rsp.Message = "获取节点状态信息成功"
rsp.Err = ctx.Err()
rsp.Data = state
return rsp, err
}
ReturnPoint:
if err != nil {
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Err = err
rsp.Message = "失败"
}
return rsp, err
}
func (rp *repo) NodeLastState(ctx context.Context, req proto.NodeInfoRequest) (rsp *proto.BaseResponse, err error) {
rsp = new(proto.BaseResponse)
select {
case <-ctx.Done():
err = fmt.Errorf("超时/取消")
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Message = "超时/取消"
rsp.Err = ctx.Err()
return rsp, ctx.Err()
default:
list := make([]proto.NodeLastStateItem, 0)
//err = rp.engine.Where("node_name = ?", req.NodeGuid).
// And("? + uptime > UNIX_TIMESTAMP(DATE_ADD(NOW(),INTERVAL -24 HOUR))", req.Uptime).Desc("uptime").
// Find(&list)
err = rp.engine.SQL(`select c.node_id,c.node_name,c.node_guid,c.node_type,c.node_type,c.platform,c.platform_version,c.c_p_u,c.mem_total,
c.disk_total,c.swap_total,a.c_p_u cpu_used,c.node_status, a.mem_used,a.swap_used, a.disk_used, a.net_in_transfer, a.net_in_speed,
a.net_out_speed, a.net_out_transfer, a.load1, a.load5, a.load15, a.tcp_conn_count, a.udp_conn_count, a.process_count,
d.task_name exec_task from node_state a , (select node_name, max(uptime) uptime from node_state group by node_name) b, node c
left join (select t2.node_id, t2.task_name from task t2, (select node_id, max(start_time) start from task group by node_id) t1 where t2.node_id = t1.node_id and t2.start_time = t1.start and t2.status = 1) d on c.node_id = d.node_id
where a.node_name = b.node_name and a.uptime = b.uptime and a.node_name = c.node_guid and c.node_status > 0 and (? = '' or a.node_name = ?) `, req.NodeGuid, req.NodeGuid).Find(&list)
if err != nil {
goto ReturnPoint
}
state := new(proto.NodeLastState)
state.List = list
rsp.Code = http.StatusOK
rsp.Status = http.StatusText(http.StatusOK)
rsp.Message = "获取节点状态信息成功"
rsp.Err = ctx.Err()
rsp.Data = state
return rsp, err
}
ReturnPoint:
if err != nil {
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Err = err
rsp.Message = "失败"
}
return rsp, err
}

178
internal/service/task.go Normal file
View File

@ -0,0 +1,178 @@
package service
import (
"context"
"encoding/json"
"fmt"
"git.hpds.cc/Component/logging"
"git.hpds.cc/pavement/hpds_node"
"hpds-iot-web/internal/proto"
"hpds-iot-web/model"
"hpds-iot-web/mq"
"net/http"
"time"
"xorm.io/xorm"
)
type TaskService interface {
TaskList(ctx context.Context, req proto.TaskRequest) (rsp *proto.BaseResponse, err error)
AddTask(ctx context.Context, req proto.TaskItemRequest) (rsp *proto.BaseResponse, err error)
//EditTask(ctx context.Context, req proto.ModelItemRequest) (rsp *proto.BaseResponse, err error)
}
func NewTaskService(engine *xorm.Engine, logger *logging.Logger) TaskService {
return &repo{
engine: engine,
logger: logger,
}
}
func (rp *repo) TaskList(ctx context.Context, req proto.TaskRequest) (rsp *proto.BaseResponse, err error) {
rsp = new(proto.BaseResponse)
select {
case <-ctx.Done():
err = fmt.Errorf("超时/取消")
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Message = "超时/取消"
rsp.Err = ctx.Err()
return rsp, ctx.Err()
default:
data := make([]proto.TaskDetail, 0)
count, err := rp.engine.Table("task").Alias("t").
Join("inner", []string{"model", "m"}, "t.model_id = m.model_id").
Join("inner", []string{"node", "n"}, "t.node_id = n.node_id").
Cols("t.*", "m.model_name", "n.node_name").
Where("(? = 0 or m.biz_type = ?)", req.BizType, req.BizType).
And("(?='' or task_name like ?)", req.TaskName, "%"+req.TaskName+"%").
And("t.start_time >= unix_timestamp(?)", req.StartTime).
And("? = 0 or t.start_time <= unix_timestamp(?)", req.FinishTime, req.FinishTime).
And("t.status > 0").Limit(int(req.Size), int(((req.Page)-1)*req.Size)).
FindAndCount(&data)
if err != nil {
goto ReturnPoint
}
rsp.Code = http.StatusOK
rsp.Status = http.StatusText(http.StatusOK)
rsp.Message = "成功"
rsp = FillPaging(count, req.Page, req.Size, data, rsp)
rsp.Err = err
return rsp, err
}
ReturnPoint:
if err != nil {
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Err = err
rsp.Message = "失败"
}
return rsp, err
}
func (rp *repo) AddTask(ctx context.Context, req proto.TaskItemRequest) (rsp *proto.BaseResponse, err error) {
rsp = new(proto.BaseResponse)
select {
case <-ctx.Done():
err = fmt.Errorf("超时/取消")
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Message = "超时/取消"
rsp.Err = ctx.Err()
return rsp, ctx.Err()
default:
var h bool
m := new(model.Model)
h, err = rp.engine.ID(req.ModelId).Get(m)
if err != nil {
goto ReturnPoint
}
if !h {
err = fmt.Errorf("未能找到对应的模型")
goto ReturnPoint
}
ds := new(model.Dataset)
h, err = rp.engine.ID(req.ModelId).Get(ds)
if err != nil {
goto ReturnPoint
}
if !h {
err = fmt.Errorf("未能找到对应的数据集")
goto ReturnPoint
}
item := &model.Task{
ModelId: req.ModelId,
NodeId: req.NodeId,
TaskName: req.TaskName,
TaskDesc: req.TaskDesc,
DatasetArr: fmt.Sprintf("%d", req.DatasetArr),
SubDataset: req.SubDataset,
SubDataTag: req.SubDataTag,
AppointmentTime: req.AppointmentTime,
Status: 1,
CreateAt: time.Now().Unix(),
UpdateAt: time.Now().Unix(),
}
if len(req.AppointmentTime) > 0 {
var appTime time.Time
appTime, err = time.ParseInLocation("2006-01-02 15:04:05", req.AppointmentTime, time.Local)
if err != nil {
err = fmt.Errorf("时间格式不匹配")
goto ReturnPoint
}
item.StartTime = appTime.Unix()
} else {
item.StartTime = time.Now().Unix()
}
_, err = rp.engine.Insert(item)
if err != nil {
goto ReturnPoint
}
//reg, _ := regexp.Compile("\\[.*?\\]")
//if ok := reg.FindAll([]byte(item.ResultStorage), 2); len(ok) > 0 {
// item.ResultStorage = reg.ReplaceAllString(item.ResultStorage, fmt.Sprintf("%d_%d", item.TaskId, item.ModelId))
// _, err = rp.engine.ID(item.TaskId).Cols("result_storage").Update(item)
// if err != nil {
// goto ReturnPoint
// }
//}
payload := make(map[string]interface{})
payload["taskId"] = item.TaskId
payload["modelId"] = item.ModelId
payload["modelVersion"] = m.ModelVersion
payload["modelCommand"] = m.ModelCommand
payload["nodeId"] = item.NodeId
payload["inPath"] = m.InPath
payload["outPath"] = m.OutPath
payload["httpUrl"] = m.HttpUrl
payload["datasetArr"] = item.DatasetArr
payload["datasetPath"] = ds.StoreName
payload["datasetName"] = ds.DatasetName
payload["subDataset"] = item.SubDataset
payload["subDataTag"] = item.SubDataTag
payload["workflow"] = m.Workflow
mqClient := mq.GetMqClient("task-request", 1)
mqPayload := &mq.InstructionReq{
Command: mq.TaskAdd,
Payload: payload,
}
pData, _ := json.Marshal(mqPayload)
err = mq.GenerateAndSendData(mqClient.EndPoint.(hpds_node.AccessPoint), pData, rp.logger)
rsp.Code = http.StatusOK
rsp.Status = http.StatusText(http.StatusOK)
rsp.Message = "新增任务成功"
rsp.Err = ctx.Err()
rsp.Data = item
return rsp, err
}
ReturnPoint:
if err != nil {
rsp.Code = http.StatusInternalServerError
rsp.Status = http.StatusText(http.StatusInternalServerError)
rsp.Err = err
rsp.Message = "失败"
}
return rsp, err
}

View File

@ -4,6 +4,7 @@ type Dataset struct {
DatasetId int64 `xorm:"not null pk autoincr INT(11)" json:"datasetId"`
DatasetName string `xorm:"varchar(200) not null" json:"datasetName"`
DatasetDesc string `xorm:"varchar(200)" json:"datasetDesc"`
StoreName string `xorm:"varchar(200)" json:"storeName"`
CategoryId int `xorm:"not null SMALLINT default 1" json:"categoryId"` //业务分类, 1:道路 2:桥梁 3:隧道 4:边坡
ProjectId int64 `xorm:"INT(11) not null default 0 index" json:"projectId"`
OwnerId int64 `xorm:"INT(11) not null default 0 index" json:"ownerId"`

View File

@ -10,6 +10,7 @@ type Disease struct {
DetectionMethod string `xorm:"varchar(200) not null " json:"detectionMethod"`
DiseaseDesc string `xorm:"TEXT" json:"diseaseDesc"`
Reference string `xorm:"varchar(200) not null " json:"reference"` //参照标准
StoreName string `xorm:"varchar(200)" json:"storeName"` //存储路径
CreateAt int64 `xorm:"created" json:"createAt"`
UpdateAt int64 `xorm:"updated" json:"updateAt"`
}

View File

@ -17,9 +17,9 @@ var (
Redis *redis.Client
)
func New(driveName, dsn string) {
DB, _ = NewDbConnection(dsn)
DB.ShowSQL(true)
func New(driveName, dsn string, showSql bool) {
DB, _ = NewDbConnection(driveName, dsn)
DB.ShowSQL(showSql)
DB.Dialect().SetQuotePolicy(dialects.QuotePolicyReserved)
err := DB.Sync2(
&Brand{},
@ -42,6 +42,8 @@ func New(driveName, dsn string) {
&Model{},
&ModelVersion{},
&Node{},
&NodeState{},
&IssueModel{},
&OriginalData{},
&Owner{},
&Project{},
@ -52,6 +54,8 @@ func New(driveName, dsn string) {
&SystemRole{},
&SystemUser{},
&SystemUserRole{},
&Task{},
&TaskResult{},
)
if err != nil {
zap.L().Error("同步数据库表结构", zap.Error(err))
@ -59,8 +63,8 @@ func New(driveName, dsn string) {
}
}
func NewDbConnection(dsn string) (db *xorm.Engine, err error) {
db, err = xorm.NewEngine("mysql", dsn)
func NewDbConnection(driveName, dsn string) (db *xorm.Engine, err error) {
db, err = xorm.NewEngine(driveName, dsn)
if err != nil {
zap.L().Error("创建数据库连接", zap.Error(err))
os.Exit(-1)

View File

@ -3,11 +3,20 @@ package model
type Model struct {
ModelId int `xorm:"not null pk autoincr INT(11)" json:"modelId"`
ModelName string `xorm:"varchar(200) not null" json:"modelName"`
BizType int `xorm:"int not null default 1" json:"bizType"`
ModelVersion string `xorm:"varchar(50) not null" json:"modelVersion"`
ModelDesc string `xorm:"varchar(200) not null" json:"modelDesc"`
ModelFiles string `xorm:"varchar(200) not null" json:"modelFiles"`
ModelParamsFiles string `xorm:"varchar(200)" json:"modelParamsFiles"`
ModelExecScript string `xorm:"varchar(200)" json:"modelExecScript"`
DockerFile string `xorm:"varchar(200)" json:"dockerFile"`
MappedPort int `xorm:"int" json:"mappedPort"`
ModelCommand string `xorm:"varchar(200)" json:"modelCommand"`
InPath string `xorm:"varchar(200)" json:"inPath"`
OutPath string `xorm:"varchar(200)" json:"outPath"`
HttpUrl string `xorm:"varchar(200)" json:"httpUrl"`
IsLightWeight bool `xorm:"TINYINT(1) default 0" json:"isLightWeight"`
Workflow string `xorm:"TEXT" json:"workflow"`
Status int `xorm:"not null SMALLINT default 0" json:"status"`
CreateAt int64 `xorm:"created" json:"createAt"`
UpdateAt int64 `xorm:"updated" json:"updateAt"`

11
model/modelIssue.go Normal file
View File

@ -0,0 +1,11 @@
package model
type IssueModel struct {
Id int64 `xorm:"not null pk autoincr INT(11)" json:"id"`
ModelId int64 `xorm:"INT(11) index" json:"modelId"`
NodeId int64 `xorm:"INT(11) index" json:"nodeId"`
Status int `xorm:"not null SMALLINT default 0" json:"status"`
IssueResult string `xorm:"TEXT" json:"issueResult"`
CreateAt int64 `xorm:"created" json:"createAt"`
UpdateAt int64 `xorm:"updated" json:"updateAt"`
}

View File

@ -1,10 +1,26 @@
package model
// Node 节点信息
type Node struct {
NodeId int `xorm:"not null pk autoincr INT(11)" json:"nodeId"`
NodeName string `xorm:"varchar(50) not null" json:"nodeName"`
NodeId int64 `xorm:"not null pk autoincr INT(11)" json:"nodeId"`
NodeGuid string `xorm:"varchar(100) index" json:"nodeGuid"`
NodeName string `xorm:"varchar(100)" json:"nodeName"`
NodeType int `xorm:"not null SMALLINT default 0" json:"nodeType"`
NodeStatus int `xorm:"not null SMALLINT default 0" json:"nodeStatus"`
Platform string `xorm:"varchar(100)" json:"platform,omitempty"`
PlatformVersion string `xorm:"varchar(100)" json:"platformVersion,omitempty"`
CPU string `xorm:"varchar(1000)" json:"cpu,omitempty"`
MemTotal uint64 `xorm:"BIGINT" json:"memTotal,omitempty"`
DiskTotal uint64 `xorm:"BIGINT" json:"diskTotal,omitempty"`
SwapTotal uint64 `xorm:"BIGINT" json:"swapTotal,omitempty"`
Arch string `xorm:"varchar(1000)" json:"arch,omitempty"`
Virtualization string `xorm:"varchar(1000)" json:"virtualization,omitempty"`
BootTime uint64 `xorm:"BIGINT" json:"bootTime,omitempty"`
IP string `xorm:"varchar(100)" json:"ip"`
IsGpu bool `xorm:"TINYINT(1) default 0" json:"isGpu"`
IsTaskExecute bool `xorm:"TINYINT(1) default 0" json:"isTaskExecute"`
CountryCode string `xorm:"varchar(100)" json:"countryCode,omitempty"`
Version string `xorm:"varchar(100)" json:"version,omitempty"`
CreateAt int64 `xorm:"created" json:"createAt"`
UpdateAt int64 `xorm:"updated" json:"updateAt"`
}

21
model/nodeState.go Normal file
View File

@ -0,0 +1,21 @@
package model
// NodeState 节点状态信息
type NodeState struct {
Uptime uint64 `xorm:"BIGINT pk" json:"uptime,omitempty"`
NodeName string `xorm:"varchar(100) pk" json:"nodeName"`
CPU float64 `xorm:"DECIMAL(18,4)" json:"cpu,omitempty"`
MemUsed uint64 `xorm:"BIGINT" json:"memUsed,omitempty"`
SwapUsed uint64 `xorm:"BIGINT" json:"swapUsed,omitempty"`
DiskUsed uint64 `xorm:"BIGINT" json:"diskUsed,omitempty"`
NetInTransfer uint64 `xorm:"BIGINT" json:"netInTransfer,omitempty"`
NetOutTransfer uint64 `xorm:"BIGINT" json:"netOutTransfer,omitempty"`
NetInSpeed uint64 `xorm:"BIGINT" json:"netInSpeed,omitempty"`
NetOutSpeed uint64 `xorm:"BIGINT" json:"netOutSpeed,omitempty"`
Load1 float64 `xorm:"DECIMAL(18,4)" json:"load1,omitempty"`
Load5 float64 `xorm:"DECIMAL(18,4)" json:"load5,omitempty"`
Load15 float64 `xorm:"DECIMAL(18,4)" json:"load15,omitempty"`
TcpConnCount uint64 `xorm:"BIGINT" json:"tcpConnCount,omitempty"`
UdpConnCount uint64 `xorm:"BIGINT" json:"udpConnCount,omitempty"`
ProcessCount uint64 `xorm:"BIGINT" json:"processCount,omitempty"`
}

View File

@ -14,6 +14,7 @@ type SystemUser struct {
Pass string `xorm:"VARCHAR(128) not null" json:"pass"` //密码
Salt string `xorm:"VARCHAR(32) not null" json:"salt"` //盐
RealName string `xorm:"VARCHAR(50)" json:"realName"` //真实姓名
Status int `xorm:"not null SMALLINT default 1" json:"status"` //是否禁用
CreateAt int64 `xorm:"created" json:"createAt"` //创建时间
UpdateAt int64 `xorm:"updated" json:"updateAt"` //更新时间
}

18
model/task.go Normal file
View File

@ -0,0 +1,18 @@
package model
type Task struct {
TaskId int64 `xorm:"not null pk autoincr INT(11)" json:"taskId"`
ModelId int64 `xorm:"INT(11) index" json:"modelId"`
NodeId int64 `xorm:"INT(11) index" json:"nodeId"`
TaskName string `xorm:"VARCHAR(200)" json:"taskName"`
TaskDesc string `xorm:"VARCHAR(500)" json:"taskDesc"`
DatasetArr string `xorm:"TEXT" json:"datasetArr"`
SubDataset string `xorm:"varchar(100)" json:"subDataset"`
SubDataTag int `xorm:"tinyint" json:"subDataTag"`
AppointmentTime string `xorm:"VARCHAR(30)" json:"appointmentTime"`
StartTime int64 `xorm:"BIGINT" json:"startTime"`
FinishTime int64 `xorm:"BIGINT" json:"finishTime"`
Status int `xorm:"not null SMALLINT default 0" json:"status"` // 1:等待执行; 2:执行中; 3:执行完成; 4:任务分配失败; 5:任务执行失败
CreateAt int64 `xorm:"created" json:"createAt"`
UpdateAt int64 `xorm:"updated" json:"updateAt"`
}

15
model/taskResult.go Normal file
View File

@ -0,0 +1,15 @@
package model
type TaskResult struct {
ResultId int64 `xorm:"not null pk autoincr INT(11)" json:"resultId"`
TaskId int64 `xorm:"INT(11) index" json:"taskId"`
TaskCode string `xorm:"varchar(200)" json:"taskCode"`
ModelId int64 `xorm:"INT(11)" json:"modelId"`
NodeId int64 `xorm:"INT(11)" json:"nodeId"`
StartTime int64 `xorm:"BIGINT" json:"startTime"`
FinishTime int64 `xorm:"BIGINT" json:"finishTime"`
SubDataset string `xorm:"varchar(200)" json:"subDataset"`
DatasetId int64 `xorm:"INT(11)" json:"datasetId"`
SrcPath string `xorm:"varchar(500)" json:"srcPath"`
Result string `xorm:"TEXT" json:"result"`
}

93
mq/index.go Normal file
View File

@ -0,0 +1,93 @@
package mq
import (
"fmt"
"git.hpds.cc/Component/logging"
"go.uber.org/zap"
"hpds-iot-web/config"
"os"
"time"
"git.hpds.cc/pavement/hpds_node"
)
var MqList []HpdsMqNode
type HpdsMqNode struct {
MqType uint
Topic string
Node config.HpdsNode
EndPoint interface{}
}
func must(logger *logging.Logger, err error) {
if err != nil {
if logger != nil {
logger.With(zap.String("web节点", "错误信息")).Error("启动错误", zap.Error(err))
} else {
_, _ = fmt.Fprint(os.Stderr, err)
}
os.Exit(1)
}
}
func NewMqClient(funcs []config.FuncConfig, node config.HpdsNode, logger *logging.Logger) (mqList []HpdsMqNode, err error) {
mqList = make([]HpdsMqNode, 0)
for _, v := range funcs {
switch v.MqType {
case 2:
sf := hpds_node.NewStreamFunction(
v.Name,
hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)),
hpds_node.WithObserveDataTags(v.DataTag),
hpds_node.WithCredential(node.Token),
)
err = sf.Connect()
nodeInfo := HpdsMqNode{
MqType: 2,
Topic: v.Name,
Node: node,
EndPoint: sf,
}
must(logger, err)
mqList = append(mqList, nodeInfo)
default:
ap := hpds_node.NewAccessPoint(
v.Name,
hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)),
hpds_node.WithCredential(node.Token),
)
err = ap.Connect()
nodeInfo := HpdsMqNode{
MqType: 1,
Topic: v.Name,
Node: node,
EndPoint: ap,
}
must(logger, err)
ap.SetDataTag(v.DataTag)
mqList = append(mqList, nodeInfo)
}
}
return mqList, err
}
func GetMqClient(topic string, mqType uint) *HpdsMqNode {
for _, v := range MqList {
if v.Topic == topic && v.MqType == mqType {
return &v
}
}
return nil
}
func GenerateAndSendData(stream hpds_node.AccessPoint, data []byte, logger *logging.Logger) error {
logger.With(zap.String("web节点", "发送消息")).Info("数据", zap.String("发送的数据", string(data)))
_, err := stream.Write(data)
if err != nil {
return err
}
time.Sleep(1000 * time.Millisecond)
return nil
}

11
mq/instruction.go Normal file
View File

@ -0,0 +1,11 @@
package mq
const (
TaskAdd = iota + 1
ModelIssue
)
type InstructionReq struct {
Command int `json:"command"`
Payload interface{} `json:"payload"`
}