network/frame/handshake_frame.go

131 lines
4.0 KiB
Go

package frame
import (
	"encoding/binary"
	"errors"

	coder "git.hpds.cc/Component/mq_coder"
)
// HandshakeFrame holds the handshake fields: name, ID, stream type, observed
// data tags and metadata. It is serialized with the coder package by Encode
// and parsed back by DecodeToHandshakeFrame.
type HandshakeFrame struct {
// name is the client name.
name string
// id is the client ID.
id string
// streamType is the stream type, stored as a single byte.
streamType byte
// observeDataTags is the list of data tags the client observes.
observeDataTags []Tag
// metadata is a raw byte payload carried alongside the other fields.
metadata []byte
}
// NewHandshakeFrame builds a HandshakeFrame from the given name, id, stream
// type, observed data tags and metadata. Both slices are stored as passed,
// without copying.
func NewHandshakeFrame(name string, id string, stream byte, observeDataTags []Tag, metadata []byte) *HandshakeFrame {
	h := new(HandshakeFrame)
	h.name = name
	h.id = id
	h.streamType = stream
	h.observeDataTags = observeDataTags
	h.metadata = metadata
	return h
}
// Name returns the name stored in the frame.
func (h *HandshakeFrame) Name() string {
	return h.name
}
// ID returns the ID stored in the frame.
func (h *HandshakeFrame) ID() string {
	return h.id
}
// StreamType returns the stream type byte stored in the frame.
func (h *HandshakeFrame) StreamType() byte {
	return h.streamType
}
// ObserveDataTags returns the list of observed data tags. The returned slice
// is the frame's own slice, not a copy.
func (h *HandshakeFrame) ObserveDataTags() []Tag {
	return h.observeDataTags
}
// Metadata returns the metadata bytes stored in the frame. The returned slice
// is the frame's own slice, not a copy.
func (h *HandshakeFrame) Metadata() []byte {
	return h.metadata
}
// Type reports the frame type, which is always TagOfHandshakeFrame.
func (h *HandshakeFrame) Type() Type {
	return TagOfHandshakeFrame
}
// Encode serializes the frame with the coder package. The result is a node
// packet tagged with h.Type() that contains, in this order, one primitive
// packet each for the name, ID, stream type, observed data tags and metadata.
func (h *HandshakeFrame) Encode() []byte {
	root := coder.NewNodePacketEncoder(byte(h.Type()))

	// Name, as a string value.
	namePkt := coder.NewPrimitivePacketEncoder(byte(TagOfHandshakeName))
	namePkt.SetStringValue(h.name)
	root.AddPrimitivePacket(namePkt)

	// ID, as a string value.
	idPkt := coder.NewPrimitivePacketEncoder(byte(TagOfHandshakeId))
	idPkt.SetStringValue(h.id)
	root.AddPrimitivePacket(idPkt)

	// Stream type, as a one-byte value.
	streamTypePkt := coder.NewPrimitivePacketEncoder(byte(TagOfHandshakeStreamType))
	streamTypePkt.SetBytesValue([]byte{h.streamType})
	root.AddPrimitivePacket(streamTypePkt)

	// Observed data tags: each tag is written as 4 little-endian bytes.
	tagsPkt := coder.NewPrimitivePacketEncoder(byte(TagOfHandshakeObserveDataTags))
	var scratch [4]byte
	for i := range h.observeDataTags {
		binary.LittleEndian.PutUint32(scratch[:], uint32(h.observeDataTags[i]))
		tagsPkt.AddBytes(scratch[:])
	}
	root.AddPrimitivePacket(tagsPkt)

	// Metadata, as raw bytes.
	metadataPkt := coder.NewPrimitivePacketEncoder(byte(TagOfHandshakeMetadata))
	metadataPkt.SetBytesValue(h.metadata)
	root.AddPrimitivePacket(metadataPkt)

	return root.Encode()
}
// DecodeToHandshakeFrame decodes coder-encoded bytes into a HandshakeFrame.
//
// buf is parsed as a node packet. Each handshake field is read from the
// primitive packet with the matching tag; a field whose packet is absent keeps
// its zero value.
//
// An error is returned when the node packet cannot be decoded, when
// ToUTF8String fails for the name or ID packet, or when the stream type packet
// is present but carries no value.
func DecodeToHandshakeFrame(buf []byte) (*HandshakeFrame, error) {
	node := coder.NodePacket{}
	if _, err := coder.DecodeToNodePacket(buf, &node); err != nil {
		return nil, err
	}

	handshake := &HandshakeFrame{}

	// name
	if nameBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeName)]; ok {
		name, err := nameBlock.ToUTF8String()
		if err != nil {
			return nil, err
		}
		handshake.name = name
	}

	// ID
	if idBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeId)]; ok {
		id, err := idBlock.ToUTF8String()
		if err != nil {
			return nil, err
		}
		handshake.id = id
	}

	// stream type
	if typeBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeStreamType)]; ok {
		streamType := typeBlock.ToBytes()
		// Guard the index below: an empty value would otherwise panic with
		// an index-out-of-range instead of reporting a malformed frame.
		if len(streamType) == 0 {
			return nil, errors.New("handshake frame: stream type is empty")
		}
		handshake.streamType = streamType[0]
	}

	// observe data tags
	if tagsBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeObserveDataTags)]; ok {
		raw := tagsBlock.GetValBuf()
		// Each tag occupies 4 little-endian bytes. Trailing bytes that do not
		// form a whole tag are ignored. The slice stays nil when there is no
		// whole tag.
		if count := len(raw) / 4; count > 0 {
			handshake.observeDataTags = make([]Tag, 0, count)
			for ; len(raw) >= 4; raw = raw[4:] {
				handshake.observeDataTags = append(handshake.observeDataTags, Tag(binary.LittleEndian.Uint32(raw)))
			}
		}
	}

	// metadata
	if metadataBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeMetadata)]; ok {
		handshake.metadata = metadataBlock.ToBytes()
	}

	return handshake, nil
}