network/frame/meta_frame.go

141 lines
3.2 KiB
Go

package frame
import (
"strconv"
"time"
coder "git.hpds.cc/Component/mq_coder"
gonanoid "github.com/matoous/go-nanoid/v2"
)
// MetaFrame holds the metadata handled by this package: a transaction ID, an
// opaque application payload, a source ID and a broadcast flag. Per the
// original note, it describes the metadata of a DataFrame. Its wire form is
// produced by Encode and parsed by DecodeToMetaFrame.
//
// NOTE(review): the earlier comment said the SeqId is fixed to
// TYPE_ID_TRANSACTION, while Encode in this file tags the node packet with
// TagOfMetaFrame — confirm which statement is current.
type MetaFrame struct {
// tid is the transaction ID; NewMetaFrame fills it with a generated value.
tid string
// metadata is extra application data kept as raw bytes; it may be nil.
metadata []byte
// sourceId is the source ID carried by the frame.
sourceId string
// broadcast records whether broadcast mode is enabled.
broadcast bool
}
// randString generates a random string.
//
// It returns a nanoid. If nanoid generation fails, it falls back to the
// current Unix time in microseconds formatted as a base-10 string, so the
// caller always receives a non-empty value.
func randString() string {
	id, err := gonanoid.New()
	if err != nil {
		// Best-effort fallback: a timestamp is still usable as an ID.
		return strconv.FormatInt(time.Now().UnixMicro(), 10)
	}
	return id
}

// NewMetaFrame creates a new MetaFrame instance whose transaction ID is set
// to a freshly generated random string (see randString). All other fields
// keep their zero values.
func NewMetaFrame() *MetaFrame {
	return &MetaFrame{tid: randString()}
}
// SetTransactionId replaces the transaction ID stored in the frame with
// transactionId.
func (m *MetaFrame) SetTransactionId(transactionId string) {
m.tid = transactionId
}
// TransactionId returns the transaction ID currently stored in the frame.
func (m *MetaFrame) TransactionId() string {
return m.tid
}
// SetMetadata sets the application's extra info on the frame.
// The slice is stored as given, not copied, so later writes through the
// caller's slice are visible through the frame.
func (m *MetaFrame) SetMetadata(metadata []byte) {
m.metadata = metadata
}
// Metadata returns the application's extra info held by the frame.
// The returned slice is the frame's own slice, not a copy.
func (m *MetaFrame) Metadata() []byte {
return m.metadata
}
// SetSourceId replaces the source ID stored in the frame with sourceId.
func (m *MetaFrame) SetSourceId(sourceId string) {
m.sourceId = sourceId
}
// SourceId returns the source ID currently stored in the frame.
func (m *MetaFrame) SourceId() string {
return m.sourceId
}
// SetBroadcast turns broadcast mode on (enabled == true) or off
// (enabled == false) for the frame.
func (m *MetaFrame) SetBroadcast(enabled bool) {
m.broadcast = enabled
}
// IsBroadcast reports whether broadcast mode is enabled for the frame.
func (m *MetaFrame) IsBroadcast() bool {
return m.broadcast
}
// Encode implements Frame.Encode method.
//
// It builds a node packet tagged TagOfMetaFrame and appends the frame's
// fields as primitive packets in a fixed order: transaction ID, source ID,
// metadata, broadcast. The metadata packet is written only when the metadata
// slice is non-nil; the other three are always written.
func (m *MetaFrame) Encode() []byte {
	root := coder.NewNodePacketEncoder(byte(TagOfMetaFrame))

	// Transaction ID (always present).
	tidPacket := coder.NewPrimitivePacketEncoder(byte(TagOfTransactionId))
	tidPacket.SetStringValue(m.tid)
	root.AddPrimitivePacket(tidPacket)

	// Source ID (always present).
	srcPacket := coder.NewPrimitivePacketEncoder(byte(TagOfSourceId))
	srcPacket.SetStringValue(m.sourceId)
	root.AddPrimitivePacket(srcPacket)

	// Application metadata is optional: a nil slice produces no packet.
	if m.metadata != nil {
		metaPacket := coder.NewPrimitivePacketEncoder(byte(TagOfMetadata))
		metaPacket.SetBytesValue(m.metadata)
		root.AddPrimitivePacket(metaPacket)
	}

	// Broadcast flag (always present).
	bcastPacket := coder.NewPrimitivePacketEncoder(byte(TagOfBroadcast))
	bcastPacket.SetBoolValue(m.broadcast)
	root.AddPrimitivePacket(bcastPacket)

	return root.Encode()
}
// DecodeToMetaFrame decodes buf as a node packet and copies the recognized
// primitive packets into meta.
//
// Only the transaction ID, metadata, source ID and broadcast tags are
// handled; packets with any other tag are ignored, and fields whose tag is
// absent from buf are left unchanged. It returns the error from decoding the
// node packet or from converting a primitive value; in the latter case meta
// may already hold some of the decoded fields.
func DecodeToMetaFrame(buf []byte, meta *MetaFrame) error {
	var packet coder.NodePacket
	if _, err := coder.DecodeToNodePacket(buf, &packet); err != nil {
		return err
	}

	for tag, prim := range packet.PrimitivePackets {
		switch tag {
		case byte(TagOfTransactionId):
			tid, err := prim.ToUTF8String()
			if err != nil {
				return err
			}
			meta.tid = tid
		case byte(TagOfSourceId):
			src, err := prim.ToUTF8String()
			if err != nil {
				return err
			}
			meta.sourceId = src
		case byte(TagOfBroadcast):
			enabled, err := prim.ToBool()
			if err != nil {
				return err
			}
			meta.broadcast = enabled
		case byte(TagOfMetadata):
			// Raw bytes need no conversion, so there is no error to check.
			meta.metadata = prim.ToBytes()
		}
	}
	return nil
}