package global import ( "crypto/md5" "encoding/base64" "encoding/hex" "encoding/json" "fmt" "folder_monitoring/config" "folder_monitoring/mq" "folder_monitoring/store" "git.hpds.cc/Component/logging" "git.hpds.cc/pavement/hpds_node" "github.com/emirpasic/gods/lists/arraylist" "github.com/fsnotify/fsnotify" "go.uber.org/zap" "io" "os" "path/filepath" "strings" "sync" "time" "unsafe" ) var ( FileList *arraylist.List Watch *fsnotify.Watcher FileHistoryMap map[string]*store.FileStatus Cfg *config.FolderMonitorConfig Logger *logging.Logger maxGoroutinueNum = 5 GoroutinueChan = make(chan bool, maxGoroutinueNum) //encoderPool *sync.Pool ) func init() { FileHistoryMap = make(map[string]*store.FileStatus) FileList = arraylist.New() Watch, _ = fsnotify.NewWatcher() //encoderPool = syncpool.NewEncoderPool(zstd.WithEncoderConcurrency(1), zstd.WithEncoderLevel(zstd.SpeedBetterCompression)) } func InitWatchFolder(folderPath string) { err := filepath.Walk(folderPath, func(path string, fi os.FileInfo, err error) error { if err != nil { Logger.With( zap.String("监控的文件夹", path), ).Error(err.Error()) return err } if !fi.IsDir() { item, ok := FileHistoryMap[path] if ok { if !item.TransferStatus { FileList.Add(fi.Name()) } } else { mdStr, _ := FileMD5(path) item = &store.FileStatus{ FileName: fi.Name(), FilePath: path, CreateTime: fi.ModTime(), FileMd5: mdStr, } FileHistoryMap[path] = item FileList.Add(path) } } else { _ = Watch.Add(fi.Name()) } return nil }) if err != nil { Logger.With( zap.String("文件夹遍历出错", "InitWatchFolder"), ).Error(err.Error()) return } _ = Watch.Add(folderPath) } func WatchEvent() { t1 := time.NewTicker(1 * time.Second) for { select { case ev := <-Watch.Events: { //判断事件发生的类型,如下5种 // Create 创建 // Write 写入 // Remove 删除 // Rename 重命名 // Chmod 修改权限 if ev.Op&fsnotify.Create == fsnotify.Create { //判断是否问文件夹 if IsFile(ev.Name) { Logger.Info("创建文件", zap.String("文件名", ev.Name)) FileHistoryMap[ev.Name] = &store.FileStatus{ FileName: ev.Name, FilePath: ev.Name, CreateTime: time.Now(), FileMd5: "", TransferStatus: false, } FileList.Add(ev.Name) } else { _ = Watch.Add(ev.Name) } } } case <-t1.C: { if FileList.Size() > 0 { transferData(FileList) } } case err := <-Watch.Errors: { Logger.With( zap.String("文件夹监控出错", "WatchEvent"), ).Error(err.Error()) return } } } } func IsFile(f string) bool { fi, e := os.Stat(f) if e != nil { return false } return !fi.IsDir() } func PathExists(path string) bool { _, err := os.Stat(path) if err == nil { return true } if os.IsNotExist(err) { //如果返回的错误类型使用os.isNotExist()判断为true,说明文件或者文件夹不存在 return false } return false } // ReadFile 读取到file中,再利用ioutil将file直接读取到[]byte中, 这是最优 func ReadFile(fn string) []byte { f, err := os.Open(fn) if err != nil { logging.L().Error("Read File", zap.String("File Name", fn), zap.Error(err)) return nil } defer func(f *os.File) { _ = f.Close() }(f) fd, err := io.ReadAll(f) if err != nil { logging.L().Error("Read File To buff", zap.String("File Name", fn), zap.Error(err)) return nil } return fd } func transferData(list *arraylist.List) { var ( mu sync.RWMutex wg sync.WaitGroup compressRatio float64 = 0 compressTime time.Duration ) mu.Lock() defer mu.Unlock() startTime := time.Now() cli := mq.GetMqClient("dataset-request", 1) for i := 0; i < list.Size(); i++ { if fn, ok := list.Get(i); ok { GoroutinueChan <- true fName := deepCopy(fn.(string)) if PathExists(fName) && IsFile(fName) { wg.Add(1) go func(fn string) { fileName := fn src := ReadFile(fileName) if src != nil { dstContent := store.Compress(src) item := mq.FileTransferInfo{ FileName: filepath.Base(fileName), FilePath: strings.Replace(FileHistoryMap[fileName].FilePath, Cfg.MonitorPath, "", -1), DatasetId: Cfg.DatasetId, DataType: Cfg.DataType, FileSize: len(src), File: base64.StdEncoding.EncodeToString(dstContent), IsCompress: true, FileMd5: FileHistoryMap[fileName].FileMd5, } payload := mq.InstructionReq{ Command: mq.DatasetRequest, Payload: item, } s, _ := json.Marshal(payload) list.Remove(0) err := mq.GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), s) if err != nil { Logger.With( zap.String("文件名称", fileName), zap.String("存储路径", item.FilePath), ).Error("文件传输", zap.Error(err)) list.Add(fileName) } FileHistoryMap[fileName].TransferStatus = true Logger.With( zap.String("文件名称", fileName), ).Info("文件传输") compressRatio += float64(len(dstContent)) * 100 / float64(len(src)) } }(fName) } else { list.Remove(0) } <-GoroutinueChan } } wg.Wait() compressTime = time.Since(startTime) Logger.Info("批量任务完成", zap.Int("压缩数量", list.Size()), zap.String("平均压缩率", fmt.Sprintf("%.2f%%", compressRatio/float64(list.Size()))), zap.Duration("运行时间", compressTime), ) } func FileMD5(filePath string) (string, error) { file, err := os.Open(filePath) if err != nil { Logger.With( zap.String("获取文件MD5错误", filePath), ).Error(err.Error()) return "", err } hash := md5.New() _, _ = io.Copy(hash, file) return hex.EncodeToString(hash.Sum(nil)), nil } func deepCopy(s string) string { b := make([]byte, len(s)) copy(b, s) return *(*string)(unsafe.Pointer(&b)) }