diff --git a/cmd/server.go b/cmd/server.go index e7cec40..a382d24 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -114,7 +114,7 @@ func NewStartCmd() *cobra.Command { ).Error(ctx.Err().Error()) return case errs := <-exitChannel: - store.Save(cfg.TempPath, global.FileHistoryMap) + store.Save(cfg.TempPath, cfg.MonitorPath, global.FileHistoryMap) global.Logger.With( zap.String(cfg.Name, "服务退出"), ).Info(errs.String()) diff --git a/config/config.yaml b/config/config.yaml index d5b9d57..c910c3f 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,11 +1,11 @@ name: "folder_monitoring" -monitorPath: "./src/" +monitorPath: "/Users/wangjian/Downloads/数据标注/" tempPath: "./tmp/" -datasetId: 1 +datasetId: 2 dataType: 1 node: - host: 127.0.0.1 - port: 27188 + host: 192.168.53.10 + port: 9188 token: 06d36c6f5705507dae778fdce90d0767 functions: - name: dataset-request diff --git a/global/global.go b/global/global.go index 9dcc77f..1eb8d7e 100644 --- a/global/global.go +++ b/global/global.go @@ -24,11 +24,13 @@ import ( ) var ( - FileList *arraylist.List - Watch *fsnotify.Watcher - FileHistoryMap map[string]*store.FileStatus - Cfg *config.FolderMonitorConfig - Logger *logging.Logger + FileList *arraylist.List + Watch *fsnotify.Watcher + FileHistoryMap map[string]*store.FileStatus + Cfg *config.FolderMonitorConfig + Logger *logging.Logger + maxGoroutinueNum = 5 + GoroutinueChan = make(chan bool, maxGoroutinueNum) //encoderPool *sync.Pool ) @@ -173,11 +175,12 @@ func transferData(list *arraylist.List) { defer mu.Unlock() startTime := time.Now() cli := mq.GetMqClient("dataset-request", 1) - fmt.Println("list.Size() =====>>>", list.Size()) for i := 0; i < list.Size(); i++ { if fn, ok := list.Get(i); ok { + GoroutinueChan <- true fName := deepCopy(fn.(string)) - if PathExists(fName) { + if PathExists(fName) && IsFile(fName) { + wg.Add(1) go func(fn string) { fileName := fn @@ -218,6 +221,7 @@ func transferData(list *arraylist.List) { } else { list.Remove(0) } + <-GoroutinueChan } } wg.Wait() diff --git a/go.mod b/go.mod index 35f82d5..0e0e83d 100644 --- a/go.mod +++ b/go.mod @@ -7,36 +7,35 @@ require ( github.com/emirpasic/gods v1.18.1 github.com/fsnotify/fsnotify v1.4.9 github.com/klauspost/compress v1.16.3 - // github.com/mostynb/zstdpool-syncpool v0.0.12 github.com/spf13/cobra v1.6.1 go.uber.org/zap v1.23.0 gopkg.in/yaml.v3 v3.0.1 ) -require git.hpds.cc/pavement/hpds_node v0.0.0-20230402152619-41414aafa930 +require ( + git.hpds.cc/Component/network v0.0.0-20230405135741-a4ea724bab76 + git.hpds.cc/pavement/hpds_node v0.0.0-20230405153516-9403c4d01e12 +) require ( git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect - git.hpds.cc/Component/network v0.0.0-20230402152327-d10a80765e7c // indirect github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/golang/mock v1.6.0 // indirect + github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect - github.com/lucas-clemente/quic-go v0.29.1 // indirect - github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect - github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect github.com/matoous/go-nanoid/v2 v2.0.0 // indirect - github.com/nxadm/tail v1.4.8 // indirect - github.com/onsi/ginkgo v1.16.4 // indirect + github.com/onsi/ginkgo/v2 v2.2.0 // indirect + github.com/quic-go/qtls-go1-19 v0.2.1 // indirect + github.com/quic-go/qtls-go1-20 v0.1.1 // indirect + github.com/quic-go/quic-go v0.33.0 // indirect github.com/spf13/pflag v1.0.5 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect - golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect - golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect - golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect - golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect - golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect - golang.org/x/tools v0.1.10 // indirect - golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect + golang.org/x/crypto v0.4.0 // indirect + golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect + golang.org/x/mod v0.6.0 // indirect + golang.org/x/net v0.4.0 // indirect + golang.org/x/sys v0.3.0 // indirect + golang.org/x/tools v0.2.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect - gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect ) diff --git a/mq/index.go b/mq/index.go index 9313c9b..b40a497 100644 --- a/mq/index.go +++ b/mq/index.go @@ -4,6 +4,7 @@ import ( "fmt" "folder_monitoring/config" "git.hpds.cc/Component/logging" + "git.hpds.cc/Component/network/frame" "go.uber.org/zap" "os" "time" @@ -39,7 +40,7 @@ func NewMqClient(funcs []config.FuncConfig, node config.HpdsNode, logger *loggin sf := hpds_node.NewStreamFunction( v.Name, hpds_node.WithMqAddr(fmt.Sprintf("%s:%d", node.Host, node.Port)), - hpds_node.WithObserveDataTags(v.DataTag), + hpds_node.WithObserveDataTags(frame.Tag(v.DataTag)), hpds_node.WithCredential(node.Token), ) err = sf.Connect() @@ -65,7 +66,7 @@ func NewMqClient(funcs []config.FuncConfig, node config.HpdsNode, logger *loggin EndPoint: ap, } must(logger, err) - ap.SetDataTag(v.DataTag) + ap.SetDataTag(frame.Tag(v.DataTag)) mqList = append(mqList, nodeInfo) } diff --git a/store/index.go b/store/index.go index 5c04e1b..aa970a1 100644 --- a/store/index.go +++ b/store/index.go @@ -67,13 +67,16 @@ func Load(storePath string, list map[string]*FileStatus) error { return nil } -func Save(storePath string, list map[string]*FileStatus) { +func Save(storePath, monitorPath string, list map[string]*FileStatus) { if b, _ := PathExists(storePath); !b { _ = os.MkdirAll(storePath, os.ModePerm) } - fileName := strings.Replace(storePath, ":\\", "_", -1) - fileName = strings.Replace(storePath, "\\", "_", -1) - fileName = strings.Replace(storePath, "/", "_", -1) + if b, _ := PathExists(monitorPath); !b { + _ = os.MkdirAll(monitorPath, os.ModePerm) + } + fileName := strings.Replace(monitorPath, ":\\", "_", -1) + fileName = strings.Replace(fileName, "\\", "_", -1) + fileName = strings.Replace(fileName, "/", "_", -1) storeFile := path.Join(storePath, fmt.Sprintf("%s.hdb", fileName)) if b, _ := PathExists(storeFile); !b { NewFile(storeFile)