package hpds_net_framework

import (
	"encoding/binary"
	"fmt"
	"reflect"

	"git.hpds.cc/Component/logging"
	"github.com/golang/protobuf/proto"
	"go.uber.org/zap"
)

// PbProcessor is the protobuf implementation of Processor.
//
// Every payload is wrapped in a Protocol envelope (message id + marshalled
// content), optionally passed through an Encryptor, and, for the framed
// variants, prefixed with a 2-byte length header.
//
// Generate the message types with: protoc --go_out=. *.proto
type PbProcessor struct {
	bigEndian bool                 // byte order of the 2-byte length header; little-endian when false
	enc       Encryptor            // optional; nil means bodies are sent and received as-is
	msgTypes  map[reflect.Type]int // registered message type -> message id (used by WrapMsg)
	handlers  map[int]msgInfo      // message id -> handler info (used by OnReceivedPackage)
	logger    *logging.Logger
}

// NewPbProcessor returns a PbProcessor with empty registration tables that
// reports through logger.
func NewPbProcessor(logger *logging.Logger) *PbProcessor {
	return &PbProcessor{
		msgTypes: make(map[reflect.Type]int),
		handlers: make(map[int]msgInfo),
		logger:   logger,
	}
}

// OnReceivedPackage handles one complete package body (without the length
// header) and returns any unpacking error.
//
// The body is decrypted when an Encryptor is set, decoded as a Protocol
// envelope, and its content is decoded into a new value of the type registered
// for the envelope id before the handler is executed. An id without a
// registered handler is logged and reported as success, since it is not a
// fatal condition.
func (pbf *PbProcessor) OnReceivedPackage(writer interface{}, body []byte) error {
	// Decrypt.
	if pbf.enc != nil {
		body = pbf.enc.Decode(body)
	}

	// Decode the envelope.
	var pack Protocol
	if err := proto.UnmarshalMerge(body, &pack); err != nil {
		pbf.logger.Error("Can't unmarshal pack body to protocolMsg",
			zap.ByteString("body", body),
			zap.Error(err),
		)
		return err
	}

	info, ok := pbf.handlers[int(pack.Id)]
	if !ok {
		pbf.logger.Error("Not register msg id",
			zap.Uint32("pack Id", pack.Id),
		)
		// Handler not found; not a fatal error.
		return nil
	}

	// reflect.Type.Elem panics for non-pointer kinds such as structs, so a bad
	// registration must be rejected here rather than crash the receive path.
	if info.msgType == nil || info.msgType.Kind() != reflect.Ptr {
		err := fmt.Errorf("msg id %d: registered type %v is not a pointer type", pack.Id, info.msgType)
		pbf.logger.Error("Unmarshal Merge pack contents error",
			zap.Uint32("pack id", pack.Id),
			zap.Error(err),
		)
		return err
	}

	msg := reflect.New(info.msgType.Elem()).Interface()
	pbMsg, ok := msg.(proto.Message)
	if !ok {
		err := fmt.Errorf("msg id %d: registered type %v does not implement proto.Message", pack.Id, info.msgType)
		pbf.logger.Error("Unmarshal Merge pack contents error",
			zap.Uint32("pack id", pack.Id),
			zap.Error(err),
		)
		return err
	}
	if err := proto.UnmarshalMerge(pack.Content, pbMsg); err != nil {
		pbf.logger.Error("Unmarshal Merge pack contents error",
			zap.Uint32("pack id", pack.Id),
			zap.Error(err),
		)
		return err
	}

	// Run the registered logic.
	execute(info, msg, writer, body, pack.Id)
	return nil
}

// WrapMsg encodes message into a framed package: a 2-byte length header
// followed by the (optionally encrypted) Protocol envelope. The message id is
// looked up from the type registered through RegisterHandler.
//
// It returns an error when message is not a proto.Message, when its type is
// not registered, when marshalling fails, or when the body does not fit the
// 2-byte length header.
func (pbf *PbProcessor) WrapMsg(message interface{}) ([]byte, error) {
	body, err := pbf.wrapRegistered(message)
	if err != nil {
		return nil, err
	}
	return pbf.prependHeader(body)
}

