folder_monitoring/global/global.go

248 lines
5.9 KiB
Go
Raw Normal View History

2023-03-30 17:31:41 +08:00
package global
import (
"crypto/md5"
2023-04-02 23:34:11 +08:00
"encoding/base64"
2023-03-30 17:31:41 +08:00
"encoding/hex"
"encoding/json"
"fmt"
"folder_monitoring/config"
"folder_monitoring/mq"
"folder_monitoring/store"
"git.hpds.cc/Component/logging"
"git.hpds.cc/pavement/hpds_node"
"github.com/emirpasic/gods/lists/arraylist"
"github.com/fsnotify/fsnotify"
"go.uber.org/zap"
"io"
"os"
"path/filepath"
2023-04-02 09:24:37 +08:00
"strings"
2023-03-30 17:31:41 +08:00
"sync"
"time"
2023-04-02 09:24:37 +08:00
"unsafe"
2023-03-30 17:31:41 +08:00
)
var (
FileList *arraylist.List
Watch *fsnotify.Watcher
FileHistoryMap map[string]*store.FileStatus
Cfg *config.FolderMonitorConfig
Logger *logging.Logger
//encoderPool *sync.Pool
)
func init() {
FileHistoryMap = make(map[string]*store.FileStatus)
FileList = arraylist.New()
Watch, _ = fsnotify.NewWatcher()
//encoderPool = syncpool.NewEncoderPool(zstd.WithEncoderConcurrency(1), zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
}
func InitWatchFolder(folderPath string) {
err := filepath.Walk(folderPath, func(path string, fi os.FileInfo, err error) error {
if err != nil {
Logger.With(
zap.String("监控的文件夹", path),
).Error(err.Error())
return err
}
if !fi.IsDir() {
2023-04-02 09:24:37 +08:00
item, ok := FileHistoryMap[path]
2023-03-30 17:31:41 +08:00
if ok {
if !item.TransferStatus {
FileList.Add(fi.Name())
}
} else {
mdStr, _ := FileMD5(path)
item = &store.FileStatus{
FileName: fi.Name(),
FilePath: path,
CreateTime: fi.ModTime(),
FileMd5: mdStr,
}
FileHistoryMap[path] = item
FileList.Add(path)
}
2023-04-02 23:34:11 +08:00
} else {
_ = Watch.Add(fi.Name())
2023-03-30 17:31:41 +08:00
}
return nil
})
if err != nil {
Logger.With(
zap.String("文件夹遍历出错", "InitWatchFolder"),
).Error(err.Error())
return
}
_ = Watch.Add(folderPath)
}
func WatchEvent() {
t1 := time.NewTicker(1 * time.Second)
for {
select {
case ev := <-Watch.Events:
{
//判断事件发生的类型如下5种
// Create 创建
// Write 写入
// Remove 删除
// Rename 重命名
// Chmod 修改权限
if ev.Op&fsnotify.Create == fsnotify.Create {
2023-04-02 09:24:37 +08:00
//判断是否问文件夹
if IsFile(ev.Name) {
Logger.Info("创建文件", zap.String("文件名", ev.Name))
FileHistoryMap[ev.Name] = &store.FileStatus{
FileName: ev.Name,
FilePath: ev.Name,
CreateTime: time.Now(),
FileMd5: "",
TransferStatus: false,
}
FileList.Add(ev.Name)
} else {
_ = Watch.Add(ev.Name)
2023-03-30 17:31:41 +08:00
}
}
}
case <-t1.C:
{
if FileList.Size() > 0 {
transferData(FileList)
}
}
case err := <-Watch.Errors:
{
Logger.With(
zap.String("文件夹监控出错", "WatchEvent"),
).Error(err.Error())
return
}
}
}
}
2023-04-02 09:24:37 +08:00
func IsFile(f string) bool {
fi, e := os.Stat(f)
if e != nil {
return false
}
return !fi.IsDir()
}
2023-03-30 17:31:41 +08:00
func PathExists(path string) bool {
_, err := os.Stat(path)
if err == nil {
return true
}
if os.IsNotExist(err) { //如果返回的错误类型使用os.isNotExist()判断为true说明文件或者文件夹不存在
return false
}
return false
}
// ReadFile 读取到file中再利用ioutil将file直接读取到[]byte中, 这是最优
func ReadFile(fn string) []byte {
f, err := os.Open(fn)
if err != nil {
logging.L().Error("Read File", zap.String("File Name", fn), zap.Error(err))
return nil
}
defer func(f *os.File) {
_ = f.Close()
}(f)
fd, err := io.ReadAll(f)
if err != nil {
logging.L().Error("Read File To buff", zap.String("File Name", fn), zap.Error(err))
return nil
}
return fd
}
func transferData(list *arraylist.List) {
var (
mu sync.RWMutex
wg sync.WaitGroup
compressRatio float64 = 0
compressTime time.Duration
)
mu.Lock()
defer mu.Unlock()
startTime := time.Now()
cli := mq.GetMqClient("dataset-request", 1)
2023-04-02 09:24:37 +08:00
fmt.Println("list.Size() =====>>>", list.Size())
2023-03-30 17:31:41 +08:00
for i := 0; i < list.Size(); i++ {
2023-04-02 09:24:37 +08:00
if fn, ok := list.Get(i); ok {
fName := deepCopy(fn.(string))
if PathExists(fName) {
2023-03-30 17:31:41 +08:00
wg.Add(1)
2023-04-02 09:24:37 +08:00
go func(fn string) {
fileName := fn
2023-03-30 17:31:41 +08:00
src := ReadFile(fileName)
if src != nil {
dstContent := store.Compress(src)
item := mq.FileTransferInfo{
2023-04-02 09:24:37 +08:00
FileName: filepath.Base(fileName),
FilePath: strings.Replace(FileHistoryMap[fileName].FilePath, Cfg.MonitorPath, "", -1),
2023-03-30 17:31:41 +08:00
DatasetId: Cfg.DatasetId,
2023-04-04 18:22:01 +08:00
DataType: Cfg.DataType,
2023-04-02 09:24:37 +08:00
FileSize: len(src),
2023-04-02 23:34:11 +08:00
File: base64.StdEncoding.EncodeToString(dstContent),
2023-03-30 17:31:41 +08:00
IsCompress: true,
FileMd5: FileHistoryMap[fileName].FileMd5,
}
payload := mq.InstructionReq{
Command: mq.DatasetRequest,
Payload: item,
}
s, _ := json.Marshal(payload)
list.Remove(0)
2023-04-02 09:24:37 +08:00
err := mq.GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), s)
2023-03-30 17:31:41 +08:00
if err != nil {
Logger.With(
zap.String("文件名称", fileName),
2023-04-02 23:34:11 +08:00
zap.String("存储路径", item.FilePath),
2023-03-30 17:31:41 +08:00
).Error("文件传输", zap.Error(err))
list.Add(fileName)
}
FileHistoryMap[fileName].TransferStatus = true
Logger.With(
zap.String("文件名称", fileName),
).Info("文件传输")
compressRatio += float64(len(dstContent)) * 100 / float64(len(src))
}
2023-04-02 09:24:37 +08:00
}(fName)
} else {
list.Remove(0)
2023-03-30 17:31:41 +08:00
}
}
}
2023-04-02 09:24:37 +08:00
wg.Wait()
compressTime = time.Since(startTime)
Logger.Info("批量任务完成",
zap.Int("压缩数量", list.Size()),
zap.String("平均压缩率", fmt.Sprintf("%.2f%%", compressRatio/float64(list.Size()))),
zap.Duration("运行时间", compressTime),
)
2023-03-30 17:31:41 +08:00
}
func FileMD5(filePath string) (string, error) {
file, err := os.Open(filePath)
if err != nil {
Logger.With(
zap.String("获取文件MD5错误", filePath),
).Error(err.Error())
return "", err
}
hash := md5.New()
_, _ = io.Copy(hash, file)
return hex.EncodeToString(hash.Sum(nil)), nil
}
2023-04-02 09:24:37 +08:00
func deepCopy(s string) string {
b := make([]byte, len(s))
copy(b, s)
return *(*string)(unsafe.Pointer(&b))
}