diff --git a/global/global.go b/global/global.go index 2811928..31a2883 100644 --- a/global/global.go +++ b/global/global.go @@ -16,8 +16,10 @@ import ( "io" "os" "path/filepath" + "strings" "sync" "time" + "unsafe" ) var ( @@ -46,7 +48,7 @@ func InitWatchFolder(folderPath string) { } if !fi.IsDir() { - item, ok := FileHistoryMap[fi.Name()] + item, ok := FileHistoryMap[path] if ok { if !item.TransferStatus { FileList.Add(fi.Name()) @@ -87,15 +89,20 @@ func WatchEvent() { // Rename 重命名 // Chmod 修改权限 if ev.Op&fsnotify.Create == fsnotify.Create { - Logger.Info("创建文件", zap.String("文件名", ev.Name)) - FileHistoryMap[ev.Name] = &store.FileStatus{ - FileName: ev.Name, - FilePath: ev.Name, - CreateTime: time.Now(), - FileMd5: "", - TransferStatus: false, + //判断是否问文件夹 + if IsFile(ev.Name) { + Logger.Info("创建文件", zap.String("文件名", ev.Name)) + FileHistoryMap[ev.Name] = &store.FileStatus{ + FileName: ev.Name, + FilePath: ev.Name, + CreateTime: time.Now(), + FileMd5: "", + TransferStatus: false, + } + FileList.Add(ev.Name) + } else { + _ = Watch.Add(ev.Name) } - FileList.Add(ev.Name) } } case <-t1.C: @@ -114,7 +121,13 @@ func WatchEvent() { } } } - +func IsFile(f string) bool { + fi, e := os.Stat(f) + if e != nil { + return false + } + return !fi.IsDir() +} func PathExists(path string) bool { _, err := os.Stat(path) if err == nil { @@ -157,19 +170,22 @@ func transferData(list *arraylist.List) { defer mu.Unlock() startTime := time.Now() cli := mq.GetMqClient("dataset-request", 1) + fmt.Println("list.Size() =====>>>", list.Size()) for i := 0; i < list.Size(); i++ { - if fn, ok := list.Get(0); ok { - if PathExists(fn.(string)) { + if fn, ok := list.Get(i); ok { + fName := deepCopy(fn.(string)) + if PathExists(fName) { wg.Add(1) - go func() { - fileName := fn.(string) + go func(fn string) { + fileName := fn src := ReadFile(fileName) if src != nil { dstContent := store.Compress(src) item := mq.FileTransferInfo{ - FileName: fileName, - FilePath: FileHistoryMap[fileName].FilePath, + FileName: filepath.Base(fileName), + FilePath: strings.Replace(FileHistoryMap[fileName].FilePath, Cfg.MonitorPath, "", -1), DatasetId: Cfg.DatasetId, + FileSize: len(src), File: string(dstContent), IsCompress: true, FileMd5: FileHistoryMap[fileName].FileMd5, @@ -180,7 +196,7 @@ func transferData(list *arraylist.List) { } s, _ := json.Marshal(payload) list.Remove(0) - err := mq.GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), s, Logger) + err := mq.GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), s) if err != nil { Logger.With( zap.String("文件名称", fileName), @@ -193,17 +209,19 @@ func transferData(list *arraylist.List) { ).Info("文件传输") compressRatio += float64(len(dstContent)) * 100 / float64(len(src)) } - }() + }(fName) + } else { + list.Remove(0) } } - wg.Wait() - compressTime = time.Since(startTime) - Logger.Info("批量任务完成", - zap.Int("压缩数量", list.Size()), - zap.String("平均压缩率", fmt.Sprintf("%.2f%%", compressRatio/float64(list.Size()))), - zap.Duration("运行时间", compressTime), - ) } + wg.Wait() + compressTime = time.Since(startTime) + Logger.Info("批量任务完成", + zap.Int("压缩数量", list.Size()), + zap.String("平均压缩率", fmt.Sprintf("%.2f%%", compressRatio/float64(list.Size()))), + zap.Duration("运行时间", compressTime), + ) } func FileMD5(filePath string) (string, error) { file, err := os.Open(filePath) @@ -217,3 +235,8 @@ func FileMD5(filePath string) (string, error) { _, _ = io.Copy(hash, file) return hex.EncodeToString(hash.Sum(nil)), nil } +func deepCopy(s string) string { + b := make([]byte, len(s)) + copy(b, s) + return *(*string)(unsafe.Pointer(&b)) +} diff --git a/go.mod b/go.mod index d9ea18b..1a1ade9 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.19 require ( git.hpds.cc/Component/logging v0.0.0-20230106105738-e378e873921b - git.hpds.cc/pavement/hpds_node v0.0.0-20230326152949-a1c0ad2f7052 github.com/emirpasic/gods v1.18.1 github.com/fsnotify/fsnotify v1.4.9 github.com/klauspost/compress v1.16.3 @@ -14,6 +13,8 @@ require ( gopkg.in/yaml.v3 v3.0.1 ) +require git.hpds.cc/pavement/hpds_node v0.0.0-20230326152949-a1c0ad2f7052 + require ( git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect git.hpds.cc/Component/network v0.0.0-20230326151855-3c157f531d86 // indirect diff --git a/mq/index.go b/mq/index.go index 05bc90d..9313c9b 100644 --- a/mq/index.go +++ b/mq/index.go @@ -82,8 +82,7 @@ func GetMqClient(topic string, mqType uint) *HpdsMqNode { return nil } -func GenerateAndSendData(stream hpds_node.AccessPoint, data []byte, logger *logging.Logger) error { - logger.With(zap.String("web节点", "发送消息")).Info("数据", zap.String("发送的数据", string(data))) +func GenerateAndSendData(stream hpds_node.AccessPoint, data []byte) error { _, err := stream.Write(data) if err != nil { return err diff --git a/mq/instruction.go b/mq/instruction.go index b7ecc25..be8cade 100644 --- a/mq/instruction.go +++ b/mq/instruction.go @@ -2,7 +2,6 @@ package mq const ( DatasetRequest = iota + 10 - DatasetResponse ) type InstructionReq struct { @@ -11,10 +10,11 @@ type InstructionReq struct { } type FileTransferInfo struct { - FileName string - FilePath string - DatasetId int - File string - IsCompress bool - FileMd5 string + FileName string `json:"fileName"` + FilePath string `json:"filePath"` + DatasetId int `json:"datasetId"` + FileSize int `json:"fileSize"` + File string `json:"file"` + IsCompress bool `json:"isCompress"` + FileMd5 string `json:"fileMd5"` } diff --git a/store/index.go b/store/index.go index c8d4953..5c04e1b 100644 --- a/store/index.go +++ b/store/index.go @@ -78,7 +78,7 @@ func Save(storePath string, list map[string]*FileStatus) { if b, _ := PathExists(storeFile); !b { NewFile(storeFile) } - f, _ := os.OpenFile(storeFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) + f, _ := os.OpenFile(storeFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) defer func() { _ = f.Close() }()