package hpds_net_framework import ( "errors" "git.hpds.cc/Component/logging" "github.com/google/uuid" "go.uber.org/zap" "sync/atomic" "time" ) // FrameNode 帧同步节点 type FrameNode struct { // 节点ID NodeId string // 网络连接 Connections map[interface{}]IConnection // 当前连接数量 clientSize int64 // 完成同步数量 overSize int64 // 同步周期 FrameTicker *time.Ticker // current frame messages frameData [][]byte frameId uint32 allFrame []interface{} // rand seed RandSeed int64 // message channel onMessage chan []byte // AddConn addConnChan chan IConnection delConnChan chan string completeChan chan interface{} closeFlag int64 logger *logging.Logger } // NewFrameNode return a new FrameNode func NewFrameNode(logger *logging.Logger) *FrameNode { return &FrameNode{ Connections: make(map[interface{}]IConnection), NodeId: uuid.New().String(), FrameTicker: time.NewTicker(time.Millisecond * 66), RandSeed: time.Now().UnixNano(), onMessage: make(chan []byte), addConnChan: make(chan IConnection), delConnChan: make(chan string), completeChan: make(chan interface{}), logger: logger, } } // Serve the node func (gr *FrameNode) Serve() { go func() { defer func() { for _, conn := range gr.Connections { conn.SetNode(nil) } }() for { // 优先管理连接状态 select { // add conn case ic := <-gr.addConnChan: gr.Connections[ic.GetUuid()] = ic gr.clientSize++ // conn leave case key := <-gr.delConnChan: delete(gr.Connections, key) gr.clientSize-- // sync complete case <-gr.completeChan: gr.overSize++ if gr.overSize >= gr.clientSize/2 { _ = gr.Destroy() } default: select { case <-gr.FrameTicker.C: gr.sendFrame() case pkg := <-gr.onMessage: if pkg == nil { gr.logger.Info("FrameNode stop serve", zap.String("NodeId", gr.NodeId)) // stop Serve gr.FrameTicker.Stop() return } gr.frameData = append(gr.frameData, pkg) } } } }() } func (gr *FrameNode) sendFrame() { // 没有消息 if len(gr.frameData) == 0 || gr.clientSize == 0 { //log.Debug("Server empty frame without data") return } // 打包消息 frame := ScFrame{ Frame: gr.frameId, Protocols: gr.frameData, } gr.logger.Debug("send frame to connections", zap.Int("connection count", len(gr.Connections)), zap.Int("contains package count", len(gr.frameData)), ) for _, conn := range gr.Connections { conn.WriteMsg(&frame) } // reset data gr.frameId++ gr.frameData = gr.frameData[:0] gr.allFrame = append(gr.allFrame, gr.frameData) } // OnRawMessage msg func (gr *FrameNode) OnRawMessage(msg []byte) error { if msg == nil { err := errors.New("can't frame nil message") return err } if gr.available() { gr.onMessage <- msg } return errFoo } // OnProtocolMessage interface func (gr *FrameNode) OnProtocolMessage(interface{}) error { return nil } // GetAllMessage return chan []interface func (gr *FrameNode) GetAllMessage() chan []interface{} { data := make(chan []interface{}, 1) data <- gr.allFrame return data } // AddConn conn func (gr *FrameNode) AddConn(conn IConnection) error { if gr.available() { gr.addConnChan <- conn return nil } return errFoo } // DelConn by key func (gr *FrameNode) DelConn(key string) error { if gr.available() { gr.delConnChan <- key return nil } return errFoo } // Complete sync func (gr *FrameNode) Complete() error { if gr.available() { gr.completeChan <- struct{}{} return nil } return errFoo } // Destroy the node func (gr *FrameNode) Destroy() error { if gr.available() { atomic.AddInt64(&gr.closeFlag, 1) go func() { gr.onMessage <- nil }() return nil } return errFoo } func (gr *FrameNode) available() bool { return atomic.LoadInt64(&gr.closeFlag) == 0 }