// WrapIdMsg encodes message into a framed package like WrapMsg, but uses the
// caller-supplied id instead of looking it up, so the message type does not
// have to be registered.
func (pbf *PbProcessor) WrapIdMsg(id uint32, message interface{}) ([]byte, error) {
	pbf.logger.Debug("Protobuf processor wrap for write",
		zap.Any("reflect.TypeOf", reflect.TypeOf(message)),
	)
	content, err := pbf.marshalPayload(message)
	if err != nil {
		return nil, err
	}
	body, err := pbf.packBody(id, content)
	if err != nil {
		return nil, err
	}
	return pbf.prependHeader(body)
}

// WrapMsgNoHeader encodes message like WrapMsg but returns only the body,
// without the 2-byte length header. Because no header is produced, the
// header's size limit does not apply.
func (pbf *PbProcessor) WrapMsgNoHeader(message interface{}) ([]byte, error) {
	return pbf.wrapRegistered(message)
}

// wrapRegistered builds the header-less body for a message whose type has been
// registered through RegisterHandler.
func (pbf *PbProcessor) wrapRegistered(message interface{}) ([]byte, error) {
	pbf.logger.Debug("Protobuf processor wrap for write",
		zap.Any("reflect.TypeOf", reflect.TypeOf(message)),
	)
	content, err := pbf.marshalPayload(message)
	if err != nil {
		return nil, err
	}
	tp := reflect.TypeOf(message)
	id, ok := pbf.msgTypes[tp]
	if !ok {
		return nil, fmt.Errorf("not register %v", tp)
	}
	return pbf.packBody(uint32(id), content)
}

// marshalPayload marshals message, reporting an error instead of panicking
// when it is nil or does not implement proto.Message.
func (pbf *PbProcessor) marshalPayload(message interface{}) ([]byte, error) {
	pbMsg, ok := message.(proto.Message)
	if !ok {
		return nil, fmt.Errorf("message of type %T does not implement proto.Message", message)
	}
	return proto.Marshal(pbMsg)
}

// packBody wraps content in a Protocol envelope carrying id, marshals it and
// encrypts the result when an Encryptor is set.
func (pbf *PbProcessor) packBody(id uint32, content []byte) ([]byte, error) {
	protocol := Protocol{
		Id:      id,
		Content: content,
	}
	data, err := proto.Marshal(&protocol)
	if err != nil {
		return nil, err
	}
	if pbf.enc != nil {
		data = pbf.enc.Encode(data)
	}
	return data, nil
}

// prependHeader returns body prefixed with its length as a 2-byte header in the
// configured byte order. Bodies longer than 65535 bytes are rejected, because
// converting their length to uint16 would silently truncate it and corrupt the
// frame for the receiver.
func (pbf *PbProcessor) prependHeader(body []byte) ([]byte, error) {
	const (
		headerLen  = 2
		maxBodyLen = 1<<16 - 1
	)
	if len(body) > maxBodyLen {
		return nil, fmt.Errorf("package body is %d bytes, exceeds the %d bytes addressable by the %d-byte length header",
			len(body), maxBodyLen, headerLen)
	}

	var order binary.ByteOrder = binary.LittleEndian
	if pbf.bigEndian {
		order = binary.BigEndian
	}

	pkg := make([]byte, headerLen+len(body))
	order.PutUint16(pkg, uint16(len(body)))
	copy(pkg[headerLen:], body)
	return pkg, nil
}

// RegisterHandler binds message id to the type of entity and to the callback
// handle. The type is what OnReceivedPackage instantiates for incoming
// messages and what WrapMsg uses to find the id of outgoing ones; entity is
// expected to be a pointer to a protobuf message.
//
// Registering an id twice is logged as an error and the first registration is
// kept.
func (pbf *PbProcessor) RegisterHandler(id int, entity interface{}, handle func(args ...interface{})) {
	if _, ok := pbf.handlers[id]; ok {
		pbf.logger.Error("Already register handler", zap.Int("ID", id))
		return
	}

	tp := reflect.TypeOf(entity)
	pbf.handlers[id] = msgInfo{
		msgId:       id,
		msgType:     tp,
		msgCallback: handle,
	}
	pbf.msgTypes[tp] = id
}

// SetBigEndian switches the length header to big-endian byte order.
func (pbf *PbProcessor) SetBigEndian() {
	pbf.bigEndian = true
}

// GetBigEndian reports whether the length header uses big-endian byte order.
func (pbf *PbProcessor) GetBigEndian() bool {
	return pbf.bigEndian
}

// SetEncryptor sets the Encryptor applied to outgoing and incoming bodies.
func (pbf *PbProcessor) SetEncryptor(enc Encryptor) {
	pbf.enc = enc
}