From d8690204874680f0bd5584fa680c439bd4437e5f Mon Sep 17 00:00:00 2001 From: wangjian Date: Wed, 3 Aug 2022 16:55:40 +0800 Subject: [PATCH] init --- base.pb.go | 299 +++++++++++++++++++++++++++++++ base.proto | 17 ++ broadcast.go | 164 +++++++++++++++++ data.go | 54 ++++++ examples/comm/msg/code/msg_id.go | 15 ++ examples/comm/protobuf/test.pb.go | 364 ++++++++++++++++++++++++++++++++++++++ examples/comm/protobuf/test.proto | 23 +++ examples/tcp/server.go | 60 +++++++ frameNode.go | 187 ++++++++++++++++++++ go.mod | 16 ++ message.go | 18 ++ msg.go | 44 +++++ network.go | 40 +++++ proto.go | 177 ++++++++++++++++++ security/aes.go | 106 +++++++++++ security/pass.go | 51 ++++++ tcpConn.go | 257 +++++++++++++++++++++++++++ tcpServer.go | 59 ++++++ 18 files changed, 1951 insertions(+) create mode 100644 base.pb.go create mode 100644 base.proto create mode 100644 broadcast.go create mode 100644 data.go create mode 100644 examples/comm/msg/code/msg_id.go create mode 100644 examples/comm/protobuf/test.pb.go create mode 100644 examples/comm/protobuf/test.proto create mode 100644 examples/tcp/server.go create mode 100644 frameNode.go create mode 100644 go.mod create mode 100644 message.go create mode 100644 msg.go create mode 100644 network.go create mode 100644 proto.go create mode 100644 security/aes.go create mode 100644 security/pass.go create mode 100644 tcpConn.go create mode 100644 tcpServer.go diff --git a/base.pb.go b/base.pb.go new file mode 100644 index 0000000..ca7e12b --- /dev/null +++ b/base.pb.go @@ -0,0 +1,299 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.21.4 +// source: base.proto + +package hpds_net_framework + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// 协议消息 +type Protocol struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Content []byte `protobuf:"bytes,2,opt,name=content,proto3" json:"content,omitempty"` +} + +func (x *Protocol) Reset() { + *x = Protocol{} + if protoimpl.UnsafeEnabled { + mi := &file_base_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Protocol) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Protocol) ProtoMessage() {} + +func (x *Protocol) ProtoReflect() protoreflect.Message { + mi := &file_base_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Protocol.ProtoReflect.Descriptor instead. +func (*Protocol) Descriptor() ([]byte, []int) { + return file_base_proto_rawDescGZIP(), []int{0} +} + +func (x *Protocol) GetId() uint32 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *Protocol) GetContent() []byte { + if x != nil { + return x.Content + } + return nil +} + +// 打包消息 +type ScProtocolPack struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Pack []*Protocol `protobuf:"bytes,2,rep,name=pack,proto3" json:"pack,omitempty"` +} + +func (x *ScProtocolPack) Reset() { + *x = ScProtocolPack{} + if protoimpl.UnsafeEnabled { + mi := &file_base_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ScProtocolPack) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ScProtocolPack) ProtoMessage() {} + +func (x *ScProtocolPack) ProtoReflect() protoreflect.Message { + mi := &file_base_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ScProtocolPack.ProtoReflect.Descriptor instead. +func (*ScProtocolPack) Descriptor() ([]byte, []int) { + return file_base_proto_rawDescGZIP(), []int{1} +} + +func (x *ScProtocolPack) GetId() uint32 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *ScProtocolPack) GetPack() []*Protocol { + if x != nil { + return x.Pack + } + return nil +} + +// 帧消息 +type ScFrame struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Frame uint32 `protobuf:"varint,1,opt,name=frame,proto3" json:"frame,omitempty"` + Protocols [][]byte `protobuf:"bytes,2,rep,name=protocols,proto3" json:"protocols,omitempty"` +} + +func (x *ScFrame) Reset() { + *x = ScFrame{} + if protoimpl.UnsafeEnabled { + mi := &file_base_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ScFrame) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ScFrame) ProtoMessage() {} + +func (x *ScFrame) ProtoReflect() protoreflect.Message { + mi := &file_base_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ScFrame.ProtoReflect.Descriptor instead. +func (*ScFrame) Descriptor() ([]byte, []int) { + return file_base_proto_rawDescGZIP(), []int{2} +} + +func (x *ScFrame) GetFrame() uint32 { + if x != nil { + return x.Frame + } + return 0 +} + +func (x *ScFrame) GetProtocols() [][]byte { + if x != nil { + return x.Protocols + } + return nil +} + +var File_base_proto protoreflect.FileDescriptor + +var file_base_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x34, 0x0a, 0x08, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, + 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, + 0x6e, 0x74, 0x22, 0x41, 0x0a, 0x10, 0x73, 0x63, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, + 0x6c, 0x5f, 0x70, 0x61, 0x63, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1d, 0x0a, 0x04, 0x70, 0x61, 0x63, 0x6b, 0x18, 0x02, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x09, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x52, + 0x04, 0x70, 0x61, 0x63, 0x6b, 0x22, 0x3e, 0x0a, 0x08, 0x73, 0x63, 0x5f, 0x66, 0x72, 0x61, 0x6d, + 0x65, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, + 0x52, 0x05, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x63, 0x6f, 0x6c, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x09, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x63, 0x6f, 0x6c, 0x73, 0x42, 0x16, 0x5a, 0x14, 0x2e, 0x3b, 0x68, 0x70, 0x64, 0x73, 0x5f, + 0x6e, 0x65, 0x74, 0x5f, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x77, 0x6f, 0x72, 0x6b, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_base_proto_rawDescOnce sync.Once + file_base_proto_rawDescData = file_base_proto_rawDesc +) + +func file_base_proto_rawDescGZIP() []byte { + file_base_proto_rawDescOnce.Do(func() { + file_base_proto_rawDescData = protoimpl.X.CompressGZIP(file_base_proto_rawDescData) + }) + return file_base_proto_rawDescData +} + +var file_base_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_base_proto_goTypes = []interface{}{ + (*Protocol)(nil), // 0: protocol + (*ScProtocolPack)(nil), // 1: sc_protocol_pack + (*ScFrame)(nil), // 2: sc_frame +} +var file_base_proto_depIdxs = []int32{ + 0, // 0: sc_protocol_pack.pack:type_name -> protocol + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_base_proto_init() } +func file_base_proto_init() { + if File_base_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_base_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Protocol); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_base_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ScProtocolPack); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_base_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ScFrame); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_base_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_base_proto_goTypes, + DependencyIndexes: file_base_proto_depIdxs, + MessageInfos: file_base_proto_msgTypes, + }.Build() + File_base_proto = out.File + file_base_proto_rawDesc = nil + file_base_proto_goTypes = nil + file_base_proto_depIdxs = nil +} diff --git a/base.proto b/base.proto new file mode 100644 index 0000000..e4fde81 --- /dev/null +++ b/base.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; +option go_package = ".;hpds_net_framework"; +// 协议消息 +message protocol { + uint32 id = 1; + bytes content = 2; +} +// 打包消息 +message sc_protocol_pack { + uint32 id = 1; + repeated protocol pack = 2; +} +// 帧消息 +message sc_frame { + uint32 frame = 1; + repeated bytes protocols = 2; +} \ No newline at end of file diff --git a/broadcast.go b/broadcast.go new file mode 100644 index 0000000..6d16ff5 --- /dev/null +++ b/broadcast.go @@ -0,0 +1,164 @@ +package hpds_net_framework + +import ( + "errors" + "git.hpds.cc/Component/logging" + "github.com/google/uuid" + "go.uber.org/zap" + "runtime/debug" + "sync/atomic" + "time" +) + +// BroadcastNode 广播转发节点 +type BroadcastNode struct { + // 节点ID + NodeId string + // 网络连接 + Connections map[interface{}]IConnection + // 当前连接数量 + clientSize int64 + // message channel + onMessage chan interface{} + recentMessages []interface{} + // AddConn + addConnChan chan IConnection + delConnChan chan string + closeFlag int64 + logger *logging.Logger +} + +// errFoo node closed error +var errFoo = errors.New("node closed") + +// NewBroadcastNode return a new BroadcastNode +func NewBroadcastNode(logger *logging.Logger) *BroadcastNode { + return &BroadcastNode{ + Connections: make(map[interface{}]IConnection), + NodeId: uuid.New().String(), + onMessage: make(chan interface{}), + addConnChan: make(chan IConnection), + delConnChan: make(chan string), + logger: logger, + } +} + +// Serve the node +func (bNode *BroadcastNode) Serve() { + go func() { + defer func() { + for _, conn := range bNode.Connections { + conn.SetNode(nil) + } + }() + for { + // 优先管理连接 + select { + // add conn + case ic := <-bNode.addConnChan: + bNode.Connections[ic.GetUuid()] = ic + bNode.clientSize++ + // conn leave + case key := <-bNode.delConnChan: + delete(bNode.Connections, key) + bNode.clientSize-- + default: + select { + case pkg := <-bNode.onMessage: + if pkg == nil { + bNode.logger.Info("BroadcastNode stop serve", zap.String("nodeId", bNode.NodeId)) + // stop Serve + return + } + bNode.recentMessages = append(bNode.recentMessages, pkg) + // cache recent 100 + recentSize := len(bNode.recentMessages) + if recentSize > 100 { + bNode.recentMessages = bNode.recentMessages[recentSize-100:] + } + bNode.broadcast(pkg) + default: + time.Sleep(time.Millisecond * 50) + } + } + } + }() +} + +func (bNode *BroadcastNode) broadcast(msg interface{}) { + defer func() { + if r := recover(); r != nil { + bNode.logger.Error("write frame error", + zap.Any("frame data", r), + zap.ByteString("stack", debug.Stack()), + ) + } + }() + if bNode.clientSize == 0 { + return + } + bNode.logger.Debug("broadcast", + zap.Int64("amount", bNode.clientSize), + zap.Any("msg", msg), + ) + for _, conn := range bNode.Connections { + conn.WriteMsg(msg) + } + bNode.logger.Debug("broadcast ok") +} + +// OnRawMessage bytes +func (bNode *BroadcastNode) OnRawMessage([]byte) error { return nil } + +// OnProtocolMessage interface +func (bNode *BroadcastNode) OnProtocolMessage(msg interface{}) error { + if bNode.available() { + bNode.onMessage <- msg + } + return errFoo +} + +// GetAllMessage return chan []interface{} +func (bNode *BroadcastNode) GetAllMessage() chan []interface{} { + data := make(chan []interface{}, 1) + data <- bNode.recentMessages + return data +} + +// AddConn by conn +func (bNode *BroadcastNode) AddConn(conn IConnection) error { + if bNode.available() { + bNode.addConnChan <- conn + return nil + } + return errFoo +} + +// DelConn by key +func (bNode *BroadcastNode) DelConn(key string) error { + if bNode.available() { + bNode.delConnChan <- key + return nil + } + return errFoo +} + +// Complete sync +func (bNode *BroadcastNode) Complete() error { + return nil +} + +// Destroy the node +func (bNode *BroadcastNode) Destroy() error { + if bNode.available() { + atomic.AddInt64(&bNode.closeFlag, 1) + go func() { + bNode.onMessage <- nil + }() + } + return errFoo +} + +func (bNode *BroadcastNode) available() bool { + return atomic.LoadInt64(&bNode.closeFlag) == 0 +} diff --git a/data.go b/data.go new file mode 100644 index 0000000..e7e405e --- /dev/null +++ b/data.go @@ -0,0 +1,54 @@ +package hpds_net_framework + +import "C" +import "sync" + +var ( + // Cfg is the config instance + Cfg *Data + once sync.Once +) + +// Data is the config struct +type Data struct { + // 单个连接未处理消息包缓存队列大小 + // 注意:[超过这个大小,包将丢弃,视为当前系统无法处理,默认100] + ConnUndoQueueSize int + // 单个连接未写入消息包队列大小 [超过这个大小,包将丢弃,视为当前系统无法处理,默认为1] + ConnWriteQueueSize int + // 第一个包等待超市时间 (s) [默认5秒,连接上来未读到正确包,断开连接] + FirstPackageTimeout int + // 连接读取超时(s) [默认35秒, 超时等待时间内,请发送任何数据包,如心跳包] + ConnReadTimeout int + // 连接写超时(s) [默认5秒, 超时等待时间内,请发送任何数据包,如心跳包] + ConnWriteTimeout int + // 数据包最大限制,[默认2048] + MaxDataPackageSize int + // ws 最大header,[默认1024] + MaxHeaderLen int +} + +func init() { + Cfg = &Data{ + ConnUndoQueueSize: 100, + ConnWriteQueueSize: 10, + FirstPackageTimeout: 5, + ConnReadTimeout: 35, + ConnWriteTimeout: 5, + MaxDataPackageSize: 4096, + MaxHeaderLen: 1024, + } +} + +// SetConf this before startup server +func SetConf(cfg *Data) { + once.Do(func() { + Cfg = cfg + if C.ConnUndoQueueSize == 0 { + C.ConnUndoQueueSize = 1 + } + if C.ConnWriteQueueSize == 0 { + C.ConnWriteQueueSize = 1 + } + }) +} diff --git a/examples/comm/msg/code/msg_id.go b/examples/comm/msg/code/msg_id.go new file mode 100644 index 0000000..744228a --- /dev/null +++ b/examples/comm/msg/code/msg_id.go @@ -0,0 +1,15 @@ +package code + +const ( + // Hello msg code + Hello = 2001 + + // FrameData msg code + FrameData = 1000000 + // FrameStart msg code + FrameStart = 1000001 + // FrameEnd msg code + FrameEnd = 1000002 + // MoveOp msg code + MoveOp = 3001 +) diff --git a/examples/comm/protobuf/test.pb.go b/examples/comm/protobuf/test.pb.go new file mode 100644 index 0000000..f581651 --- /dev/null +++ b/examples/comm/protobuf/test.pb.go @@ -0,0 +1,364 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0-devel +// protoc v3.14.0 +// source: test.proto + +package protobuf + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// protoc --go_out=. *.proto +type Hello struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Hello string `protobuf:"bytes,1,opt,name=hello,proto3" json:"hello,omitempty"` +} + +func (x *Hello) Reset() { + *x = Hello{} + if protoimpl.UnsafeEnabled { + mi := &file_test_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Hello) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Hello) ProtoMessage() {} + +func (x *Hello) ProtoReflect() protoreflect.Message { + mi := &file_test_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Hello.ProtoReflect.Descriptor instead. +func (*Hello) Descriptor() ([]byte, []int) { + return file_test_proto_rawDescGZIP(), []int{0} +} + +func (x *Hello) GetHello() string { + if x != nil { + return x.Hello + } + return "" +} + +type CsStartFrame struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *CsStartFrame) Reset() { + *x = CsStartFrame{} + if protoimpl.UnsafeEnabled { + mi := &file_test_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CsStartFrame) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CsStartFrame) ProtoMessage() {} + +func (x *CsStartFrame) ProtoReflect() protoreflect.Message { + mi := &file_test_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CsStartFrame.ProtoReflect.Descriptor instead. +func (*CsStartFrame) Descriptor() ([]byte, []int) { + return file_test_proto_rawDescGZIP(), []int{1} +} + +type CsEndFrame struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *CsEndFrame) Reset() { + *x = CsEndFrame{} + if protoimpl.UnsafeEnabled { + mi := &file_test_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CsEndFrame) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CsEndFrame) ProtoMessage() {} + +func (x *CsEndFrame) ProtoReflect() protoreflect.Message { + mi := &file_test_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CsEndFrame.ProtoReflect.Descriptor instead. +func (*CsEndFrame) Descriptor() ([]byte, []int) { + return file_test_proto_rawDescGZIP(), []int{2} +} + +// 移动 +type CsMove struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + FromX float32 `protobuf:"fixed32,1,opt,name=fromX,proto3" json:"fromX,omitempty"` + FromY float32 `protobuf:"fixed32,2,opt,name=fromY,proto3" json:"fromY,omitempty"` + FromZ float32 `protobuf:"fixed32,3,opt,name=fromZ,proto3" json:"fromZ,omitempty"` + ToX float32 `protobuf:"fixed32,4,opt,name=toX,proto3" json:"toX,omitempty"` + ToY float32 `protobuf:"fixed32,5,opt,name=toY,proto3" json:"toY,omitempty"` + ToZ float32 `protobuf:"fixed32,6,opt,name=toZ,proto3" json:"toZ,omitempty"` + Speed float32 `protobuf:"fixed32,7,opt,name=speed,proto3" json:"speed,omitempty"` +} + +func (x *CsMove) Reset() { + *x = CsMove{} + if protoimpl.UnsafeEnabled { + mi := &file_test_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CsMove) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CsMove) ProtoMessage() {} + +func (x *CsMove) ProtoReflect() protoreflect.Message { + mi := &file_test_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CsMove.ProtoReflect.Descriptor instead. +func (*CsMove) Descriptor() ([]byte, []int) { + return file_test_proto_rawDescGZIP(), []int{3} +} + +func (x *CsMove) GetFromX() float32 { + if x != nil { + return x.FromX + } + return 0 +} + +func (x *CsMove) GetFromY() float32 { + if x != nil { + return x.FromY + } + return 0 +} + +func (x *CsMove) GetFromZ() float32 { + if x != nil { + return x.FromZ + } + return 0 +} + +func (x *CsMove) GetToX() float32 { + if x != nil { + return x.ToX + } + return 0 +} + +func (x *CsMove) GetToY() float32 { + if x != nil { + return x.ToY + } + return 0 +} + +func (x *CsMove) GetToZ() float32 { + if x != nil { + return x.ToZ + } + return 0 +} + +func (x *CsMove) GetSpeed() float32 { + if x != nil { + return x.Speed + } + return 0 +} + +var File_test_proto protoreflect.FileDescriptor + +var file_test_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x1d, 0x0a, 0x05, + 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x12, 0x14, 0x0a, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x22, 0x10, 0x0a, 0x0e, 0x63, + 0x73, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x22, 0x0e, 0x0a, + 0x0c, 0x63, 0x73, 0x5f, 0x65, 0x6e, 0x64, 0x5f, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x22, 0x97, 0x01, + 0x0a, 0x07, 0x63, 0x73, 0x5f, 0x6d, 0x6f, 0x76, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x72, 0x6f, + 0x6d, 0x58, 0x18, 0x01, 0x20, 0x01, 0x28, 0x02, 0x52, 0x05, 0x66, 0x72, 0x6f, 0x6d, 0x58, 0x12, + 0x14, 0x0a, 0x05, 0x66, 0x72, 0x6f, 0x6d, 0x59, 0x18, 0x02, 0x20, 0x01, 0x28, 0x02, 0x52, 0x05, + 0x66, 0x72, 0x6f, 0x6d, 0x59, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x72, 0x6f, 0x6d, 0x5a, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x02, 0x52, 0x05, 0x66, 0x72, 0x6f, 0x6d, 0x5a, 0x12, 0x10, 0x0a, 0x03, 0x74, + 0x6f, 0x58, 0x18, 0x04, 0x20, 0x01, 0x28, 0x02, 0x52, 0x03, 0x74, 0x6f, 0x58, 0x12, 0x10, 0x0a, + 0x03, 0x74, 0x6f, 0x59, 0x18, 0x05, 0x20, 0x01, 0x28, 0x02, 0x52, 0x03, 0x74, 0x6f, 0x59, 0x12, + 0x10, 0x0a, 0x03, 0x74, 0x6f, 0x5a, 0x18, 0x06, 0x20, 0x01, 0x28, 0x02, 0x52, 0x03, 0x74, 0x6f, + 0x5a, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x70, 0x65, 0x65, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x02, + 0x52, 0x05, 0x73, 0x70, 0x65, 0x65, 0x64, 0x42, 0x0c, 0x5a, 0x0a, 0x2e, 0x3b, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_test_proto_rawDescOnce sync.Once + file_test_proto_rawDescData = file_test_proto_rawDesc +) + +func file_test_proto_rawDescGZIP() []byte { + file_test_proto_rawDescOnce.Do(func() { + file_test_proto_rawDescData = protoimpl.X.CompressGZIP(file_test_proto_rawDescData) + }) + return file_test_proto_rawDescData +} + +var file_test_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_test_proto_goTypes = []interface{}{ + (*Hello)(nil), // 0: hello + (*CsStartFrame)(nil), // 1: cs_start_frame + (*CsEndFrame)(nil), // 2: cs_end_frame + (*CsMove)(nil), // 3: cs_move +} +var file_test_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_test_proto_init() } +func file_test_proto_init() { + if File_test_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_test_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Hello); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_test_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CsStartFrame); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_test_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CsEndFrame); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_test_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CsMove); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_test_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_test_proto_goTypes, + DependencyIndexes: file_test_proto_depIdxs, + MessageInfos: file_test_proto_msgTypes, + }.Build() + File_test_proto = out.File + file_test_proto_rawDesc = nil + file_test_proto_goTypes = nil + file_test_proto_depIdxs = nil +} diff --git a/examples/comm/protobuf/test.proto b/examples/comm/protobuf/test.proto new file mode 100644 index 0000000..9d0644b --- /dev/null +++ b/examples/comm/protobuf/test.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; +option go_package = ".;protobuf"; + +// protoc --go_out=. *.proto +message hello { + string hello = 1; +} + +message cs_start_frame { +} + +message cs_end_frame { +} +// 移动 +message cs_move { + float fromX = 1; + float fromY = 2; + float fromZ = 3; + float toX = 4; + float toY = 5; + float toZ = 6; + float speed = 7; +} \ No newline at end of file diff --git a/examples/tcp/server.go b/examples/tcp/server.go new file mode 100644 index 0000000..3e8bc5a --- /dev/null +++ b/examples/tcp/server.go @@ -0,0 +1,60 @@ +package main + +import ( + "git.hpds.cc/Component/hpds_net_framework" + "git.hpds.cc/Component/hpds_net_framework/examples/comm/msg/code" + "git.hpds.cc/Component/hpds_net_framework/examples/comm/protobuf" + "git.hpds.cc/Component/hpds_net_framework/security" + "git.hpds.cc/Component/logging" + "go.uber.org/zap" +) + +var ( + logger *logging.Logger + processor *hpds_net_framework.PbProcessor +) + +func init() { + logger = LoadLoggerConfig() + passwd := security.RandLittlePassword() + cipher := security.NewAESCipher(passwd) + processor = hpds_net_framework.NewPbProcessor(logger) + // add encrypt cipher for processor + processor.SetEncryptor(cipher) + // 注册消息,以及回调处理 + processor.RegisterHandler(code.Hello, &protobuf.Hello{}, func(args ...interface{}) { + msg := args[hpds_net_framework.Msg].(*protobuf.Hello) + logger.Info("Message => from client", zap.String("content", msg.Hello)) + conn := args[hpds_net_framework.Conn].(hpds_net_framework.IConnection) + conn.WriteMsg(msg) + }) +} + +func main() { + // run server + if s, err := hpds_net_framework.NewTcpServer("localhost:2021", processor, logger); err != nil { + panic(err) + } else { + e := s.Run() + if e != nil { + logger.Fatal("Fatal", zap.Error(e)) + } + } +} + +// LoadLoggerConfig 加载日志配置 +func LoadLoggerConfig() *logging.Logger { + return logging.NewLogger( + logging.SetPath("./log/"), + logging.SetPrefix(""), + logging.SetDevelopment(true), + logging.SetDebugFileSuffix(""), + logging.SetWarnFileSuffix(""), + logging.SetErrorFileSuffix(""), + logging.SetInfoFileSuffix(""), + logging.SetMaxAge(30), + logging.SetMaxBackups(100), + logging.SetMaxSize(100), + logging.SetLevel(logging.LogLevel["debug"]), + ) +} diff --git a/frameNode.go b/frameNode.go new file mode 100644 index 0000000..507a0a4 --- /dev/null +++ b/frameNode.go @@ -0,0 +1,187 @@ +package hpds_net_framework + +import ( + "errors" + "git.hpds.cc/Component/logging" + "github.com/google/uuid" + "go.uber.org/zap" + "sync/atomic" + "time" +) + +// FrameNode 帧同步节点 +type FrameNode struct { + // 节点ID + NodeId string + // 网络连接 + Connections map[interface{}]IConnection + // 当前连接数量 + clientSize int64 + // 完成同步数量 + overSize int64 + // 同步周期 + FrameTicker *time.Ticker + // current frame messages + frameData [][]byte + frameId uint32 + allFrame []interface{} + // rand seed + RandSeed int64 + // message channel + onMessage chan []byte + // AddConn + addConnChan chan IConnection + delConnChan chan string + completeChan chan interface{} + closeFlag int64 + logger *logging.Logger +} + +// NewFrameNode return a new FrameNode +func NewFrameNode(logger *logging.Logger) *FrameNode { + return &FrameNode{ + Connections: make(map[interface{}]IConnection), + NodeId: uuid.New().String(), + FrameTicker: time.NewTicker(time.Millisecond * 66), + RandSeed: time.Now().UnixNano(), + onMessage: make(chan []byte), + addConnChan: make(chan IConnection), + delConnChan: make(chan string), + completeChan: make(chan interface{}), + logger: logger, + } +} + +// Serve the node +func (gr *FrameNode) Serve() { + go func() { + defer func() { + for _, conn := range gr.Connections { + conn.SetNode(nil) + } + }() + for { + // 优先管理连接状态 + select { + // add conn + case ic := <-gr.addConnChan: + gr.Connections[ic.GetUuid()] = ic + gr.clientSize++ + // conn leave + case key := <-gr.delConnChan: + delete(gr.Connections, key) + gr.clientSize-- + // sync complete + case <-gr.completeChan: + gr.overSize++ + if gr.overSize >= gr.clientSize/2 { + _ = gr.Destroy() + } + default: + select { + case <-gr.FrameTicker.C: + gr.sendFrame() + case pkg := <-gr.onMessage: + if pkg == nil { + gr.logger.Info("FrameNode stop serve", zap.String("NodeId", gr.NodeId)) + // stop Serve + gr.FrameTicker.Stop() + return + } + gr.frameData = append(gr.frameData, pkg) + } + } + } + }() +} + +func (gr *FrameNode) sendFrame() { + // 没有消息 + if len(gr.frameData) == 0 || gr.clientSize == 0 { + //log.Debug("Server empty frame without data") + return + } + // 打包消息 + frame := ScFrame{ + Frame: gr.frameId, + Protocols: gr.frameData, + } + gr.logger.Debug("send frame to connections", + zap.Int("connection count", len(gr.Connections)), + zap.Int("contains package count", len(gr.frameData)), + ) + for _, conn := range gr.Connections { + conn.WriteMsg(&frame) + } + // reset data + gr.frameId++ + gr.frameData = gr.frameData[:0] + gr.allFrame = append(gr.allFrame, gr.frameData) +} + +// OnRawMessage msg +func (gr *FrameNode) OnRawMessage(msg []byte) error { + if msg == nil { + err := errors.New("can't frame nil message") + return err + } + if gr.available() { + gr.onMessage <- msg + } + return errFoo +} + +// OnProtocolMessage interface +func (gr *FrameNode) OnProtocolMessage(interface{}) error { + return nil +} + +// GetAllMessage return chan []interface +func (gr *FrameNode) GetAllMessage() chan []interface{} { + data := make(chan []interface{}, 1) + data <- gr.allFrame + return data +} + +// AddConn conn +func (gr *FrameNode) AddConn(conn IConnection) error { + if gr.available() { + gr.addConnChan <- conn + return nil + } + return errFoo +} + +// DelConn by key +func (gr *FrameNode) DelConn(key string) error { + if gr.available() { + gr.delConnChan <- key + return nil + } + return errFoo +} + +// Complete sync +func (gr *FrameNode) Complete() error { + if gr.available() { + gr.completeChan <- struct{}{} + return nil + } + return errFoo +} + +// Destroy the node +func (gr *FrameNode) Destroy() error { + if gr.available() { + atomic.AddInt64(&gr.closeFlag, 1) + go func() { + gr.onMessage <- nil + }() + return nil + } + return errFoo +} + +func (gr *FrameNode) available() bool { + return atomic.LoadInt64(&gr.closeFlag) == 0 +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..838c68e --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module git.hpds.cc/Component/hpds_net_framework + +go 1.18 + +require ( + git.hpds.cc/Component/logging v0.0.0-20220802072911-f95dfe16666a + github.com/google/uuid v1.3.0 + go.uber.org/zap v1.21.0 + google.golang.org/protobuf v1.28.1 +) + +require ( + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect +) diff --git a/message.go b/message.go new file mode 100644 index 0000000..a967d66 --- /dev/null +++ b/message.go @@ -0,0 +1,18 @@ +package hpds_net_framework + +// Encryptor interface +type Encryptor interface { + Encode(bs []byte) []byte + Decode(bs []byte) []byte +} + +// Processor interface +type Processor interface { + SetBigEndian() + GetBigEndian() bool + SetEncryptor(enc Encryptor) + OnReceivedPackage(interface{}, []byte) error + WrapMsg(interface{}) ([]byte, error) + WrapIdMsg(id uint32, data interface{}) ([]byte, error) + RegisterHandler(id int, entity interface{}, handle func(args ...interface{})) +} diff --git a/msg.go b/msg.go new file mode 100644 index 0000000..10e9722 --- /dev/null +++ b/msg.go @@ -0,0 +1,44 @@ +package hpds_net_framework + +import ( + "git.hpds.cc/Component/logging" + "go.uber.org/zap" + "reflect" + "runtime/debug" + "time" +) + +// 回调传参常量 +const ( + Msg = iota + Conn + Raw +) + +// 消息信息 +type msgInfo struct { + msgId int + msgType reflect.Type + msgCallback func(args ...interface{}) +} + +// 执行消息回调 +func execute(mInfo msgInfo, msg interface{}, writer interface{}, body []byte, id uint32) { + defer func() { + if r := recover(); r != nil { + logging.L().Error("panic at msg", + zap.Uint32("id", id), + zap.Any("recover", r), + zap.ByteString("stack", debug.Stack()), + ) + } + }() + begin := time.Now().UnixNano() / int64(time.Millisecond) + mInfo.msgCallback(msg, writer, body) + costs := time.Now().UnixNano()/int64(time.Millisecond) - begin + logging.L().Debug("execute logic", + zap.Int("msgId", mInfo.msgId), + zap.Int64("costs", costs), + zap.Any("msgType", mInfo.msgType), + ) +} diff --git a/network.go b/network.go new file mode 100644 index 0000000..0b68dfc --- /dev/null +++ b/network.go @@ -0,0 +1,40 @@ +package hpds_net_framework + +import ( + "net" +) + +// Server interface +type Server interface { + Run() error + Handle(conn net.Conn) +} + +// INode 网络同步节点,如消息节点,聊天室节点 +type INode interface { + AddConn(IConnection) error + DelConn(string) error + Serve() + OnRawMessage([]byte) error + OnProtocolMessage(interface{}) error + GetAllMessage() chan []interface{} + Destroy() error + Complete() error +} + +// IConnection 网络连接 +type IConnection interface { + GetUuid() string + ReadMsg() + WriteMsg(message interface{}) + Close() error + AfterClose(func()) + //SetData 设置自定义数据 + SetData(interface{}) + GetData() interface{} + //SetNode 设置节点 + SetNode(INode) + GetNode() INode + //IsClosed 是否关闭 + IsClosed() bool +} diff --git a/proto.go b/proto.go new file mode 100644 index 0000000..aef7c42 --- /dev/null +++ b/proto.go @@ -0,0 +1,177 @@ +package hpds_net_framework + +import ( + "encoding/binary" + "fmt" + "git.hpds.cc/Component/logging" + "github.com/golang/protobuf/proto" + "go.uber.org/zap" + "reflect" +) + +// PbProcessor one of Processor implement protoc --go_out=. *.proto +type PbProcessor struct { + bigEndian bool + enc Encryptor + msgTypes map[reflect.Type]int + handlers map[int]msgInfo + logger *logging.Logger +} + +// NewPbProcessor return PB processor +func NewPbProcessor(logger *logging.Logger) *PbProcessor { + pb := PbProcessor{ + msgTypes: make(map[reflect.Type]int), + handlers: make(map[int]msgInfo), + logger: logger, + } + return &pb +} + +// OnReceivedPackage 收到完整数据包, 返回解包错误 +func (pbf *PbProcessor) OnReceivedPackage(writer interface{}, body []byte) error { + // 解密 + if pbf.enc != nil { + //log.Debug("before decode:: %v", body) + body = pbf.enc.Decode(body) + //log.Debug("after decode:: %v", body) + } + // 解码 + var pack Protocol + if err := proto.UnmarshalMerge(body, &pack); err != nil { + pbf.logger.Error("Can't unmarshal pack body to protocolMsg", + zap.ByteString("body", body), + zap.Error(err), + ) + return err + } + info, ok := pbf.handlers[int(pack.Id)] + if !ok { + pbf.logger.Error("Not register msg id", + zap.Uint32("pack Id", pack.Id), + ) + // handler not found, not a dead err + return nil + } + msg := reflect.New(info.msgType.Elem()).Interface() + err := proto.UnmarshalMerge(pack.Content, msg.(proto.Message)) + if err != nil { + pbf.logger.Error("Unmarshal Merge pack contents error", + zap.Uint32("pack id", pack.Id), + zap.Error(err), + ) + return err + } + // 执行逻辑 + execute(info, msg, writer, body, pack.Id) + return nil +} + +// WrapMsg format the interface message to []byte +func (pbf *PbProcessor) WrapMsg(message interface{}) ([]byte, error) { + pbf.logger.Debug("Protobuf processor wrap for write", + zap.Any("reflect.TypeOf", reflect.TypeOf(message)), + ) + data, err := proto.Marshal(message.(proto.Message)) + if err != nil { + return nil, err + } + tp := reflect.TypeOf(message) + id, ok := pbf.msgTypes[tp] + if !ok { + return nil, fmt.Errorf("not register %v", tp) + } + protocol := Protocol{ + Id: uint32(id), + Content: data, + } + data, err = proto.Marshal(&protocol) + if err != nil { + return nil, err + } + if pbf.enc != nil { + //log.Debug("before encode:: %v", data) + data = pbf.enc.Encode(data) + //log.Debug("after encode:: %v", data) + } + // head + head := make([]byte, 2) + if pbf.bigEndian { + binary.BigEndian.PutUint16(head, uint16(len(data))) + } else { + binary.LittleEndian.PutUint16(head, uint16(len(data))) + } + pkg := append(head, data...) + return pkg, nil +} + +// WrapIdMsg format the interface message to []byte with id +func (pbf *PbProcessor) WrapIdMsg(id uint32, message interface{}) ([]byte, error) { + pbf.logger.Debug("Protobuf processor wrap for write", + zap.Any("reflect.TypeOf", reflect.TypeOf(message)), + ) + data, err := proto.Marshal(message.(proto.Message)) + if err != nil { + return nil, err + } + protocol := Protocol{ + Id: id, + Content: data, + } + data, err = proto.Marshal(&protocol) + if err != nil { + return nil, err + } + if pbf.enc != nil { + //log.Debug("before encode:: %v", data) + data = pbf.enc.Encode(data) + //log.Debug("after encode:: %v", data) + } + // head + head := make([]byte, 2) + if pbf.bigEndian { + binary.BigEndian.PutUint16(head, uint16(len(data))) + } else { + binary.LittleEndian.PutUint16(head, uint16(len(data))) + } + pkg := append(head, data...) + return pkg, nil +} + +// WrapMsgNoHeader without header length +func (pbf *PbProcessor) WrapMsgNoHeader(message interface{}) ([]byte, error) { + data, err := pbf.WrapMsg(message) + if err != nil { + return nil, err + } + return data[2:], nil +} + +// RegisterHandler for logic +func (pbf *PbProcessor) RegisterHandler(id int, entity interface{}, handle func(args ...interface{})) { + if _, ok := pbf.handlers[id]; ok { + pbf.logger.Error("Already register handler", zap.Int("ID", id)) + } else { + pbf.handlers[id] = msgInfo{ + msgId: id, + msgType: reflect.TypeOf(entity), + msgCallback: handle, + } + pbf.msgTypes[reflect.TypeOf(entity)] = id + } +} + +// SetBigEndian for order +func (pbf *PbProcessor) SetBigEndian() { + pbf.bigEndian = true +} + +// GetBigEndian of the order +func (pbf *PbProcessor) GetBigEndian() bool { + return pbf.bigEndian +} + +// SetEncryptor for processor +func (pbf *PbProcessor) SetEncryptor(enc Encryptor) { + pbf.enc = enc +} diff --git a/security/aes.go b/security/aes.go new file mode 100644 index 0000000..b70ba07 --- /dev/null +++ b/security/aes.go @@ -0,0 +1,106 @@ +package security + +import ( + "bytes" + "crypto/aes" + "crypto/cipher" + "errors" +) + +// AESCipher one of Encryptor implement +type AESCipher struct { + key []byte + iv []byte +} + +// NewAESCipher return a AESCipher +func NewAESCipher(key string) *AESCipher { + size := len(key) / 8 + if size < 2 { + //log.Error("incorrect key, need 16(aes-128), 24(aes-192), 32(aes-256) length string") + return nil + } else if size > 4 { + size = 4 + } + key = key[:size*8] + return &AESCipher{key: []byte(key), iv: []byte(key[:16])} +} + +// Decode src +func (cipher *AESCipher) Decode(src []byte) []byte { + encrypt, err := aesDeCrypt(src, cipher.key, cipher.iv) + if err != nil { + return src + } + return encrypt +} + +// Encode src +func (cipher *AESCipher) Encode(src []byte) []byte { + encrypt, err := aesEcrypt(src, cipher.key, cipher.iv) + if err != nil { + //log.Error("Aes Encode error %s", err) + return src + } + return encrypt +} + +//PKCS7 填充模式 +func pKCS7Padding(ciphertext []byte, blockSize int) []byte { + padding := blockSize - len(ciphertext)%blockSize + //Repeat()函数的功能是把切片[]byte{byte(padding)}复制padding个,然后合并成新的字节切片返回 + padText := bytes.Repeat([]byte{byte(padding)}, padding) + return append(ciphertext, padText...) +} + +//填充的反向操作,删除填充字符串 +func pKCS7UnPadding(origData []byte) ([]byte, error) { + //获取数据长度 + length := len(origData) + if length == 0 { + return nil, errors.New("pKCS7UnPadding error") + } + //获取填充字符串长度 + unpadding := int(origData[length-1]) + //截取切片,删除填充字节,并且返回明文 + return origData[:(length - unpadding)], nil +} + +//实现加密 +func aesEcrypt(origData []byte, key, iv []byte) ([]byte, error) { + //创建加密算法实例 + block, err := aes.NewCipher(key) + if err != nil { + return nil, err + } + //获取块的大小 + blockSize := block.BlockSize() + //对数据进行填充,让数据长度满足需求 + origData = pKCS7Padding(origData, blockSize) + //采用AES加密方法中CBC加密模式 + blocMode := cipher.NewCBCEncrypter(block, iv) + crypted := make([]byte, len(origData)) + //执行加密 + blocMode.CryptBlocks(crypted, origData) + return crypted, nil +} + +//实现解密 +func aesDeCrypt(cypted []byte, key, iv []byte) ([]byte, error) { + //创建加密算法实例 + block, err := aes.NewCipher(key) + if err != nil { + return nil, err + } + //创建加密客户端实例 + blockMode := cipher.NewCBCDecrypter(block, iv) + origData := make([]byte, len(cypted)) + //这个函数也可以用来解密 + blockMode.CryptBlocks(origData, cypted) + //去除填充字符串 + origData, err = pKCS7UnPadding(origData) + if err != nil { + return nil, err + } + return origData, err +} diff --git a/security/pass.go b/security/pass.go new file mode 100644 index 0000000..eee0e80 --- /dev/null +++ b/security/pass.go @@ -0,0 +1,51 @@ +package security + +import ( + "encoding/base64" + "errors" + "math/rand" + "strings" + "time" +) + +const passwordLength = 256 + +type password [passwordLength]byte + +func init() { + // 更新随机种子,防止生成一样的随机密码 + rand.Seed(time.Now().Unix()) +} + +// 采用base64编码把密码转换为字符串 +func (password *password) String() string { + return base64.StdEncoding.EncodeToString(password[:]) +} + +// ParseLittlePassword 解析采用base64编码的字符串获取密码 +func ParseLittlePassword(passwordString string) (*password, error) { + bs, err := base64.StdEncoding.DecodeString(strings.TrimSpace(passwordString)) + if err != nil || len(bs) != passwordLength { + return nil, errors.New("不合法的密码") + } + password := password{} + copy(password[:], bs) + //bs = nil + return &password, nil +} + +// RandLittlePassword 产生 256个byte随机组合的 密码,最后会使用base64编码为字符串存储在配置文件中 +// 不能出现任何一个重复的byte位,必须又 0-255 组成,并且都需要包含 +func RandLittlePassword() string { + // 随机生成一个由 0~255 组成的 byte 数组 + intArr := rand.Perm(passwordLength) + password := &password{} + for i, v := range intArr { + password[i] = byte(v) + if i == v { + // 确保不会出现如何一个byte位出现重复 + return RandLittlePassword() + } + } + return password.String() +} diff --git a/tcpConn.go b/tcpConn.go new file mode 100644 index 0000000..615efee --- /dev/null +++ b/tcpConn.go @@ -0,0 +1,257 @@ +package hpds_net_framework + +import ( + "encoding/binary" + "fmt" + "git.hpds.cc/Component/logging" + "github.com/google/uuid" + "go.uber.org/zap" + "io" + "net" + "runtime/debug" + "sync" + "sync/atomic" + "time" +) + +// TCPConn is warped tcp conn for luck +type TCPConn struct { + sync.RWMutex + uuid string + net.Conn + // 缓写队列 + writeQueue chan []byte + // 逻辑消息队列 + logicQueue chan []byte + // 消息处理器 + processor Processor + userData interface{} + node INode + // after close + closeCb func() + closeFlag int64 + logger *logging.Logger +} + +// KCPConn 可靠的UDP,like TCP +type KCPConn struct { + *TCPConn +} + +// NewKcpConn get new kcp conn +func NewKcpConn(conn net.Conn, processor Processor, logger *logging.Logger) *KCPConn { + tcpConn := NewTcpConn(conn, processor, logger) + if tcpConn != nil { + return &KCPConn{tcpConn} + } + return nil +} + +// NewTcpConn return new tcp conn +func NewTcpConn(conn net.Conn, processor Processor, logger *logging.Logger) *TCPConn { + if processor == nil || conn == nil { + return nil + } + tc := &TCPConn{ + uuid: uuid.New().String(), + Conn: conn, + writeQueue: make(chan []byte, Cfg.ConnWriteQueueSize), + processor: processor, + // 单个缓存100个为处理的包 + logicQueue: make(chan []byte, Cfg.ConnUndoQueueSize), + logger: logger, + } + // write q + go func() { + for pkg := range tc.writeQueue { + if pkg == nil { + break + } + if Cfg.ConnWriteTimeout > 0 { + _ = tc.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(Cfg.ConnWriteTimeout))) + } + _, err := tc.Write(pkg) + if err != nil { + logger.Error("tcp write", zap.Error(err)) + break + } + _ = tc.SetWriteDeadline(time.Time{}) + } + // write over or error + _ = tc.Close() + logger.Info("Conn Close", + zap.String("local address", tc.Conn.LocalAddr().String()), + zap.String("remote address", tc.Conn.RemoteAddr().String()), + ) + }() + // logic q + go func() { + for pkg := range tc.logicQueue { + // logic over + if pkg == nil { + break + } + // processor handle the package + func() { + defer func() { + if r := recover(); r != nil { + logger.Error("processor panic", + zap.Any("panic", r), + zap.ByteString("stack", debug.Stack()), + ) + } + }() + _ = tc.processor.OnReceivedPackage(tc, pkg) + }() + } + }() + return tc +} + +// GetUuid get uuid of conn +func (tc *TCPConn) GetUuid() string { + return tc.uuid +} + +// ReadMsg read | write end -> write | read end -> conn end +func (tc *TCPConn) ReadMsg() { + defer func() { + tc.logicQueue <- nil + tc.writeQueue <- nil + // force close conn + if !tc.IsClosed() { + _ = tc.Close() + } + }() + bf := make([]byte, Cfg.MaxDataPackageSize) + // 第一个包默认5秒 + timeout := time.Second * time.Duration(Cfg.FirstPackageTimeout) + for { + _ = tc.SetReadDeadline(time.Now().Add(timeout)) + // read length + _, err := io.ReadAtLeast(tc, bf[:2], 2) + if err != nil { + tc.logger.Error("TCPConn read message head", + zap.Error(err), + ) + return + } + var ln uint16 + if tc.processor.GetBigEndian() { + ln = binary.BigEndian.Uint16(bf[:2]) + } else { + ln = binary.LittleEndian.Uint16(bf[:2]) + } + if ln < 1 || int(ln) > Cfg.MaxDataPackageSize { + tc.logger.Error("TCPConn message length invalid", + zap.Uint16("length", ln), + zap.Error(fmt.Errorf("TCPConn message length invalid")), + ) + return + } + // read data + _, err = io.ReadFull(tc, bf[:ln]) + if err != nil { + tc.logger.Error("TCPConn read data", + zap.Error(err), + ) + return + } + // clean + _ = tc.SetDeadline(time.Time{}) + // write to cache queue + select { + case tc.logicQueue <- append(make([]byte, 0), bf[:ln]...): + default: + // ignore overflow package not close conn + tc.logger.Error("TCPConn logic queue overflow", + zap.String("local address", tc.LocalAddr().String()), + zap.String("remote address", tc.RemoteAddr().String()), + zap.Int("queue length", len(tc.logicQueue)), + zap.Error(fmt.Errorf("TCPConn logic queue overflow")), + ) + } + // after first pack | check heartbeat + timeout = time.Second * time.Duration(Cfg.ConnReadTimeout) + } +} + +// WriteMsg warp msg base on connection's processor +func (tc *TCPConn) WriteMsg(message interface{}) { + pkg, err := tc.processor.WrapMsg(message) + if err != nil { + tc.logger.Error("OnWrapMsg package", + zap.Error(err), + ) + } else { + push: + select { + case tc.writeQueue <- pkg: + default: + if tc.IsClosed() { + return + } + time.Sleep(time.Millisecond * 50) + // re push + goto push + } + } +} + +// Close the connection +func (tc *TCPConn) Close() error { + tc.Lock() + defer func() { + tc.Unlock() + // add close flag + atomic.AddInt64(&tc.closeFlag, 1) + if tc.closeCb != nil { + tc.closeCb() + } + // clean write q if not empty + for len(tc.writeQueue) > 0 { + <-tc.writeQueue + } + }() + return tc.Conn.Close() +} + +// IsClosed return the status of conn +func (tc *TCPConn) IsClosed() bool { + return atomic.LoadInt64(&tc.closeFlag) != 0 +} + +// AfterClose conn call back +func (tc *TCPConn) AfterClose(cb func()) { + tc.Lock() + defer tc.Unlock() + tc.closeCb = cb +} + +// SetData for conn +func (tc *TCPConn) SetData(data interface{}) { + tc.Lock() + defer tc.Unlock() + tc.userData = data +} + +// GetData from conn +func (tc *TCPConn) GetData() interface{} { + tc.RLock() + defer tc.RUnlock() + return tc.userData +} + +// SetNode for conn +func (tc *TCPConn) SetNode(node INode) { + tc.Lock() + defer tc.Unlock() + tc.node = node +} + +// GetNode from conn +func (tc *TCPConn) GetNode() INode { + tc.RLock() + defer tc.RUnlock() + return tc.node +} diff --git a/tcpServer.go b/tcpServer.go new file mode 100644 index 0000000..155715a --- /dev/null +++ b/tcpServer.go @@ -0,0 +1,59 @@ +package hpds_net_framework + +import ( + "go.uber.org/zap" + "net" + "runtime/debug" + "sync" + + "git.hpds.cc/Component/logging" +) + +type tcpServer struct { + mu sync.Mutex + addr string + ln net.Listener + processor Processor + logger *logging.Logger +} + +// NewTcpServer return new tcpServer +func NewTcpServer(addr string, processor Processor, logger *logging.Logger) (s *tcpServer, err error) { + ts := new(tcpServer) + ts.addr = addr + ts.ln, err = net.Listen("tcp", addr) + if processor == nil { + panic("processor must be set.") + } + ts.processor = processor + ts.logger = logger + if err != nil { + return nil, err + } + return ts, err +} + +// Run the server +func (s *tcpServer) Run() error { + s.logger.Info("Starting tcp server", zap.String("address", s.addr)) + for { + conn, err := s.ln.Accept() + if err != nil { + return err + } + go s.Handle(conn) + } +} + +// Handle goroutine handle connection +func (s *tcpServer) Handle(conn net.Conn) { + defer func() { + if r := recover(); r != nil { + s.logger.Error("TCP handle panic", zap.Any("PANIC", r), + zap.ByteString("stack", debug.Stack())) + } + }() + var ic IConnection + ic = NewTcpConn(conn, s.processor, s.logger) + ic.ReadMsg() +}