network/frame/handshake_frame.go

131 lines
4.0 KiB
Go
Raw Permalink Normal View History

2022-10-11 17:36:09 +08:00
package frame
import (
	"encoding/binary"
	"errors"

	coder "git.hpds.cc/Component/mq_coder"
)
// HandshakeFrame is a coder encoded.
type HandshakeFrame struct {
2023-04-05 16:15:59 +08:00
// name is client name
name string
2022-10-11 17:36:09 +08:00
// ClientId represents client id
2023-04-05 16:15:59 +08:00
id string
2022-10-11 17:36:09 +08:00
// ClientType represents client type (Protocol Gateway | Stream Function)
2023-04-05 16:15:59 +08:00
streamType byte
2022-10-11 17:36:09 +08:00
// ObserveDataTags are the client data tag list.
2023-04-05 16:15:59 +08:00
observeDataTags []Tag
metadata []byte
2022-10-11 17:36:09 +08:00
}
// NewHandshakeFrame creates a new HandshakeFrame.
2023-04-05 16:15:59 +08:00
func NewHandshakeFrame(name string, id string, stream byte, observeDataTags []Tag, metadata []byte) *HandshakeFrame {
2022-10-11 17:36:09 +08:00
return &HandshakeFrame{
2023-04-05 16:15:59 +08:00
name: name,
id: id,
streamType: stream,
observeDataTags: observeDataTags,
metadata: metadata,
2022-10-11 17:36:09 +08:00
}
}
2023-04-05 16:15:59 +08:00
// Name returns the name carried by the handshake frame.
func (h *HandshakeFrame) Name() string {
	return h.name
}
// ID returns the data stream ID carried by the handshake frame. The ID is
// expected to be a unique string.
func (h *HandshakeFrame) ID() string {
	return h.id
}
// StreamType returns the data stream type (Source | SFN | UpstreamEmitter).
// The server side behaves differently for each stream type.
func (h *HandshakeFrame) StreamType() byte {
	return h.streamType
}
// ObserveDataTags returns the list of data tags the stream observes. The
// returned slice is the frame's own slice, not a copy.
func (h *HandshakeFrame) ObserveDataTags() []Tag {
	return h.observeDataTags
}
// Metadata returns the stream metadata bytes, which store the information
// used to route data. The returned slice is the frame's own slice, not a copy.
func (h *HandshakeFrame) Metadata() []byte {
	return h.metadata
}
// Type returns the type of HandshakeFrame.
func (h *HandshakeFrame) Type() Type { return TagOfHandshakeFrame }
2022-10-11 17:36:09 +08:00
// Encode to coder encoding.
func (h *HandshakeFrame) Encode() []byte {
// name
2023-03-26 23:18:55 +08:00
nameBlock := coder.NewPrimitivePacketEncoder(byte(TagOfHandshakeName))
2023-04-05 16:15:59 +08:00
nameBlock.SetStringValue(h.name)
// ID
2023-03-26 23:18:55 +08:00
idBlock := coder.NewPrimitivePacketEncoder(byte(TagOfHandshakeId))
2023-04-05 16:15:59 +08:00
idBlock.SetStringValue(h.id)
// stream type
typeBlock := coder.NewPrimitivePacketEncoder(byte(TagOfHandshakeStreamType))
typeBlock.SetBytesValue([]byte{h.streamType})
2022-10-11 17:36:09 +08:00
// observe data tags
2023-03-26 23:18:55 +08:00
observeDataTagsBlock := coder.NewPrimitivePacketEncoder(byte(TagOfHandshakeObserveDataTags))
2023-04-05 16:15:59 +08:00
buf := make([]byte, 4)
for _, v := range h.observeDataTags {
binary.LittleEndian.PutUint32(buf, uint32(v))
observeDataTagsBlock.AddBytes(buf)
}
// metadata
metadataBlock := coder.NewPrimitivePacketEncoder(byte(TagOfHandshakeMetadata))
metadataBlock.SetBytesValue(h.metadata)
2022-10-11 17:36:09 +08:00
// handshake frame
2023-03-26 23:18:55 +08:00
handshake := coder.NewNodePacketEncoder(byte(h.Type()))
2022-10-11 17:36:09 +08:00
handshake.AddPrimitivePacket(nameBlock)
handshake.AddPrimitivePacket(idBlock)
handshake.AddPrimitivePacket(typeBlock)
handshake.AddPrimitivePacket(observeDataTagsBlock)
2023-04-05 16:15:59 +08:00
handshake.AddPrimitivePacket(metadataBlock)
2022-10-11 17:36:09 +08:00
return handshake.Encode()
}
// DecodeToHandshakeFrame decodes coder encoded bytes to HandshakeFrame.
func DecodeToHandshakeFrame(buf []byte) (*HandshakeFrame, error) {
node := coder.NodePacket{}
_, err := coder.DecodeToNodePacket(buf, &node)
if err != nil {
return nil, err
}
handshake := &HandshakeFrame{}
// name
2023-03-26 23:18:55 +08:00
if nameBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeName)]; ok {
2022-10-11 17:36:09 +08:00
name, err := nameBlock.ToUTF8String()
if err != nil {
return nil, err
}
2023-04-05 16:15:59 +08:00
handshake.name = name
2022-10-11 17:36:09 +08:00
}
// client id
2023-03-26 23:18:55 +08:00
if idBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeId)]; ok {
2022-10-11 17:36:09 +08:00
id, err := idBlock.ToUTF8String()
if err != nil {
return nil, err
}
2023-04-05 16:15:59 +08:00
handshake.id = id
2022-10-11 17:36:09 +08:00
}
// client type
2023-04-05 16:15:59 +08:00
if typeBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeStreamType)]; ok {
streamType := typeBlock.ToBytes()
handshake.streamType = streamType[0]
2022-10-11 17:36:09 +08:00
}
// observe data tag list
2023-03-26 23:18:55 +08:00
if observeDataTagsBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeObserveDataTags)]; ok {
2023-04-05 16:15:59 +08:00
buf := observeDataTagsBlock.GetValBuf()
length := len(buf) / 4
for i := 0; i < length; i++ {
pos := i * 4
handshake.observeDataTags = append(handshake.observeDataTags, Tag(binary.LittleEndian.Uint32(buf[pos:pos+4])))
2022-10-11 17:36:09 +08:00
}
}
2023-04-05 16:15:59 +08:00
// metadata
if typeBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeMetadata)]; ok {
metadata := typeBlock.ToBytes()
handshake.metadata = metadata
2022-10-11 17:36:09 +08:00
}
return handshake, nil
}