初始化代码
This commit is contained in:
parent
c3a82ed70f
commit
882287cac1
|
@ -0,0 +1,131 @@
|
||||||
|
package cmd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"folder_monitoring/config"
|
||||||
|
"folder_monitoring/global"
|
||||||
|
"folder_monitoring/mq"
|
||||||
|
"folder_monitoring/store"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"path/filepath"
|
||||||
|
"syscall"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ConfigFileFlag string = "./config/config.yaml"
|
||||||
|
NodeName string = "main-node"
|
||||||
|
Mode string = "dev"
|
||||||
|
Path string = "" //监控文件夹
|
||||||
|
)
|
||||||
|
|
||||||
|
func must(err error) {
|
||||||
|
if err != nil {
|
||||||
|
_, _ = fmt.Fprint(os.Stderr, err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStartCmd() *cobra.Command {
|
||||||
|
cmd := &cobra.Command{
|
||||||
|
Use: "start",
|
||||||
|
Short: "Start folder monitor",
|
||||||
|
Run: func(cmd *cobra.Command, args []string) {
|
||||||
|
var (
|
||||||
|
cfg *config.FolderMonitorConfig
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
must(err)
|
||||||
|
configFileFlag, err := cmd.Flags().GetString("c")
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("get local config err: ", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
NodeName, err = cmd.Flags().GetString("n")
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("get remote path config err: ", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
Mode, err = cmd.Flags().GetString("m")
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("get remote path config err: ", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(configFileFlag) > 1 {
|
||||||
|
cfg, err = config.ParseConfigByFile(configFileFlag)
|
||||||
|
must(err)
|
||||||
|
cfg.MonitorPath, _ = filepath.Abs(cfg.MonitorPath)
|
||||||
|
cfg.TempPath, _ = filepath.Abs(cfg.TempPath)
|
||||||
|
ConfigFileFlag = configFileFlag
|
||||||
|
}
|
||||||
|
global.Cfg = cfg
|
||||||
|
global.Logger = config.LoadLoggerConfig(cfg.Logging)
|
||||||
|
//加载历史文件的传输情况
|
||||||
|
err = store.Load(cfg.TempPath, global.FileHistoryMap)
|
||||||
|
must(err)
|
||||||
|
//创建消息连接点
|
||||||
|
mq.MqList, err = mq.NewMqClient(cfg.Funcs, cfg.Node, global.Logger)
|
||||||
|
must(err)
|
||||||
|
if b, _ := store.PathExists(cfg.MonitorPath); !b {
|
||||||
|
_ = os.MkdirAll(cfg.MonitorPath, os.ModePerm)
|
||||||
|
}
|
||||||
|
//初始化全局
|
||||||
|
global.InitWatchFolder(cfg.MonitorPath)
|
||||||
|
must(err)
|
||||||
|
// 退出channel
|
||||||
|
exitChannel := make(chan os.Signal)
|
||||||
|
defer close(exitChannel)
|
||||||
|
// 退出信号监听
|
||||||
|
go func(c chan os.Signal) {
|
||||||
|
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
}(exitChannel)
|
||||||
|
//开始监控
|
||||||
|
go global.WatchEvent()
|
||||||
|
//go func() {
|
||||||
|
// for {
|
||||||
|
// select {
|
||||||
|
// case ev := <-global.Watch.Events:
|
||||||
|
// if ev.Op&fsnotify.Create == fsnotify.Create {
|
||||||
|
// logging.L().Info("创建文件", zap.String("文件名", ev.Name))
|
||||||
|
// global.FileList.Add(ev.Name)
|
||||||
|
// }
|
||||||
|
// case <-t1.C:
|
||||||
|
// {
|
||||||
|
// if global.FileList.Size() > 0 {
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// case err := <-global.Watch.Errors:
|
||||||
|
// {
|
||||||
|
// log.Println("error : ", err)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//}()
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
global.Logger.With(
|
||||||
|
zap.String(cfg.Name, "exit"),
|
||||||
|
).Error(ctx.Err().Error())
|
||||||
|
return
|
||||||
|
case errs := <-exitChannel:
|
||||||
|
store.Save(cfg.TempPath, global.FileHistoryMap)
|
||||||
|
global.Logger.With(
|
||||||
|
zap.String(cfg.Name, "服务退出"),
|
||||||
|
).Info(errs.String())
|
||||||
|
os.Exit(-1)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
cmd.Flags().StringVar(&ConfigFileFlag, "c", "./config/config.yaml", "The configuration file path")
|
||||||
|
cmd.Flags().StringVar(&NodeName, "n", "main-node", "The configuration name")
|
||||||
|
cmd.Flags().StringVar(&Mode, "m", "dev", "run mode : dev | test | releases")
|
||||||
|
cmd.Flags().StringVar(&Path, "f", "", "monitor folder path")
|
||||||
|
return cmd
|
||||||
|
}
|
|
@ -0,0 +1,77 @@
|
||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"git.hpds.cc/Component/logging"
|
||||||
|
yaml "gopkg.in/yaml.v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FolderMonitorConfig struct {
|
||||||
|
Name string `yaml:"name" json:"name" toml:"name"`
|
||||||
|
MonitorPath string `json:"monitorPath" yaml:"monitorPath" toml:"monitorPath"`
|
||||||
|
TempPath string `json:"tempPath" yaml:"tempPath" toml:"tempPath"`
|
||||||
|
DatasetId int `yaml:"datasetId" json:"datasetId" toml:"datasetId"`
|
||||||
|
Logging LogOptions `yaml:"logging" json:"logging" tom:"logging"`
|
||||||
|
Node HpdsNode `yaml:"node,omitempty" json:"node,omitempty" toml:"node,omitempty"`
|
||||||
|
Funcs []FuncConfig `yaml:"functions,omitempty" json:"funcs,omitempty" toml:"funcs,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type LogOptions struct {
|
||||||
|
Path string `yaml:"path" json:"path" toml:"path"` // 文件保存地方
|
||||||
|
Prefix string `yaml:"prefix" json:"prefix" toml:"prefix"` // 日志文件前缀
|
||||||
|
ErrorFileSuffix string `yaml:"errorFileSuffix" json:"errorFileSuffix" toml:"errorFileSuffix"` // error日志文件后缀
|
||||||
|
WarnFileSuffix string `yaml:"warnFileSuffix" json:"warnFileSuffix" toml:"warnFileSuffix"` // warn日志文件后缀
|
||||||
|
InfoFileSuffix string `yaml:"infoFileSuffix" json:"infoFileSuffix" toml:"infoFileSuffix"` // info日志文件后缀
|
||||||
|
DebugFileSuffix string `yaml:"debugFileSuffix" json:"debugFileSuffix" toml:"debugFileSuffix"` // debug日志文件后缀
|
||||||
|
Level string `yaml:"level" json:"level" toml:"level"` // 日志等级
|
||||||
|
MaxSize int `yaml:"maxSize" json:"maxSize" toml:"maxSize"` // 日志文件大小(M)
|
||||||
|
MaxBackups int `yaml:"maxBackups" json:"maxBackups" toml:"maxBackups"` // 最多存在多少个切片文件
|
||||||
|
MaxAge int `yaml:"maxAge" json:"maxAge" toml:"maxAge"` // 保存的最大天数
|
||||||
|
Development bool `yaml:"development" json:"development" toml:"development"` // 是否是开发模式
|
||||||
|
}
|
||||||
|
|
||||||
|
type HpdsNode struct {
|
||||||
|
Host string `yaml:"host"`
|
||||||
|
Port int `yaml:"port"`
|
||||||
|
Token string `yaml:"token,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type FuncConfig struct {
|
||||||
|
Name string `yaml:"name"`
|
||||||
|
DataTag uint8 `yaml:"dataTag"`
|
||||||
|
MqType uint `yaml:"mqType"` //消息类型, 发布,1;订阅;2
|
||||||
|
}
|
||||||
|
|
||||||
|
func ParseConfigByFile(path string) (cfg *FolderMonitorConfig, err error) {
|
||||||
|
buffer, err := os.ReadFile(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return load(buffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
func load(buf []byte) (cfg *FolderMonitorConfig, err error) {
|
||||||
|
//cViper := viper.New()
|
||||||
|
//cViper.SetConfigType("yaml")
|
||||||
|
cfg = new(FolderMonitorConfig)
|
||||||
|
cfg.Funcs = make([]FuncConfig, 0)
|
||||||
|
err = yaml.Unmarshal(buf, cfg)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func LoadLoggerConfig(opt LogOptions) *logging.Logger {
|
||||||
|
return logging.NewLogger(
|
||||||
|
logging.SetPath(opt.Path),
|
||||||
|
logging.SetPrefix(opt.Prefix),
|
||||||
|
logging.SetDevelopment(opt.Development),
|
||||||
|
logging.SetDebugFileSuffix(opt.DebugFileSuffix),
|
||||||
|
logging.SetWarnFileSuffix(opt.WarnFileSuffix),
|
||||||
|
logging.SetErrorFileSuffix(opt.ErrorFileSuffix),
|
||||||
|
logging.SetInfoFileSuffix(opt.InfoFileSuffix),
|
||||||
|
logging.SetMaxAge(opt.MaxAge),
|
||||||
|
logging.SetMaxBackups(opt.MaxBackups),
|
||||||
|
logging.SetMaxSize(opt.MaxSize),
|
||||||
|
logging.SetLevel(logging.LogLevel["debug"]),
|
||||||
|
)
|
||||||
|
}
|
|
@ -0,0 +1,23 @@
|
||||||
|
name: "folder_monitoring"
|
||||||
|
monitorPath: "./src/"
|
||||||
|
tempPath: "./tmp/"
|
||||||
|
datasetId: 1
|
||||||
|
node:
|
||||||
|
host: 127.0.0.1
|
||||||
|
port: 27188
|
||||||
|
token: 06d36c6f5705507dae778fdce90d0767
|
||||||
|
functions:
|
||||||
|
- name: dataset-request
|
||||||
|
dataTag : 10
|
||||||
|
mqType: 1
|
||||||
|
logging:
|
||||||
|
path: ./logs
|
||||||
|
prefix: folder_monitor
|
||||||
|
errorFileSuffix: error.log
|
||||||
|
warnFileSuffix: warn.log
|
||||||
|
infoFileSuffix: info.log
|
||||||
|
debugFileSuffix: debug.log
|
||||||
|
maxSize: 100
|
||||||
|
maxBackups: 3000
|
||||||
|
maxAge: 30
|
||||||
|
development: true
|
|
@ -0,0 +1,219 @@
|
||||||
|
package global
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/md5"
|
||||||
|
"encoding/hex"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"folder_monitoring/config"
|
||||||
|
"folder_monitoring/mq"
|
||||||
|
"folder_monitoring/store"
|
||||||
|
"git.hpds.cc/Component/logging"
|
||||||
|
"git.hpds.cc/pavement/hpds_node"
|
||||||
|
"github.com/emirpasic/gods/lists/arraylist"
|
||||||
|
"github.com/fsnotify/fsnotify"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
FileList *arraylist.List
|
||||||
|
Watch *fsnotify.Watcher
|
||||||
|
FileHistoryMap map[string]*store.FileStatus
|
||||||
|
Cfg *config.FolderMonitorConfig
|
||||||
|
Logger *logging.Logger
|
||||||
|
//encoderPool *sync.Pool
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
FileHistoryMap = make(map[string]*store.FileStatus)
|
||||||
|
FileList = arraylist.New()
|
||||||
|
Watch, _ = fsnotify.NewWatcher()
|
||||||
|
//encoderPool = syncpool.NewEncoderPool(zstd.WithEncoderConcurrency(1), zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
|
||||||
|
}
|
||||||
|
|
||||||
|
func InitWatchFolder(folderPath string) {
|
||||||
|
err := filepath.Walk(folderPath, func(path string, fi os.FileInfo, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
Logger.With(
|
||||||
|
zap.String("监控的文件夹", path),
|
||||||
|
).Error(err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !fi.IsDir() {
|
||||||
|
item, ok := FileHistoryMap[fi.Name()]
|
||||||
|
if ok {
|
||||||
|
if !item.TransferStatus {
|
||||||
|
FileList.Add(fi.Name())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
mdStr, _ := FileMD5(path)
|
||||||
|
item = &store.FileStatus{
|
||||||
|
FileName: fi.Name(),
|
||||||
|
FilePath: path,
|
||||||
|
CreateTime: fi.ModTime(),
|
||||||
|
FileMd5: mdStr,
|
||||||
|
}
|
||||||
|
FileHistoryMap[path] = item
|
||||||
|
FileList.Add(path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
Logger.With(
|
||||||
|
zap.String("文件夹遍历出错", "InitWatchFolder"),
|
||||||
|
).Error(err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_ = Watch.Add(folderPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
func WatchEvent() {
|
||||||
|
t1 := time.NewTicker(1 * time.Second)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case ev := <-Watch.Events:
|
||||||
|
{
|
||||||
|
//判断事件发生的类型,如下5种
|
||||||
|
// Create 创建
|
||||||
|
// Write 写入
|
||||||
|
// Remove 删除
|
||||||
|
// Rename 重命名
|
||||||
|
// Chmod 修改权限
|
||||||
|
if ev.Op&fsnotify.Create == fsnotify.Create {
|
||||||
|
Logger.Info("创建文件", zap.String("文件名", ev.Name))
|
||||||
|
FileHistoryMap[ev.Name] = &store.FileStatus{
|
||||||
|
FileName: ev.Name,
|
||||||
|
FilePath: ev.Name,
|
||||||
|
CreateTime: time.Now(),
|
||||||
|
FileMd5: "",
|
||||||
|
TransferStatus: false,
|
||||||
|
}
|
||||||
|
FileList.Add(ev.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case <-t1.C:
|
||||||
|
{
|
||||||
|
if FileList.Size() > 0 {
|
||||||
|
transferData(FileList)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case err := <-Watch.Errors:
|
||||||
|
{
|
||||||
|
Logger.With(
|
||||||
|
zap.String("文件夹监控出错", "WatchEvent"),
|
||||||
|
).Error(err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func PathExists(path string) bool {
|
||||||
|
_, err := os.Stat(path)
|
||||||
|
if err == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if os.IsNotExist(err) { //如果返回的错误类型使用os.isNotExist()判断为true,说明文件或者文件夹不存在
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadFile 读取到file中,再利用ioutil将file直接读取到[]byte中, 这是最优
|
||||||
|
func ReadFile(fn string) []byte {
|
||||||
|
f, err := os.Open(fn)
|
||||||
|
if err != nil {
|
||||||
|
logging.L().Error("Read File", zap.String("File Name", fn), zap.Error(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
defer func(f *os.File) {
|
||||||
|
_ = f.Close()
|
||||||
|
}(f)
|
||||||
|
|
||||||
|
fd, err := io.ReadAll(f)
|
||||||
|
if err != nil {
|
||||||
|
logging.L().Error("Read File To buff", zap.String("File Name", fn), zap.Error(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fd
|
||||||
|
}
|
||||||
|
|
||||||
|
func transferData(list *arraylist.List) {
|
||||||
|
var (
|
||||||
|
mu sync.RWMutex
|
||||||
|
wg sync.WaitGroup
|
||||||
|
compressRatio float64 = 0
|
||||||
|
compressTime time.Duration
|
||||||
|
)
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
startTime := time.Now()
|
||||||
|
cli := mq.GetMqClient("dataset-request", 1)
|
||||||
|
for i := 0; i < list.Size(); i++ {
|
||||||
|
if fn, ok := list.Get(0); ok {
|
||||||
|
if PathExists(fn.(string)) {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
fileName := fn.(string)
|
||||||
|
src := ReadFile(fileName)
|
||||||
|
if src != nil {
|
||||||
|
dstContent := store.Compress(src)
|
||||||
|
item := mq.FileTransferInfo{
|
||||||
|
FileName: fileName,
|
||||||
|
FilePath: FileHistoryMap[fileName].FilePath,
|
||||||
|
DatasetId: Cfg.DatasetId,
|
||||||
|
File: string(dstContent),
|
||||||
|
IsCompress: true,
|
||||||
|
FileMd5: FileHistoryMap[fileName].FileMd5,
|
||||||
|
}
|
||||||
|
payload := mq.InstructionReq{
|
||||||
|
Command: mq.DatasetRequest,
|
||||||
|
Payload: item,
|
||||||
|
}
|
||||||
|
s, _ := json.Marshal(payload)
|
||||||
|
list.Remove(0)
|
||||||
|
err := mq.GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), s, Logger)
|
||||||
|
if err != nil {
|
||||||
|
Logger.With(
|
||||||
|
zap.String("文件名称", fileName),
|
||||||
|
).Error("文件传输", zap.Error(err))
|
||||||
|
list.Add(fileName)
|
||||||
|
}
|
||||||
|
FileHistoryMap[fileName].TransferStatus = true
|
||||||
|
Logger.With(
|
||||||
|
zap.String("文件名称", fileName),
|
||||||
|
).Info("文件传输")
|
||||||
|
compressRatio += float64(len(dstContent)) * 100 / float64(len(src))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
compressTime = time.Since(startTime)
|
||||||
|
Logger.Info("批量任务完成",
|
||||||
|
zap.Int("压缩数量", list.Size()),
|
||||||
|
zap.String("平均压缩率", fmt.Sprintf("%.2f%%", compressRatio/float64(list.Size()))),
|
||||||
|
zap.Duration("运行时间", compressTime),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func FileMD5(filePath string) (string, error) {
|
||||||
|
file, err := os.Open(filePath)
|
||||||
|
if err != nil {
|
||||||
|
Logger.With(
|
||||||
|
zap.String("获取文件MD5错误", filePath),
|
||||||
|
).Error(err.Error())
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
hash := md5.New()
|
||||||
|
_, _ = io.Copy(hash, file)
|
||||||
|
return hex.EncodeToString(hash.Sum(nil)), nil
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
module folder_monitoring
|
||||||
|
|
||||||
|
go 1.19
|
||||||
|
|
||||||
|
require (
|
||||||
|
git.hpds.cc/Component/logging v0.0.0-20230106105738-e378e873921b
|
||||||
|
git.hpds.cc/pavement/hpds_node v0.0.0-20230326152949-a1c0ad2f7052
|
||||||
|
github.com/emirpasic/gods v1.18.1
|
||||||
|
github.com/fsnotify/fsnotify v1.4.9
|
||||||
|
github.com/klauspost/compress v1.16.3
|
||||||
|
// github.com/mostynb/zstdpool-syncpool v0.0.12
|
||||||
|
github.com/spf13/cobra v1.6.1
|
||||||
|
go.uber.org/zap v1.23.0
|
||||||
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect
|
||||||
|
git.hpds.cc/Component/network v0.0.0-20230326151855-3c157f531d86 // indirect
|
||||||
|
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
|
||||||
|
github.com/golang/mock v1.6.0 // indirect
|
||||||
|
github.com/inconshreveable/mousetrap v1.0.1 // indirect
|
||||||
|
github.com/lucas-clemente/quic-go v0.29.1 // indirect
|
||||||
|
github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect
|
||||||
|
github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect
|
||||||
|
github.com/matoous/go-nanoid/v2 v2.0.0 // indirect
|
||||||
|
github.com/nxadm/tail v1.4.8 // indirect
|
||||||
|
github.com/onsi/ginkgo v1.16.4 // indirect
|
||||||
|
github.com/spf13/pflag v1.0.5 // indirect
|
||||||
|
go.uber.org/atomic v1.7.0 // indirect
|
||||||
|
go.uber.org/multierr v1.6.0 // indirect
|
||||||
|
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
|
||||||
|
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
|
||||||
|
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
|
||||||
|
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
|
||||||
|
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
|
||||||
|
golang.org/x/tools v0.1.10 // indirect
|
||||||
|
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
|
||||||
|
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
|
||||||
|
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
|
||||||
|
)
|
|
@ -0,0 +1,28 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"folder_monitoring/cmd"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
rootCmd = &cobra.Command{
|
||||||
|
Use: "file_monitoring",
|
||||||
|
Long: "file_monitoring is a file monitoring and transfer tool",
|
||||||
|
Version: "v 1.0",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
rootCmd.AddCommand(cmd.NewStartCmd())
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
if err := rootCmd.Execute(); err != nil {
|
||||||
|
_, _ = fmt.Fprint(os.Stderr, err.Error())
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,93 @@
|
||||||
|
package mq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"folder_monitoring/config"
|
||||||
|
"git.hpds.cc/Component/logging"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.hpds.cc/pavement/hpds_node"
|
||||||
|
)
|
||||||
|
|
||||||
|
var MqList []HpdsMqNode
|
||||||
|
|
||||||
|
type HpdsMqNode struct {
|
||||||
|
MqType uint
|
||||||
|
Topic string
|
||||||
|
Node config.HpdsNode
|
||||||
|
EndPoint interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func must(logger *logging.Logger, err error) {
|
||||||
|
if err != nil {
|
||||||
|
if logger != nil {
|
||||||
|
logger.With(zap.String("web节点", "错误信息")).Error("启动错误", zap.Error(err))
|
||||||
|
} else {
|
||||||
|
_, _ = fmt.Fprint(os.Stderr, err)
|
||||||
|
}
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMqClient(funcs []config.FuncConfig, node config.HpdsNode, logger *logging.Logger) (mqList []HpdsMqNode, err error) {
|
||||||
|
mqList = make([]HpdsMqNode, 0)
|
||||||
|
for _, v := range funcs {
|
||||||
|
switch v.MqType {
|
||||||
|
case 2:
|
||||||
|
sf := hpds_node.NewStreamFunction(
|
||||||
|
v.Name,
|
||||||
|
hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)),
|
||||||
|
hpds_node.WithObserveDataTags(v.DataTag),
|
||||||
|
hpds_node.WithCredential(node.Token),
|
||||||
|
)
|
||||||
|
err = sf.Connect()
|
||||||
|
nodeInfo := HpdsMqNode{
|
||||||
|
MqType: 2,
|
||||||
|
Topic: v.Name,
|
||||||
|
Node: node,
|
||||||
|
EndPoint: sf,
|
||||||
|
}
|
||||||
|
must(logger, err)
|
||||||
|
mqList = append(mqList, nodeInfo)
|
||||||
|
default:
|
||||||
|
ap := hpds_node.NewAccessPoint(
|
||||||
|
v.Name,
|
||||||
|
hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)),
|
||||||
|
hpds_node.WithCredential(node.Token),
|
||||||
|
)
|
||||||
|
err = ap.Connect()
|
||||||
|
nodeInfo := HpdsMqNode{
|
||||||
|
MqType: 1,
|
||||||
|
Topic: v.Name,
|
||||||
|
Node: node,
|
||||||
|
EndPoint: ap,
|
||||||
|
}
|
||||||
|
must(logger, err)
|
||||||
|
ap.SetDataTag(v.DataTag)
|
||||||
|
mqList = append(mqList, nodeInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
return mqList, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetMqClient(topic string, mqType uint) *HpdsMqNode {
|
||||||
|
for _, v := range MqList {
|
||||||
|
if v.Topic == topic && v.MqType == mqType {
|
||||||
|
return &v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func GenerateAndSendData(stream hpds_node.AccessPoint, data []byte, logger *logging.Logger) error {
|
||||||
|
logger.With(zap.String("web节点", "发送消息")).Info("数据", zap.String("发送的数据", string(data)))
|
||||||
|
_, err := stream.Write(data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
time.Sleep(1000 * time.Millisecond)
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,20 @@
|
||||||
|
package mq
|
||||||
|
|
||||||
|
const (
|
||||||
|
DatasetRequest = iota + 10
|
||||||
|
DatasetResponse
|
||||||
|
)
|
||||||
|
|
||||||
|
type InstructionReq struct {
|
||||||
|
Command int `json:"command"`
|
||||||
|
Payload interface{} `json:"payload"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type FileTransferInfo struct {
|
||||||
|
FileName string
|
||||||
|
FilePath string
|
||||||
|
DatasetId int
|
||||||
|
File string
|
||||||
|
IsCompress bool
|
||||||
|
FileMd5 string
|
||||||
|
}
|
Binary file not shown.
After Width: | Height: | Size: 12 MiB |
Binary file not shown.
After Width: | Height: | Size: 12 MiB |
|
@ -0,0 +1,116 @@
|
||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/klauspost/compress/zstd"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FileStatus struct {
|
||||||
|
FileName string `json:"fileName"`
|
||||||
|
FilePath string `json:"filePath"`
|
||||||
|
CreateTime time.Time `json:"createTime"`
|
||||||
|
FileMd5 string `json:"fileMd5"`
|
||||||
|
TransferStatus bool `json:"transferStatus"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// PathExists 判断所给路径文件/文件夹是否存在
|
||||||
|
func PathExists(path string) (bool, error) {
|
||||||
|
_, err := os.Stat(path)
|
||||||
|
if err == nil {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
//IsNotExist来判断,是不是不存在的错误
|
||||||
|
if os.IsNotExist(err) { //如果返回的错误类型使用os.isNotExist()判断为true,说明文件或者文件夹不存在
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return false, err //如果有错误了,但是不是不存在的错误,所以把这个错误原封不动的返回
|
||||||
|
}
|
||||||
|
|
||||||
|
func Load(storePath string, list map[string]*FileStatus) error {
|
||||||
|
if b, _ := PathExists(storePath); !b {
|
||||||
|
_ = os.MkdirAll(storePath, os.ModePerm)
|
||||||
|
}
|
||||||
|
fileName := strings.Replace(storePath, ":\\", "_", -1)
|
||||||
|
fileName = strings.Replace(storePath, "\\", "_", -1)
|
||||||
|
fileName = strings.Replace(storePath, "/", "_", -1)
|
||||||
|
storeFile := path.Join(storePath, fmt.Sprintf("%s.hdb", fileName))
|
||||||
|
if b, _ := PathExists(storeFile); !b {
|
||||||
|
NewFile(storeFile)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
f, _ := os.OpenFile(storeFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
||||||
|
defer func(f *os.File) {
|
||||||
|
_ = f.Close()
|
||||||
|
}(f)
|
||||||
|
buff, err := io.ReadAll(f)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if len(buff) > 0 {
|
||||||
|
str, err := UnCompress(buff)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = json.Unmarshal(str, &list)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func Save(storePath string, list map[string]*FileStatus) {
|
||||||
|
if b, _ := PathExists(storePath); !b {
|
||||||
|
_ = os.MkdirAll(storePath, os.ModePerm)
|
||||||
|
}
|
||||||
|
fileName := strings.Replace(storePath, ":\\", "_", -1)
|
||||||
|
fileName = strings.Replace(storePath, "\\", "_", -1)
|
||||||
|
fileName = strings.Replace(storePath, "/", "_", -1)
|
||||||
|
storeFile := path.Join(storePath, fmt.Sprintf("%s.hdb", fileName))
|
||||||
|
if b, _ := PathExists(storeFile); !b {
|
||||||
|
NewFile(storeFile)
|
||||||
|
}
|
||||||
|
f, _ := os.OpenFile(storeFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
|
||||||
|
defer func() {
|
||||||
|
_ = f.Close()
|
||||||
|
}()
|
||||||
|
str, _ := json.Marshal(list)
|
||||||
|
c := Compress(str)
|
||||||
|
_, _ = f.Write(c)
|
||||||
|
}
|
||||||
|
func NewFile(fileName string) {
|
||||||
|
f, _ := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
|
||||||
|
defer func(f *os.File) {
|
||||||
|
_ = f.Close()
|
||||||
|
}(f)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compress 压缩
|
||||||
|
func Compress(src []byte) []byte {
|
||||||
|
encoder, _ := zstd.NewWriter(nil)
|
||||||
|
zstd.WithEncoderConcurrency(3)
|
||||||
|
return encoder.EncodeAll(src, make([]byte, 0, len(src)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func UnCompress(src []byte) ([]byte, error) {
|
||||||
|
d, err := zstd.NewReader(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer d.Close()
|
||||||
|
|
||||||
|
uncompressed, err := d.DecodeAll(src, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return uncompressed, nil
|
||||||
|
}
|
Loading…
Reference in New Issue