You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
187 lines
3.8 KiB
187 lines
3.8 KiB
package hpds_net_framework |
|
|
|
import ( |
|
"errors" |
|
"git.hpds.cc/Component/logging" |
|
"github.com/google/uuid" |
|
"go.uber.org/zap" |
|
"sync/atomic" |
|
"time" |
|
) |
|
|
|
// FrameNode 帧同步节点 |
|
type FrameNode struct { |
|
// 节点ID |
|
NodeId string |
|
// 网络连接 |
|
Connections map[interface{}]IConnection |
|
// 当前连接数量 |
|
clientSize int64 |
|
// 完成同步数量 |
|
overSize int64 |
|
// 同步周期 |
|
FrameTicker *time.Ticker |
|
// current frame messages |
|
frameData [][]byte |
|
frameId uint32 |
|
allFrame []interface{} |
|
// rand seed |
|
RandSeed int64 |
|
// message channel |
|
onMessage chan []byte |
|
// AddConn |
|
addConnChan chan IConnection |
|
delConnChan chan string |
|
completeChan chan interface{} |
|
closeFlag int64 |
|
logger *logging.Logger |
|
} |
|
|
|
// NewFrameNode return a new FrameNode |
|
func NewFrameNode(logger *logging.Logger) *FrameNode { |
|
return &FrameNode{ |
|
Connections: make(map[interface{}]IConnection), |
|
NodeId: uuid.New().String(), |
|
FrameTicker: time.NewTicker(time.Millisecond * 66), |
|
RandSeed: time.Now().UnixNano(), |
|
onMessage: make(chan []byte), |
|
addConnChan: make(chan IConnection), |
|
delConnChan: make(chan string), |
|
completeChan: make(chan interface{}), |
|
logger: logger, |
|
} |
|
} |
|
|
|
// Serve the node |
|
func (gr *FrameNode) Serve() { |
|
go func() { |
|
defer func() { |
|
for _, conn := range gr.Connections { |
|
conn.SetNode(nil) |
|
} |
|
}() |
|
for { |
|
// 优先管理连接状态 |
|
select { |
|
// add conn |
|
case ic := <-gr.addConnChan: |
|
gr.Connections[ic.GetUuid()] = ic |
|
gr.clientSize++ |
|
// conn leave |
|
case key := <-gr.delConnChan: |
|
delete(gr.Connections, key) |
|
gr.clientSize-- |
|
// sync complete |
|
case <-gr.completeChan: |
|
gr.overSize++ |
|
if gr.overSize >= gr.clientSize/2 { |
|
_ = gr.Destroy() |
|
} |
|
default: |
|
select { |
|
case <-gr.FrameTicker.C: |
|
gr.sendFrame() |
|
case pkg := <-gr.onMessage: |
|
if pkg == nil { |
|
gr.logger.Info("FrameNode stop serve", zap.String("NodeId", gr.NodeId)) |
|
// stop Serve |
|
gr.FrameTicker.Stop() |
|
return |
|
} |
|
gr.frameData = append(gr.frameData, pkg) |
|
} |
|
} |
|
} |
|
}() |
|
} |
|
|
|
func (gr *FrameNode) sendFrame() { |
|
// 没有消息 |
|
if len(gr.frameData) == 0 || gr.clientSize == 0 { |
|
//log.Debug("Server empty frame without data") |
|
return |
|
} |
|
// 打包消息 |
|
frame := ScFrame{ |
|
Frame: gr.frameId, |
|
Protocols: gr.frameData, |
|
} |
|
gr.logger.Debug("send frame to connections", |
|
zap.Int("connection count", len(gr.Connections)), |
|
zap.Int("contains package count", len(gr.frameData)), |
|
) |
|
for _, conn := range gr.Connections { |
|
conn.WriteMsg(&frame) |
|
} |
|
// reset data |
|
gr.frameId++ |
|
gr.frameData = gr.frameData[:0] |
|
gr.allFrame = append(gr.allFrame, gr.frameData) |
|
} |
|
|
|
// OnRawMessage msg |
|
func (gr *FrameNode) OnRawMessage(msg []byte) error { |
|
if msg == nil { |
|
err := errors.New("can't frame nil message") |
|
return err |
|
} |
|
if gr.available() { |
|
gr.onMessage <- msg |
|
} |
|
return errFoo |
|
} |
|
|
|
// OnProtocolMessage interface |
|
func (gr *FrameNode) OnProtocolMessage(interface{}) error { |
|
return nil |
|
} |
|
|
|
// GetAllMessage return chan []interface |
|
func (gr *FrameNode) GetAllMessage() chan []interface{} { |
|
data := make(chan []interface{}, 1) |
|
data <- gr.allFrame |
|
return data |
|
} |
|
|
|
// AddConn conn |
|
func (gr *FrameNode) AddConn(conn IConnection) error { |
|
if gr.available() { |
|
gr.addConnChan <- conn |
|
return nil |
|
} |
|
return errFoo |
|
} |
|
|
|
// DelConn by key |
|
func (gr *FrameNode) DelConn(key string) error { |
|
if gr.available() { |
|
gr.delConnChan <- key |
|
return nil |
|
} |
|
return errFoo |
|
} |
|
|
|
// Complete sync |
|
func (gr *FrameNode) Complete() error { |
|
if gr.available() { |
|
gr.completeChan <- struct{}{} |
|
return nil |
|
} |
|
return errFoo |
|
} |
|
|
|
// Destroy the node |
|
func (gr *FrameNode) Destroy() error { |
|
if gr.available() { |
|
atomic.AddInt64(&gr.closeFlag, 1) |
|
go func() { |
|
gr.onMessage <- nil |
|
}() |
|
return nil |
|
} |
|
return errFoo |
|
} |
|
|
|
func (gr *FrameNode) available() bool { |
|
return atomic.LoadInt64(&gr.closeFlag) == 0 |
|
}
|
|
|