You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
177 lines
4.3 KiB
177 lines
4.3 KiB
package hpds_net_framework |
|
|
|
import ( |
|
"encoding/binary" |
|
"fmt" |
|
"git.hpds.cc/Component/logging" |
|
"github.com/golang/protobuf/proto" |
|
"go.uber.org/zap" |
|
"reflect" |
|
) |
|
|
|
// PbProcessor one of Processor implement protoc --go_out=. *.proto |
|
type PbProcessor struct { |
|
bigEndian bool |
|
enc Encryptor |
|
msgTypes map[reflect.Type]int |
|
handlers map[int]msgInfo |
|
logger *logging.Logger |
|
} |
|
|
|
// NewPbProcessor return PB processor |
|
func NewPbProcessor(logger *logging.Logger) *PbProcessor { |
|
pb := PbProcessor{ |
|
msgTypes: make(map[reflect.Type]int), |
|
handlers: make(map[int]msgInfo), |
|
logger: logger, |
|
} |
|
return &pb |
|
} |
|
|
|
// OnReceivedPackage 收到完整数据包, 返回解包错误 |
|
func (pbf *PbProcessor) OnReceivedPackage(writer interface{}, body []byte) error { |
|
// 解密 |
|
if pbf.enc != nil { |
|
//log.Debug("before decode:: %v", body) |
|
body = pbf.enc.Decode(body) |
|
//log.Debug("after decode:: %v", body) |
|
} |
|
// 解码 |
|
var pack Protocol |
|
if err := proto.UnmarshalMerge(body, &pack); err != nil { |
|
pbf.logger.Error("Can't unmarshal pack body to protocolMsg", |
|
zap.ByteString("body", body), |
|
zap.Error(err), |
|
) |
|
return err |
|
} |
|
info, ok := pbf.handlers[int(pack.Id)] |
|
if !ok { |
|
pbf.logger.Error("Not register msg id", |
|
zap.Uint32("pack Id", pack.Id), |
|
) |
|
// handler not found, not a dead err |
|
return nil |
|
} |
|
msg := reflect.New(info.msgType.Elem()).Interface() |
|
err := proto.UnmarshalMerge(pack.Content, msg.(proto.Message)) |
|
if err != nil { |
|
pbf.logger.Error("Unmarshal Merge pack contents error", |
|
zap.Uint32("pack id", pack.Id), |
|
zap.Error(err), |
|
) |
|
return err |
|
} |
|
// 执行逻辑 |
|
execute(info, msg, writer, body, pack.Id) |
|
return nil |
|
} |
|
|
|
// WrapMsg format the interface message to []byte |
|
func (pbf *PbProcessor) WrapMsg(message interface{}) ([]byte, error) { |
|
pbf.logger.Debug("Protobuf processor wrap for write", |
|
zap.Any("reflect.TypeOf", reflect.TypeOf(message)), |
|
) |
|
data, err := proto.Marshal(message.(proto.Message)) |
|
if err != nil { |
|
return nil, err |
|
} |
|
tp := reflect.TypeOf(message) |
|
id, ok := pbf.msgTypes[tp] |
|
if !ok { |
|
return nil, fmt.Errorf("not register %v", tp) |
|
} |
|
protocol := Protocol{ |
|
Id: uint32(id), |
|
Content: data, |
|
} |
|
data, err = proto.Marshal(&protocol) |
|
if err != nil { |
|
return nil, err |
|
} |
|
if pbf.enc != nil { |
|
//log.Debug("before encode:: %v", data) |
|
data = pbf.enc.Encode(data) |
|
//log.Debug("after encode:: %v", data) |
|
} |
|
// head |
|
head := make([]byte, 2) |
|
if pbf.bigEndian { |
|
binary.BigEndian.PutUint16(head, uint16(len(data))) |
|
} else { |
|
binary.LittleEndian.PutUint16(head, uint16(len(data))) |
|
} |
|
pkg := append(head, data...) |
|
return pkg, nil |
|
} |
|
|
|
// WrapIdMsg format the interface message to []byte with id |
|
func (pbf *PbProcessor) WrapIdMsg(id uint32, message interface{}) ([]byte, error) { |
|
pbf.logger.Debug("Protobuf processor wrap for write", |
|
zap.Any("reflect.TypeOf", reflect.TypeOf(message)), |
|
) |
|
data, err := proto.Marshal(message.(proto.Message)) |
|
if err != nil { |
|
return nil, err |
|
} |
|
protocol := Protocol{ |
|
Id: id, |
|
Content: data, |
|
} |
|
data, err = proto.Marshal(&protocol) |
|
if err != nil { |
|
return nil, err |
|
} |
|
if pbf.enc != nil { |
|
//log.Debug("before encode:: %v", data) |
|
data = pbf.enc.Encode(data) |
|
//log.Debug("after encode:: %v", data) |
|
} |
|
// head |
|
head := make([]byte, 2) |
|
if pbf.bigEndian { |
|
binary.BigEndian.PutUint16(head, uint16(len(data))) |
|
} else { |
|
binary.LittleEndian.PutUint16(head, uint16(len(data))) |
|
} |
|
pkg := append(head, data...) |
|
return pkg, nil |
|
} |
|
|
|
// WrapMsgNoHeader without header length |
|
func (pbf *PbProcessor) WrapMsgNoHeader(message interface{}) ([]byte, error) { |
|
data, err := pbf.WrapMsg(message) |
|
if err != nil { |
|
return nil, err |
|
} |
|
return data[2:], nil |
|
} |
|
|
|
// RegisterHandler for logic |
|
func (pbf *PbProcessor) RegisterHandler(id int, entity interface{}, handle func(args ...interface{})) { |
|
if _, ok := pbf.handlers[id]; ok { |
|
pbf.logger.Error("Already register handler", zap.Int("ID", id)) |
|
} else { |
|
pbf.handlers[id] = msgInfo{ |
|
msgId: id, |
|
msgType: reflect.TypeOf(entity), |
|
msgCallback: handle, |
|
} |
|
pbf.msgTypes[reflect.TypeOf(entity)] = id |
|
} |
|
} |
|
|
|
// SetBigEndian for order |
|
func (pbf *PbProcessor) SetBigEndian() { |
|
pbf.bigEndian = true |
|
} |
|
|
|
// GetBigEndian of the order |
|
func (pbf *PbProcessor) GetBigEndian() bool { |
|
return pbf.bigEndian |
|
} |
|
|
|
// SetEncryptor for processor |
|
func (pbf *PbProcessor) SetEncryptor(enc Encryptor) { |
|
pbf.enc = enc |
|
}
|
|
|