folder_monitoring/global/global.go

252 lines
6.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package global
import (
"crypto/md5"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"folder_monitoring/config"
"folder_monitoring/mq"
"folder_monitoring/store"
"git.hpds.cc/Component/logging"
"git.hpds.cc/pavement/hpds_node"
"github.com/emirpasic/gods/lists/arraylist"
"github.com/fsnotify/fsnotify"
"go.uber.org/zap"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"
"unsafe"
)
var (
FileList *arraylist.List
Watch *fsnotify.Watcher
FileHistoryMap map[string]*store.FileStatus
Cfg *config.FolderMonitorConfig
Logger *logging.Logger
maxGoroutinueNum = 5
GoroutinueChan = make(chan bool, maxGoroutinueNum)
//encoderPool *sync.Pool
)
func init() {
FileHistoryMap = make(map[string]*store.FileStatus)
FileList = arraylist.New()
Watch, _ = fsnotify.NewWatcher()
//encoderPool = syncpool.NewEncoderPool(zstd.WithEncoderConcurrency(1), zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
}
func InitWatchFolder(folderPath string) {
err := filepath.Walk(folderPath, func(path string, fi os.FileInfo, err error) error {
if err != nil {
Logger.With(
zap.String("监控的文件夹", path),
).Error(err.Error())
return err
}
if !fi.IsDir() {
item, ok := FileHistoryMap[path]
if ok {
if !item.TransferStatus {
FileList.Add(fi.Name())
}
} else {
mdStr, _ := FileMD5(path)
item = &store.FileStatus{
FileName: fi.Name(),
FilePath: path,
CreateTime: fi.ModTime(),
FileMd5: mdStr,
}
FileHistoryMap[path] = item
FileList.Add(path)
}
} else {
_ = Watch.Add(fi.Name())
}
return nil
})
if err != nil {
Logger.With(
zap.String("文件夹遍历出错", "InitWatchFolder"),
).Error(err.Error())
return
}
_ = Watch.Add(folderPath)
}
func WatchEvent() {
t1 := time.NewTicker(1 * time.Second)
for {
select {
case ev := <-Watch.Events:
{
//判断事件发生的类型如下5种
// Create 创建
// Write 写入
// Remove 删除
// Rename 重命名
// Chmod 修改权限
if ev.Op&fsnotify.Create == fsnotify.Create {
//判断是否问文件夹
if IsFile(ev.Name) {
Logger.Info("创建文件", zap.String("文件名", ev.Name))
FileHistoryMap[ev.Name] = &store.FileStatus{
FileName: ev.Name,
FilePath: ev.Name,
CreateTime: time.Now(),
FileMd5: "",
TransferStatus: false,
}
FileList.Add(ev.Name)
} else {
_ = Watch.Add(ev.Name)
}
}
}
case <-t1.C:
{
if FileList.Size() > 0 {
transferData(FileList)
}
}
case err := <-Watch.Errors:
{
Logger.With(
zap.String("文件夹监控出错", "WatchEvent"),
).Error(err.Error())
return
}
}
}
}
func IsFile(f string) bool {
fi, e := os.Stat(f)
if e != nil {
return false
}
return !fi.IsDir()
}
func PathExists(path string) bool {
_, err := os.Stat(path)
if err == nil {
return true
}
if os.IsNotExist(err) { //如果返回的错误类型使用os.isNotExist()判断为true说明文件或者文件夹不存在
return false
}
return false
}
// ReadFile 读取到file中再利用ioutil将file直接读取到[]byte中, 这是最优
func ReadFile(fn string) []byte {
f, err := os.Open(fn)
if err != nil {
logging.L().Error("Read File", zap.String("File Name", fn), zap.Error(err))
return nil
}
defer func(f *os.File) {
_ = f.Close()
}(f)
fd, err := io.ReadAll(f)
if err != nil {
logging.L().Error("Read File To buff", zap.String("File Name", fn), zap.Error(err))
return nil
}
return fd
}
func transferData(list *arraylist.List) {
var (
mu sync.RWMutex
wg sync.WaitGroup
compressRatio float64 = 0
compressTime time.Duration
)
mu.Lock()
defer mu.Unlock()
startTime := time.Now()
cli := mq.GetMqClient("dataset-request", 1)
for i := 0; i < list.Size(); i++ {
if fn, ok := list.Get(i); ok {
GoroutinueChan <- true
fName := deepCopy(fn.(string))
if PathExists(fName) && IsFile(fName) {
wg.Add(1)
go func(fn string) {
fileName := fn
src := ReadFile(fileName)
if src != nil {
dstContent := store.Compress(src)
item := mq.FileTransferInfo{
FileName: filepath.Base(fileName),
FilePath: strings.Replace(FileHistoryMap[fileName].FilePath, Cfg.MonitorPath, "", -1),
DatasetId: Cfg.DatasetId,
DataType: Cfg.DataType,
FileSize: len(src),
File: base64.StdEncoding.EncodeToString(dstContent),
IsCompress: true,
FileMd5: FileHistoryMap[fileName].FileMd5,
}
payload := mq.InstructionReq{
Command: mq.DatasetRequest,
Payload: item,
}
s, _ := json.Marshal(payload)
list.Remove(0)
err := mq.GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), s)
if err != nil {
Logger.With(
zap.String("文件名称", fileName),
zap.String("存储路径", item.FilePath),
).Error("文件传输", zap.Error(err))
list.Add(fileName)
}
FileHistoryMap[fileName].TransferStatus = true
Logger.With(
zap.String("文件名称", fileName),
).Info("文件传输")
compressRatio += float64(len(dstContent)) * 100 / float64(len(src))
}
}(fName)
} else {
list.Remove(0)
}
<-GoroutinueChan
}
}
wg.Wait()
compressTime = time.Since(startTime)
Logger.Info("批量任务完成",
zap.Int("压缩数量", list.Size()),
zap.String("平均压缩率", fmt.Sprintf("%.2f%%", compressRatio/float64(list.Size()))),
zap.Duration("运行时间", compressTime),
)
}
func FileMD5(filePath string) (string, error) {
file, err := os.Open(filePath)
if err != nil {
Logger.With(
zap.String("获取文件MD5错误", filePath),
).Error(err.Error())
return "", err
}
hash := md5.New()
_, _ = io.Copy(hash, file)
return hex.EncodeToString(hash.Sum(nil)), nil
}
func deepCopy(s string) string {
b := make([]byte, len(s))
copy(b, s)
return *(*string)(unsafe.Pointer(&b))
}