151 lines
4.2 KiB
Go
151 lines
4.2 KiB
Go
package mq
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/minio/minio-go/v7"
|
|
"os"
|
|
"path"
|
|
"time"
|
|
|
|
"git.hpds.cc/pavement/hpds_node"
|
|
"go.uber.org/zap"
|
|
|
|
"data_minio/config"
|
|
"data_minio/global"
|
|
"data_minio/model"
|
|
)
|
|
|
|
// MqList is the package-level list of message-queue endpoints that
// GetMqClient searches by topic and type.
// NOTE(review): nothing in this file assigns MqList; presumably the caller
// stores the result of NewMqClient here — confirm against the call site.
var MqList []HpdsMqNode
|
|
|
|
type HpdsMqNode struct {
|
|
MqType uint
|
|
Topic string
|
|
Node config.HpdsNode
|
|
EndPoint interface{}
|
|
}
|
|
|
|
func must(err error) {
|
|
if err != nil {
|
|
global.Logger.With(zap.String("数据落地服务", "错误信息")).Error("启动错误", zap.Error(err))
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func NewMqClient(funcs []config.FuncConfig, node config.HpdsNode) (mqList []HpdsMqNode, err error) {
|
|
mqList = make([]HpdsMqNode, 0)
|
|
for _, v := range funcs {
|
|
switch v.MqType {
|
|
case 2:
|
|
sf := hpds_node.NewStreamFunction(
|
|
v.Name,
|
|
hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)),
|
|
hpds_node.WithObserveDataTags(v.DataTag),
|
|
hpds_node.WithCredential(node.Token),
|
|
)
|
|
_ = sf.SetHandler(handler)
|
|
err = sf.Connect()
|
|
nodeInfo := HpdsMqNode{
|
|
MqType: 2,
|
|
Topic: v.Name,
|
|
Node: node,
|
|
EndPoint: sf,
|
|
}
|
|
must(err)
|
|
|
|
mqList = append(mqList, nodeInfo)
|
|
default:
|
|
ap := hpds_node.NewAccessPoint(
|
|
v.Name,
|
|
hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)),
|
|
hpds_node.WithCredential(node.Token),
|
|
)
|
|
err = ap.Connect()
|
|
nodeInfo := HpdsMqNode{
|
|
MqType: 1,
|
|
Topic: v.Name,
|
|
Node: node,
|
|
EndPoint: ap,
|
|
}
|
|
must(err)
|
|
ap.SetDataTag(v.DataTag)
|
|
mqList = append(mqList, nodeInfo)
|
|
}
|
|
|
|
}
|
|
return mqList, err
|
|
}
|
|
|
|
func GetMqClient(topic string, mqType uint) *HpdsMqNode {
|
|
for _, v := range MqList {
|
|
if v.Topic == topic && v.MqType == mqType {
|
|
return &v
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func GenerateAndSendData(stream hpds_node.AccessPoint, data []byte) error {
|
|
_, err := stream.Write(data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
time.Sleep(1000 * time.Millisecond)
|
|
return nil
|
|
}
|
|
|
|
func handler(data []byte) (byte, []byte) {
|
|
req := new(InstructionReq)
|
|
err := json.Unmarshal(data, req)
|
|
if err != nil {
|
|
global.Must(err)
|
|
} else {
|
|
switch req.Command {
|
|
case DatasetRequest:
|
|
payload := req.Payload.(map[string]interface{})
|
|
if int64(payload["fileSize"].(float64)) > 0 {
|
|
dt, err := model.GetDatasetById(int64(payload["datasetId"].(float64)))
|
|
global.Must(err)
|
|
var fileContent []byte
|
|
if payload["isCompress"].(bool) {
|
|
decodeBytes, _ := base64.StdEncoding.DecodeString(payload["file"].(string))
|
|
fileContent, err = UnCompress(decodeBytes)
|
|
} else {
|
|
fileContent = []byte(payload["file"].(string))
|
|
}
|
|
if err != nil {
|
|
global.Logger.With(zap.String("文件名称", payload["filePath"].(string))).With(zap.Int64("文件大小", int64(payload["fileSize"].(float64)))).
|
|
Error("错误信息", zap.Error(err))
|
|
}
|
|
fileReader := bytes.NewReader(fileContent)
|
|
info, err := global.MinioClient.PutObject(context.Background(), global.Cfg.Minio.Bucket, path.Join(model.GetDatasetScene(dt.CategoryId), dt.StoreName, payload["filePath"].(string)), fileReader, int64(payload["fileSize"].(float64)), minio.PutObjectOptions{ContentType: "application/octet-stream"})
|
|
if err != nil {
|
|
global.Logger.With(zap.String("文件名称", payload["filePath"].(string))).With(zap.Int64("文件大小", int64(payload["fileSize"].(float64)))).
|
|
Error("错误信息", zap.Error(err))
|
|
}
|
|
accessUrl := fmt.Sprintf("%s://%s", global.Cfg.Minio.Protocol, path.Join(global.Cfg.Minio.Endpoint, global.Cfg.Minio.Bucket, info.Key))
|
|
|
|
global.Logger.With(zap.String("文件名称", payload["filePath"].(string))).With(zap.String("访问路径", accessUrl)).
|
|
With(zap.Any("返回数据", info)).Info("文件上传到minio")
|
|
fm := new(model.FileManager)
|
|
fm.FileName = payload["fileName"].(string)
|
|
fm.AccessUrl = accessUrl
|
|
fm.Scene = fmt.Sprintf("%d", dt.CategoryId)
|
|
fm.DataType = int(payload["dataType"].(float64))
|
|
fm.DatasetId = dt.DatasetId
|
|
fm.FileSize = int64(payload["fileSize"].(float64))
|
|
fm.FileMd5 = payload["fileMd5"].(string)
|
|
fm.Creator = 0
|
|
fm.CreateAt = time.Now().Unix()
|
|
fm.UpdateAt = time.Now().Unix()
|
|
_, err = global.DB.Insert(fm)
|
|
global.Must(err)
|
|
}
|
|
}
|
|
}
|
|
return 0x12, nil
|
|
}
|