// Package mq creates the hpds_node message-queue endpoints used by the service
// and handles incoming dataset-file messages by uploading the file to MinIO
// and inserting a FileManager record through global.DB.
package mq

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"os"
	"time"

	"git.hpds.cc/pavement/hpds_node"
	"github.com/minio/minio-go/v7"
	"go.uber.org/zap"

	"data_minio/config"
	"data_minio/global"
	"data_minio/model"
)

// MqList is the package-level registry that GetMqClient searches.
var MqList []HpdsMqNode

// HpdsMqNode describes one connected message-queue endpoint.
type HpdsMqNode struct {
	// MqType is 2 for nodes created as stream functions and 1 for nodes
	// created as access points (see NewMqClient).
	MqType uint
	// Topic is the configured function name the endpoint was created with.
	Topic string
	// Node is the message-queue server configuration the endpoint connects to.
	Node config.HpdsNode
	// EndPoint holds the value returned by hpds_node.NewStreamFunction or
	// hpds_node.NewAccessPoint.
	EndPoint interface{}
}

// must logs err and terminates the process with exit status 1 when err is
// non-nil; it does nothing otherwise.
func must(err error) {
	if err != nil {
		global.Logger.With(zap.String("数据落地服务", "错误信息")).Error("启动错误", zap.Error(err))
		os.Exit(1)
	}
}

// NewMqClient creates and connects one endpoint per entry in funcs, all of
// them targeting node.
//
// Entries with MqType 2 become stream functions that observe the entry's data
// tag and dispatch to handler; every other entry becomes an access point
// (recorded with MqType 1) whose data tag is set after connecting.
//
// A connection failure is passed to must, which terminates the process, so
// the returned error is nil whenever the function returns.
func NewMqClient(funcs []config.FuncConfig, node config.HpdsNode) (mqList []HpdsMqNode, err error) {
	mqList = make([]HpdsMqNode, 0, len(funcs))
	addr := fmt.Sprintf("%s:%d", node.Host, node.Port)

	for _, v := range funcs {
		switch v.MqType {
		case 2:
			sf := hpds_node.NewStreamFunction(
				v.Name,
				hpds_node.WithMqAddr(addr),
				hpds_node.WithObserveDataTags(v.DataTag),
				hpds_node.WithCredential(node.Token),
			)
			// The result of SetHandler has always been discarded here.
			// NOTE(review): confirm it cannot fail, or handle the result.
			_ = sf.SetHandler(handler)
			must(sf.Connect())
			mqList = append(mqList, HpdsMqNode{
				MqType:   2,
				Topic:    v.Name,
				Node:     node,
				EndPoint: sf,
			})
		default:
			ap := hpds_node.NewAccessPoint(
				v.Name,
				hpds_node.WithMqAddr(addr),
				hpds_node.WithCredential(node.Token),
			)
			must(ap.Connect())
			ap.SetDataTag(v.DataTag)
			mqList = append(mqList, HpdsMqNode{
				MqType:   1,
				Topic:    v.Name,
				Node:     node,
				EndPoint: ap,
			})
		}
	}
	return mqList, nil
}

// GetMqClient returns the first entry of MqList whose topic and type match,
// or nil when there is none.
//
// The result points to a copy of the entry, so modifying it does not change
// MqList. Iterating by index avoids copying every non-matching element.
func GetMqClient(topic string, mqType uint) *HpdsMqNode {
	for i := range MqList {
		if MqList[i].Topic == topic && MqList[i].MqType == mqType {
			node := MqList[i]
			return &node
		}
	}
	return nil
}

// GenerateAndSendData writes data to stream and returns the write error, if
// any. After a successful write it sleeps for one second before returning.
// NOTE(review): the sleep looks like a send throttle; confirm it is still needed.
func GenerateAndSendData(stream hpds_node.AccessPoint, data []byte) error {
	if _, err := stream.Write(data); err != nil {
		return err
	}
	time.Sleep(1000 * time.Millisecond)
	return nil
}

// handler is the stream-function callback registered in NewMqClient. It
// decodes data as an InstructionReq and, for DatasetRequest commands, stores
// the attached file. It always returns tag 0x12 with a nil payload.
func handler(data []byte) (byte, []byte) {
	req := new(InstructionReq)
	if err := json.Unmarshal(data, req); err != nil {
		global.Must(err)
		return 0x12, nil
	}

	switch req.Command {
	case DatasetRequest:
		storeDatasetFile(req.Payload)
	}
	return 0x12, nil
}

