修改传输协议中的数据

This commit is contained in:
wangjian 2023-04-02 09:24:37 +08:00
parent 882287cac1
commit d1c3e04298
5 changed files with 59 additions and 36 deletions

View File

@ -16,8 +16,10 @@ import (
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"sync" "sync"
"time" "time"
"unsafe"
) )
var ( var (
@ -46,7 +48,7 @@ func InitWatchFolder(folderPath string) {
} }
if !fi.IsDir() { if !fi.IsDir() {
item, ok := FileHistoryMap[fi.Name()] item, ok := FileHistoryMap[path]
if ok { if ok {
if !item.TransferStatus { if !item.TransferStatus {
FileList.Add(fi.Name()) FileList.Add(fi.Name())
@ -87,6 +89,8 @@ func WatchEvent() {
// Rename 重命名 // Rename 重命名
// Chmod 修改权限 // Chmod 修改权限
if ev.Op&fsnotify.Create == fsnotify.Create { if ev.Op&fsnotify.Create == fsnotify.Create {
//判断是否问文件夹
if IsFile(ev.Name) {
Logger.Info("创建文件", zap.String("文件名", ev.Name)) Logger.Info("创建文件", zap.String("文件名", ev.Name))
FileHistoryMap[ev.Name] = &store.FileStatus{ FileHistoryMap[ev.Name] = &store.FileStatus{
FileName: ev.Name, FileName: ev.Name,
@ -96,6 +100,9 @@ func WatchEvent() {
TransferStatus: false, TransferStatus: false,
} }
FileList.Add(ev.Name) FileList.Add(ev.Name)
} else {
_ = Watch.Add(ev.Name)
}
} }
} }
case <-t1.C: case <-t1.C:
@ -114,7 +121,13 @@ func WatchEvent() {
} }
} }
} }
func IsFile(f string) bool {
fi, e := os.Stat(f)
if e != nil {
return false
}
return !fi.IsDir()
}
func PathExists(path string) bool { func PathExists(path string) bool {
_, err := os.Stat(path) _, err := os.Stat(path)
if err == nil { if err == nil {
@ -157,19 +170,22 @@ func transferData(list *arraylist.List) {
defer mu.Unlock() defer mu.Unlock()
startTime := time.Now() startTime := time.Now()
cli := mq.GetMqClient("dataset-request", 1) cli := mq.GetMqClient("dataset-request", 1)
fmt.Println("list.Size() =====>>>", list.Size())
for i := 0; i < list.Size(); i++ { for i := 0; i < list.Size(); i++ {
if fn, ok := list.Get(0); ok { if fn, ok := list.Get(i); ok {
if PathExists(fn.(string)) { fName := deepCopy(fn.(string))
if PathExists(fName) {
wg.Add(1) wg.Add(1)
go func() { go func(fn string) {
fileName := fn.(string) fileName := fn
src := ReadFile(fileName) src := ReadFile(fileName)
if src != nil { if src != nil {
dstContent := store.Compress(src) dstContent := store.Compress(src)
item := mq.FileTransferInfo{ item := mq.FileTransferInfo{
FileName: fileName, FileName: filepath.Base(fileName),
FilePath: FileHistoryMap[fileName].FilePath, FilePath: strings.Replace(FileHistoryMap[fileName].FilePath, Cfg.MonitorPath, "", -1),
DatasetId: Cfg.DatasetId, DatasetId: Cfg.DatasetId,
FileSize: len(src),
File: string(dstContent), File: string(dstContent),
IsCompress: true, IsCompress: true,
FileMd5: FileHistoryMap[fileName].FileMd5, FileMd5: FileHistoryMap[fileName].FileMd5,
@ -180,7 +196,7 @@ func transferData(list *arraylist.List) {
} }
s, _ := json.Marshal(payload) s, _ := json.Marshal(payload)
list.Remove(0) list.Remove(0)
err := mq.GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), s, Logger) err := mq.GenerateAndSendData(cli.EndPoint.(hpds_node.AccessPoint), s)
if err != nil { if err != nil {
Logger.With( Logger.With(
zap.String("文件名称", fileName), zap.String("文件名称", fileName),
@ -193,7 +209,10 @@ func transferData(list *arraylist.List) {
).Info("文件传输") ).Info("文件传输")
compressRatio += float64(len(dstContent)) * 100 / float64(len(src)) compressRatio += float64(len(dstContent)) * 100 / float64(len(src))
} }
}() }(fName)
} else {
list.Remove(0)
}
} }
} }
wg.Wait() wg.Wait()
@ -203,7 +222,6 @@ func transferData(list *arraylist.List) {
zap.String("平均压缩率", fmt.Sprintf("%.2f%%", compressRatio/float64(list.Size()))), zap.String("平均压缩率", fmt.Sprintf("%.2f%%", compressRatio/float64(list.Size()))),
zap.Duration("运行时间", compressTime), zap.Duration("运行时间", compressTime),
) )
}
} }
func FileMD5(filePath string) (string, error) { func FileMD5(filePath string) (string, error) {
file, err := os.Open(filePath) file, err := os.Open(filePath)
@ -217,3 +235,8 @@ func FileMD5(filePath string) (string, error) {
_, _ = io.Copy(hash, file) _, _ = io.Copy(hash, file)
return hex.EncodeToString(hash.Sum(nil)), nil return hex.EncodeToString(hash.Sum(nil)), nil
} }
func deepCopy(s string) string {
b := make([]byte, len(s))
copy(b, s)
return *(*string)(unsafe.Pointer(&b))
}

3
go.mod
View File

@ -4,7 +4,6 @@ go 1.19
require ( require (
git.hpds.cc/Component/logging v0.0.0-20230106105738-e378e873921b git.hpds.cc/Component/logging v0.0.0-20230106105738-e378e873921b
git.hpds.cc/pavement/hpds_node v0.0.0-20230326152949-a1c0ad2f7052
github.com/emirpasic/gods v1.18.1 github.com/emirpasic/gods v1.18.1
github.com/fsnotify/fsnotify v1.4.9 github.com/fsnotify/fsnotify v1.4.9
github.com/klauspost/compress v1.16.3 github.com/klauspost/compress v1.16.3
@ -14,6 +13,8 @@ require (
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )
require git.hpds.cc/pavement/hpds_node v0.0.0-20230326152949-a1c0ad2f7052
require ( require (
git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect
git.hpds.cc/Component/network v0.0.0-20230326151855-3c157f531d86 // indirect git.hpds.cc/Component/network v0.0.0-20230326151855-3c157f531d86 // indirect

View File

@ -82,8 +82,7 @@ func GetMqClient(topic string, mqType uint) *HpdsMqNode {
return nil return nil
} }
func GenerateAndSendData(stream hpds_node.AccessPoint, data []byte, logger *logging.Logger) error { func GenerateAndSendData(stream hpds_node.AccessPoint, data []byte) error {
logger.With(zap.String("web节点", "发送消息")).Info("数据", zap.String("发送的数据", string(data)))
_, err := stream.Write(data) _, err := stream.Write(data)
if err != nil { if err != nil {
return err return err

View File

@ -2,7 +2,6 @@ package mq
const ( const (
DatasetRequest = iota + 10 DatasetRequest = iota + 10
DatasetResponse
) )
type InstructionReq struct { type InstructionReq struct {
@ -11,10 +10,11 @@ type InstructionReq struct {
} }
type FileTransferInfo struct { type FileTransferInfo struct {
FileName string FileName string `json:"fileName"`
FilePath string FilePath string `json:"filePath"`
DatasetId int DatasetId int `json:"datasetId"`
File string FileSize int `json:"fileSize"`
IsCompress bool File string `json:"file"`
FileMd5 string IsCompress bool `json:"isCompress"`
FileMd5 string `json:"fileMd5"`
} }

View File

@ -78,7 +78,7 @@ func Save(storePath string, list map[string]*FileStatus) {
if b, _ := PathExists(storeFile); !b { if b, _ := PathExists(storeFile); !b {
NewFile(storeFile) NewFile(storeFile)
} }
f, _ := os.OpenFile(storeFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) f, _ := os.OpenFile(storeFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
defer func() { defer func() {
_ = f.Close() _ = f.Close()
}() }()