131 lines
4.0 KiB
Go
131 lines
4.0 KiB
Go
package frame
|
|
|
|
import (
|
|
"encoding/binary"
|
|
coder "git.hpds.cc/Component/mq_coder"
|
|
)
|
|
|
|
// HandshakeFrame is a coder encoded.
|
|
type HandshakeFrame struct {
|
|
// name is client name
|
|
name string
|
|
// ClientId represents client id
|
|
id string
|
|
// ClientType represents client type (Protocol Gateway | Stream Function)
|
|
streamType byte
|
|
// ObserveDataTags are the client data tag list.
|
|
observeDataTags []Tag
|
|
metadata []byte
|
|
}
|
|
|
|
// NewHandshakeFrame creates a new HandshakeFrame.
|
|
func NewHandshakeFrame(name string, id string, stream byte, observeDataTags []Tag, metadata []byte) *HandshakeFrame {
|
|
return &HandshakeFrame{
|
|
name: name,
|
|
id: id,
|
|
streamType: stream,
|
|
observeDataTags: observeDataTags,
|
|
metadata: metadata,
|
|
}
|
|
}
|
|
|
|
// Name is the name of dataStream.
|
|
func (h *HandshakeFrame) Name() string { return h.name }
|
|
|
|
// ID represents the dataStream ID, the ID must be a unique string.
|
|
func (h *HandshakeFrame) ID() string { return h.id }
|
|
|
|
// StreamType represents dataStream type (Source | SFN | UpstreamEmitter).
|
|
// different StreamType has different behaviors in server side.
|
|
func (h *HandshakeFrame) StreamType() byte { return h.streamType }
|
|
|
|
// ObserveDataTags are the stream data tag list.
|
|
func (h *HandshakeFrame) ObserveDataTags() []Tag { return h.observeDataTags }
|
|
|
|
// Metadata holds stream metadata,
|
|
// metadata stores information for route the data.
|
|
func (h *HandshakeFrame) Metadata() []byte { return h.metadata }
|
|
|
|
// Type returns the type of HandshakeFrame.
|
|
func (h *HandshakeFrame) Type() Type { return TagOfHandshakeFrame }
|
|
|
|
// Encode to coder encoding.
|
|
func (h *HandshakeFrame) Encode() []byte {
|
|
// name
|
|
nameBlock := coder.NewPrimitivePacketEncoder(byte(TagOfHandshakeName))
|
|
nameBlock.SetStringValue(h.name)
|
|
// ID
|
|
idBlock := coder.NewPrimitivePacketEncoder(byte(TagOfHandshakeId))
|
|
idBlock.SetStringValue(h.id)
|
|
// stream type
|
|
typeBlock := coder.NewPrimitivePacketEncoder(byte(TagOfHandshakeStreamType))
|
|
typeBlock.SetBytesValue([]byte{h.streamType})
|
|
// observe data tags
|
|
observeDataTagsBlock := coder.NewPrimitivePacketEncoder(byte(TagOfHandshakeObserveDataTags))
|
|
buf := make([]byte, 4)
|
|
for _, v := range h.observeDataTags {
|
|
binary.LittleEndian.PutUint32(buf, uint32(v))
|
|
observeDataTagsBlock.AddBytes(buf)
|
|
}
|
|
// metadata
|
|
metadataBlock := coder.NewPrimitivePacketEncoder(byte(TagOfHandshakeMetadata))
|
|
metadataBlock.SetBytesValue(h.metadata)
|
|
// handshake frame
|
|
handshake := coder.NewNodePacketEncoder(byte(h.Type()))
|
|
handshake.AddPrimitivePacket(nameBlock)
|
|
handshake.AddPrimitivePacket(idBlock)
|
|
handshake.AddPrimitivePacket(typeBlock)
|
|
handshake.AddPrimitivePacket(observeDataTagsBlock)
|
|
handshake.AddPrimitivePacket(metadataBlock)
|
|
|
|
return handshake.Encode()
|
|
}
|
|
|
|
// DecodeToHandshakeFrame decodes coder encoded bytes to HandshakeFrame.
|
|
func DecodeToHandshakeFrame(buf []byte) (*HandshakeFrame, error) {
|
|
node := coder.NodePacket{}
|
|
_, err := coder.DecodeToNodePacket(buf, &node)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
handshake := &HandshakeFrame{}
|
|
// name
|
|
if nameBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeName)]; ok {
|
|
name, err := nameBlock.ToUTF8String()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
handshake.name = name
|
|
}
|
|
// client id
|
|
if idBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeId)]; ok {
|
|
id, err := idBlock.ToUTF8String()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
handshake.id = id
|
|
}
|
|
// client type
|
|
if typeBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeStreamType)]; ok {
|
|
streamType := typeBlock.ToBytes()
|
|
handshake.streamType = streamType[0]
|
|
}
|
|
// observe data tag list
|
|
if observeDataTagsBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeObserveDataTags)]; ok {
|
|
buf := observeDataTagsBlock.GetValBuf()
|
|
length := len(buf) / 4
|
|
for i := 0; i < length; i++ {
|
|
pos := i * 4
|
|
handshake.observeDataTags = append(handshake.observeDataTags, Tag(binary.LittleEndian.Uint32(buf[pos:pos+4])))
|
|
}
|
|
}
|
|
// metadata
|
|
if typeBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeMetadata)]; ok {
|
|
metadata := typeBlock.ToBytes()
|
|
handshake.metadata = metadata
|
|
}
|
|
|
|
return handshake, nil
|
|
}
|