// datasetFile is the validated content of a DatasetRequest payload.
type datasetFile struct {
	datasetID  int64
	size       int64
	compressed bool
	content    string
	path       string
	name       string
	md5        string
}

// parseDatasetFile extracts the fields storeDatasetFile needs from payload.
// It returns an error naming the first field that is absent or has an
// unexpected JSON type, instead of panicking on a failed type assertion.
func parseDatasetFile(payload map[string]interface{}, size int64) (*datasetFile, error) {
	var badKey string
	note := func(key string, ok bool) {
		if !ok && badKey == "" {
			badKey = key
		}
	}
	str := func(key string) string {
		s, ok := payload[key].(string)
		note(key, ok)
		return s
	}

	id, ok := payload["datasetId"].(float64)
	note("datasetId", ok)
	compressed, ok := payload["isCompress"].(bool)
	note("isCompress", ok)

	f := &datasetFile{
		datasetID:  int64(id),
		size:       size,
		compressed: compressed,
		content:    str("file"),
		path:       str("filePath"),
		name:       str("fileName"),
		md5:        str("fileMd5"),
	}
	if badKey != "" {
		return nil, fmt.Errorf("dataset request payload field %q is missing or has the wrong type", badKey)
	}
	return f, nil
}

// storeDatasetFile uploads the file carried by a DatasetRequest payload to
// MinIO and inserts the matching FileManager record.
//
// Payloads whose fileSize is not positive are ignored. A malformed payload,
// a decode/decompress failure or an upload failure is logged and aborts the
// operation, so no record is inserted for a file that was never stored.
func storeDatasetFile(rawPayload interface{}) {
	payload, ok := rawPayload.(map[string]interface{})
	if !ok {
		global.Logger.Error("错误信息",
			zap.Error(fmt.Errorf("dataset request payload has unexpected type %T", rawPayload)))
		return
	}

	rawSize, ok := payload["fileSize"].(float64)
	if !ok {
		global.Logger.Error("错误信息",
			zap.Error(fmt.Errorf("dataset request payload field %q is missing or has the wrong type", "fileSize")))
		return
	}
	size := int64(rawSize)
	if size <= 0 {
		return
	}

	f, err := parseDatasetFile(payload, size)
	if err != nil {
		global.Logger.Error("错误信息", zap.Error(err))
		return
	}

	dt, err := model.GetDatasetById(f.datasetID)
	global.Must(err)

	errLogger := global.Logger.With(zap.String("文件名称", f.path)).With(zap.Int64("文件大小", f.size))

	var fileContent []byte
	if f.compressed {
		// Compressed files arrive base64-encoded; decode before inflating.
		decoded, err := base64.StdEncoding.DecodeString(f.content)
		if err != nil {
			errLogger.Error("错误信息", zap.Error(err))
			return
		}
		fileContent, err = UnCompress(decoded)
		if err != nil {
			errLogger.Error("错误信息", zap.Error(err))
			return
		}
	} else {
		fileContent = []byte(f.content)
	}

	scene := model.GetDatasetScene(dt.CategoryId)
	objectName := fmt.Sprintf("%s%s", scene, f.path)

	// NOTE(review): the object size passed is the payload's fileSize, not
	// len(fileContent); confirm the sender always reports the final size.
	info, err := global.MinioClient.PutObject(
		context.Background(),
		global.Cfg.Minio.Bucket,
		objectName,
		bytes.NewReader(fileContent),
		f.size,
		minio.PutObjectOptions{ContentType: "application/octet-stream"},
	)
	if err != nil {
		errLogger.Error("错误信息", zap.Error(err))
		return
	}

	accessUrl := fmt.Sprintf("%s://%s/%s/%s%s", global.Cfg.Minio.Protocol, global.Cfg.Minio.Endpoint,
		global.Cfg.Minio.Bucket, scene, f.path)
	global.Logger.With(zap.String("文件名称", f.path)).With(zap.String("访问路径", accessUrl)).
		With(zap.Any("返回数据", info)).Info("文件上传到minio")

	// Use one timestamp so CreateAt and UpdateAt cannot differ.
	now := time.Now().Unix()
	fm := new(model.FileManager)
	fm.FileName = f.name
	fm.AccessUrl = accessUrl
	fm.Scene = fmt.Sprintf("%d", dt.CategoryId)
	fm.DatasetId = dt.DatasetId
	fm.FileSize = f.size
	fm.FileMd5 = f.md5
	fm.Creator = 0
	fm.CreateAt = now
	fm.UpdateAt = now
	_, err = global.DB.Insert(fm)
	global.Must(err)
}