This commit is contained in:
wangjian 2022-10-10 14:47:49 +08:00
parent b7e43ac8fe
commit bf70c3747c
21 changed files with 922 additions and 786 deletions

View File

@ -1,72 +0,0 @@
package mq_coder
import (
"bytes"
"io"
"io/ioutil"
)
type chunkVReader struct {
src io.Reader // the reader parts of V
buf *bytes.Buffer // the bytes part of V
totalSize int // size of whole buffer of this packet
off int // last read op
ChunkVSize int // the size of chunked V
}
// Read implement io.Reader interface
func (r *chunkVReader) Read(p []byte) (n int, err error) {
if r.src == nil {
return 0, nil
}
if r.off >= r.totalSize {
return 0, io.EOF
}
if r.off < r.totalSize-r.ChunkVSize {
n, err := r.buf.Read(p)
r.off += n
if err != nil {
if err == io.EOF {
return n, nil
} else {
return 0, err
}
}
return n, nil
}
n, err = r.src.Read(p)
r.off += n
if err != nil {
return n, err
}
return n, nil
}
// WriteTo implement io.WriteTo interface
func (r *chunkVReader) WriteTo(w io.Writer) (n int64, err error) {
if r.src == nil {
return 0, nil
}
// first, write existed buffer
m, err := w.Write(r.buf.Bytes())
if err != nil {
return 0, err
}
n += int64(m)
// last, write from reader
buf, err := ioutil.ReadAll(r.src)
if err != nil && err != io.EOF {
return 0, errWriteFromReader
}
m, err = w.Write(buf)
if err != nil {
return 0, err
}
n += int64(m)
return n, nil
}

View File

@ -1,54 +0,0 @@
package mq_coder
import (
"errors"
"io"
)
var (
errBuildIncomplete = errors.New("coder.Encoder: invalid structure of packet")
errInvalidAdding = errors.New("coder.Encoder: can not add this Packet after StreamPacket has been add")
errNonStreamPacket = errors.New("coder.Packet: this packet is not in node mode")
errWriteFromReader = errors.New("coder.streamV: write from reader error")
errNotNodeMode = errors.New("coder.Encoder: packet should be in node mode can be add other packets as child")
errNilReader = errors.New("coder.Decoder: nil source reader")
)
// Packet 编解码器包
type Packet interface {
// SeqId 返回此数据包的序列Id.
SeqId() int
// Size 返回整个数据包的大小
Size() int
// VSize 返回V的大小.
VSize() int
// Bytes 此数据包的全部字节.
Bytes() []byte
// Reader 返回 io.Reader 字节.
Reader() io.Reader
// VReader 返回 io.Reader 的 流字节.
VReader() io.Reader
// IsStreamMode 是否流模式
IsStreamMode() bool
// IsNodeMode 是否节点模式
IsNodeMode() bool
// BytesV 返回流字节
BytesV() []byte
// Utf8StringV 返回流的utf8字符串值
Utf8StringV() string
// Int32V 返回流的int32值
Int32V() (val int32, err error)
// UInt32V 返回流的uint32值
UInt32V() (val uint32, err error)
// Int64V 返回流的int64值
Int64V() (val int64, err error)
// UInt64V 返回流的uint64值
UInt64V() (val uint64, err error)
// Float32V 返回流的float32值
Float32V() (val float32, err error)
// Float64V 返回流的float64值
Float64V() (val float64, err error)
// BoolV 返回流的bool值
BoolV() (val bool, err error)
}

View File

@ -1,93 +0,0 @@
package mq_coder
import (
"bytes"
"io"
"git.hpds.cc/Component/mq_coder/spec"
)
// Decoder is the tool for decoding packet from stream
type Decoder struct {
tag spec.T
len *spec.L
rd io.Reader
}
// NewDecoder returns a Decoder from an io.Reader
func NewDecoder(reader io.Reader) *Decoder {
return &Decoder{
rd: reader,
}
}
// SeqId return the SequenceID of the decoding packet
func (d *Decoder) SeqId() int {
return d.tag.Sid()
}
// UnderlyingReader returns the reader this decoder using
func (d *Decoder) UnderlyingReader() io.Reader {
return d.rd
}
// ReadHeader will block until io.EOF or receive T and L of a packet.
func (d *Decoder) ReadHeader() error {
// only read T and L
return d.readTL()
}
// GetChunkedPacket will block until io.EOF or receive V of a packet in chunked mode.
func (d *Decoder) GetChunkedPacket() Packet {
return &StreamPacket{
t: d.tag,
l: *d.len,
vr: d.rd,
chunkMode: true,
chunkSize: d.len.VSize(),
}
}
// GetFullFilledPacket read full Packet from given io.Reader
func (d *Decoder) GetFullFilledPacket() (packet Packet, err error) {
// read V
buf := new(bytes.Buffer)
total := 0
for {
valBuf := make([]byte, d.len.VSize())
n, err := d.rd.Read(valBuf)
if n > 0 {
total += n
buf.Write(valBuf[:n])
}
if total >= d.len.VSize() || err != nil {
break
}
}
packet = &StreamPacket{
t: d.tag,
l: *d.len,
vBuf: buf.Bytes(),
chunkMode: false,
}
return packet, nil
}
func (d *Decoder) readTL() (err error) {
if d.rd == nil {
return errNilReader
}
// read T
d.tag, err = spec.ReadT(d.rd)
if err != nil {
return err
}
// read L
d.len, err = spec.ReadL(d.rd)
return err
}

View File

@ -2,157 +2,76 @@ package mq_coder
import (
"bytes"
"io"
"git.hpds.cc/Component/mq_coder/spec"
"fmt"
"git.hpds.cc/Component/mq_coder/encoding"
)
// Encoder is the tool for creating a packet easily
type Encoder struct {
tag spec.T
len *spec.L
valReader io.Reader
valReaderSize int
nodes map[int]Packet
state int
size int32 // size of value
isStreamMode bool
valBuf *bytes.Buffer
done bool
seqId int
isNodeMode bool
// Encoder will encode object to encoding
type encoder struct {
seqID byte
valbuf []byte
isNode bool
isArray bool
buf *bytes.Buffer
complete bool
}
// SetSeqId set sequenceId of a packet, if this packet contains other
// packets, isNode should set to true
func (b *Encoder) SetSeqId(seqId int, isNode bool) {
// init
b.valBuf = new(bytes.Buffer)
b.nodes = make(map[int]Packet)
// set seqId
b.seqId = seqId
b.isNodeMode = isNode
type iEncoder interface {
Encode() []byte
}
// SetBytesV set bytes type as V
func (b *Encoder) SetBytesV(buf []byte) {
b.size += int32(len(buf))
b.valBuf.Write(buf)
b.isStreamMode = false
b.state |= 0x04
func (enc *encoder) GetValBuf() []byte {
return enc.valbuf
}
// SetReaderV set io.Reader type as V
func (b *Encoder) SetReaderV(r io.Reader, size int) {
b.isStreamMode = true
b.valReader = r
b.state |= 0x04
b.size += int32(size)
b.valReaderSize = size
func (enc *encoder) IsEmpty() bool {
return len(enc.valbuf) == 0
}
// AddPacket add a Packet child to this packet, this packet must be NodeMode
func (b *Encoder) AddPacket(child Packet) error {
// only packet is in node mode can add other packets
if !b.isNodeMode {
return errNotNodeMode
func (enc *encoder) AddBytes(buf []byte) {
enc.valbuf = append(enc.valbuf, buf...)
}
func (enc *encoder) addRawPacket(en iEncoder) {
enc.valbuf = append(enc.valbuf, en.Encode()...)
}
// setTag write tag as seqID
func (enc *encoder) writeTag() {
if enc.seqID > 0x3F {
panic(fmt.Errorf("sid should be in [0..0x3F]"))
}
if b.done {
return errInvalidAdding
if enc.isNode {
enc.seqID = enc.seqID | 0x80
}
b.nodes[child.SeqId()] = child
buf := child.Bytes()
b.SetBytesV(buf)
return nil
if enc.isArray {
enc.seqID = enc.seqID | 0x40
}
enc.buf.WriteByte(enc.seqID)
}
// AddStreamPacket will put a StreamPacket in chunked mode to current packet.
func (b *Encoder) AddStreamPacket(child Packet) (err error) {
// if this packet is in stream mode, can not add any packets
if b.done {
return errInvalidAdding
}
// only accept packet in stream mode
if !child.IsStreamMode() {
return errNonStreamPacket
}
// set the valReader of this packet to the child's
b.valReader = child.VReader()
// valReaderSize will be the same as child's
b.valReaderSize = child.VSize()
// add this child packet
b.nodes[child.SeqId()] = child
// add the size of child's V to L of this packet
b.size += int32(child.Size())
// put the bytes of child to valBuf
buf := child.Bytes()
b.valBuf.Write(buf)
// update state
b.state |= 0x04
b.isStreamMode = true
b.done = true
return nil
}
// Packet return a Packet instance.
func (b *Encoder) Packet() (Packet, error) {
err := b.generateT()
func (enc *encoder) writeLengthBuf() {
vallen := len(enc.valbuf)
size := encoding.SizeOfPVarInt32(int32(vallen))
codec := encoding.VarCodec{Size: size}
tmp := make([]byte, size)
err := codec.EncodePVarInt32(tmp, int32(vallen))
if err != nil {
return nil, err
panic(err)
}
err = b.generateL()
if err != nil {
return nil, err
}
if b.state != 0x07 {
return nil, errBuildIncomplete
}
if b.isStreamMode {
return &StreamPacket{
t: b.tag,
l: *b.len,
vr: b.valReader,
vBuf: b.valBuf.Bytes(),
chunkMode: true,
chunkSize: b.valReaderSize,
}, err
}
// not streaming mode
return &StreamPacket{
t: b.tag,
l: *b.len,
vBuf: b.valBuf.Bytes(),
chunkMode: false,
}, err
enc.buf.Write(tmp)
}
// will generate T of a TLV.
func (b *Encoder) generateT() error {
t, err := spec.NewT(b.seqId)
t.SetNodeMode(b.isNodeMode)
if err != nil {
return err
// Encode returns a final Y3 encoded byte slice
func (enc *encoder) Encode() []byte {
if !enc.complete {
// Tag
enc.writeTag()
// Len
enc.writeLengthBuf()
// Val
enc.buf.Write(enc.valbuf)
enc.complete = true
}
b.tag = t
b.state |= 0x01
return nil
}
// will generate L of a TLV.
func (b *Encoder) generateL() error {
l, err := spec.NewL(int(b.size))
if err != nil {
return err
}
b.len = &l
b.state |= 0x02
return nil
return enc.buf.Bytes()
}

View File

@ -1,98 +0,0 @@
package mq_coder
import (
"git.hpds.cc/Component/mq_coder/encoding"
)
// SetUtf8StringV set utf-8 string type value as V
func (b *Encoder) SetUtf8StringV(v string) {
buf := []byte(v)
b.SetBytesV(buf)
}
// SetInt32V set an int32 type value as V
func (b *Encoder) SetInt32V(v int32) error {
size := encoding.SizeOfNVarInt32(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeNVarInt32(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetUInt32V set an uint32 type value as V
func (b *Encoder) SetUInt32V(v uint32) error {
size := encoding.SizeOfNVarUInt32(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeNVarUInt32(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetInt64V set an int64 type value as V
func (b *Encoder) SetInt64V(v int64) error {
size := encoding.SizeOfNVarInt64(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeNVarInt64(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetUInt64V set an uint64 type value as V
func (b *Encoder) SetUInt64V(v uint64) error {
size := encoding.SizeOfNVarUInt64(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeNVarUInt64(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetFloat32V set a float32 type value as V
func (b *Encoder) SetFloat32V(v float32) error {
size := encoding.SizeOfVarFloat32(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeVarFloat32(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetFloat64V set a float64 type value as V
func (b *Encoder) SetFloat64V(v float64) error {
size := encoding.SizeOfVarFloat64(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeVarFloat64(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetBoolV set bool type value as V
func (b *Encoder) SetBoolV(v bool) {
var size = encoding.SizeOfPVarUInt32(uint32(1))
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
_ = codec.EncodePVarBool(buf, v)
b.SetBytesV(buf)
}

96
node_decoder.go Normal file
View File

@ -0,0 +1,96 @@
package mq_coder
import (
"bytes"
"errors"
"git.hpds.cc/Component/mq_coder/encoding"
"git.hpds.cc/Component/mq_coder/utils"
)
func parsePayload(b []byte) (consumedBytes int, ifNodePacket bool, np *NodePacket, pp *PrimitivePacket, err error) {
if len(b) == 0 {
return 0, false, nil, nil, errors.New("parsePacket params can not be nil")
}
pos := 0
// NodePacket
if ok := utils.IsNodePacket(b[pos]); ok {
np = &NodePacket{}
endPos, err := DecodeToNodePacket(b, np)
return endPos, true, np, nil, err
}
pp = &PrimitivePacket{}
state, err := DecodeToPrimitivePacket(b, pp)
return state.ConsumedBytes, false, nil, pp, err
}
// DecodeToNodePacket parse out whole buffer to a NodePacket
func DecodeToNodePacket(buf []byte, pct *NodePacket) (consumedBytes int, err error) {
if len(buf) == 0 {
return 0, errors.New("empty buf")
}
pct.packet = &packet{
valBuf: buf,
buf: &bytes.Buffer{},
}
pct.NodePackets = map[byte]NodePacket{}
pct.PrimitivePackets = map[byte]PrimitivePacket{}
pos := 0
// `Tag`
tag := NewTag(buf[pos])
pct.packet.tag = tag
pct.buf.WriteByte(buf[pos])
pos++
// `Length`: the type is `varint`
tmpBuf := buf[pos:]
var valLen int32
codec := encoding.VarCodec{}
err = codec.DecodePVarInt32(tmpBuf, &valLen)
if err != nil {
return 0, err
}
pct.packet.length = int(valLen)
pct.buf.Write(buf[pos : pos+codec.Size])
pos += codec.Size
// if `Length` is 0, means empty node packet
if valLen == 0 {
return pos, nil
}
// `Value`
// `raw` is pct.Length() length
vl := int(valLen)
if vl < 0 {
return pos, errors.New("found L of V smaller than 0")
}
endPos := pos + vl
pct.packet.valBuf = buf[pos:endPos]
pct.buf.Write(buf[pos:endPos])
// Parse value to Packet
for {
if pos >= endPos || pos >= len(buf) {
break
}
_p, isNode, np, pp, err := parsePayload(buf[pos:endPos])
pos += _p
if err != nil {
return 0, err
}
if isNode {
pct.NodePackets[np.packet.tag.SeqId()] = *np
} else {
pct.PrimitivePackets[byte(pp.SeqId())] = *pp
}
}
consumedBytes = endPos
return consumedBytes, nil
}

33
node_encoder.go Normal file
View File

@ -0,0 +1,33 @@
package mq_coder
import (
"bytes"
)
// NodePacketEncoder used for encode a node packet
type NodePacketEncoder struct {
*encoder
}
// NewNodePacketEncoder returns an Encoder for node packet
func NewNodePacketEncoder(sid byte) *NodePacketEncoder {
nodeEnc := &NodePacketEncoder{
encoder: &encoder{
isNode: true,
buf: new(bytes.Buffer),
},
}
nodeEnc.seqID = sid
return nodeEnc
}
// AddNodePacket add new node to this node
func (enc *NodePacketEncoder) AddNodePacket(np *NodePacketEncoder) {
enc.addRawPacket(np)
}
// AddPrimitivePacket add new primitive to this node
func (enc *NodePacketEncoder) AddPrimitivePacket(np *PrimitivePacketEncoder) {
enc.addRawPacket(np)
}

10
node_packet.go Normal file
View File

@ -0,0 +1,10 @@
package mq_coder
// NodePacket describes complex values
type NodePacket struct {
*packet
// NodePackets store all the node packets
NodePackets map[byte]NodePacket
// PrimitivePackets store all the primitive packets
PrimitivePackets map[byte]PrimitivePacket
}

160
packet.go
View File

@ -2,155 +2,37 @@ package mq_coder
import (
"bytes"
"io"
"git.hpds.cc/Component/mq_coder/encoding"
"git.hpds.cc/Component/mq_coder/spec"
)
// StreamPacket implement the Packet interface.
type StreamPacket struct {
t spec.T
l spec.L
vBuf []byte
vr io.Reader
chunkMode bool
chunkSize int
// packet is the base type of the NodePacket and PrimitivePacket
type packet struct {
tag *Tag
length int
valBuf []byte
buf *bytes.Buffer
}
var _ Packet = &StreamPacket{}
// SeqId returns the sequenceId of this packet
func (p *StreamPacket) SeqId() int { return p.t.Sid() }
// Size returns the size of whole packet.
func (p *StreamPacket) Size() int {
// T.Size + L.Size + V.Size
return p.t.Size() + p.l.Size() + p.l.VSize()
// GetRawBytes get all raw bytes of this packet
func (bp *packet) GetRawBytes() []byte {
return bp.buf.Bytes()
}
// VSize returns the size of V.
func (p *StreamPacket) VSize() int { return p.l.VSize() }
// Bytes return the raw bytes of this packet. V will be absent if
// is in chunked mode
func (p *StreamPacket) Bytes() []byte {
buf := new(bytes.Buffer)
// the raw bytes of T and L
p.writeTL(buf)
// p.valBuf stores the raw bytes of V
buf.Write(p.vBuf)
return buf.Bytes()
// Length return the length of Val this packet
func (bp *packet) Length() int {
return bp.length
}
// VReader return an io.Reader which can be read as the content of V.
func (p *StreamPacket) VReader() io.Reader {
if !p.chunkMode {
return bytes.NewReader(p.vBuf)
}
return p.vr
// SeqId returns Tag of this packet
func (bp *packet) SeqId() byte {
return bp.tag.SeqId()
}
// Reader return an io.Reader which can be read as the whole bytes of
// this packet. This function only available if this V of packet is in
// chunked mode.
func (p *StreamPacket) Reader() io.Reader {
if !p.chunkMode {
buf := new(bytes.Buffer)
buf.Write(p.t.Bytes())
buf.Write(p.l.Bytes())
buf.Write(p.vBuf)
return buf
}
buf := new(bytes.Buffer)
// T and L of this packet
p.writeTL(buf)
// V of this packet
buf.Write(p.vBuf)
return &chunkVReader{
buf: buf,
src: p.vr,
totalSize: p.Size(),
ChunkVSize: p.VSize(),
}
// IsSlice determine if the current node is a Slice
func (bp *packet) IsSlice() bool {
return bp.tag.IsSlice()
}
// IsStreamMode returns a bool value indicates if the V of
// this packet is in stream mode
func (p *StreamPacket) IsStreamMode() bool {
return p.chunkMode
}
// IsNodeMode returns a bool value indicates if this packet
// is node mode
func (p *StreamPacket) IsNodeMode() bool {
return p.t.IsNodeMode()
}
// write the raw bytes of T and L to given buf
func (p *StreamPacket) writeTL(buf *bytes.Buffer) {
buf.Write(p.t.Bytes())
buf.Write(p.l.Bytes())
}
// BytesV return V as bytes
func (p *StreamPacket) BytesV() []byte {
return p.vBuf
}
// UTF8StringV return V as utf-8 string
func (p *StreamPacket) Utf8StringV() string {
return string(p.vBuf)
}
// Int32V return V as int32
func (p *StreamPacket) Int32V() (val int32, err error) {
codec := encoding.VarCodec{Size: len(p.vBuf)}
err = codec.DecodeNVarInt32(p.vBuf, &val)
return val, err
}
// UInt32V return V as uint32
func (p *StreamPacket) UInt32V() (val uint32, err error) {
codec := encoding.VarCodec{Size: len(p.vBuf)}
err = codec.DecodeNVarUInt32(p.vBuf, &val)
return val, err
}
// Int64V return V as int64
func (p *StreamPacket) Int64V() (val int64, err error) {
codec := encoding.VarCodec{Size: len(p.vBuf)}
err = codec.DecodeNVarInt64(p.vBuf, &val)
return val, err
}
// UInt64V return V as uint64
func (p *StreamPacket) UInt64V() (val uint64, err error) {
codec := encoding.VarCodec{Size: len(p.vBuf)}
err = codec.DecodeNVarUInt64(p.vBuf, &val)
return val, err
}
// Float32V return V as float32
func (p *StreamPacket) Float32V() (val float32, err error) {
codec := encoding.VarCodec{Size: len(p.vBuf)}
err = codec.DecodeVarFloat32(p.vBuf, &val)
return val, err
}
// Float64V return V as float64
func (p *StreamPacket) Float64V() (val float64, err error) {
codec := encoding.VarCodec{Size: len(p.vBuf)}
err = codec.DecodeVarFloat64(p.vBuf, &val)
return val, err
}
// BoolV return V as bool
func (p *StreamPacket) BoolV() (val bool, err error) {
codec := encoding.VarCodec{Size: len(p.vBuf)}
err = codec.DecodePVarBool(p.vBuf, &val)
return val, err
// GetValBuf get raw buffer of Val of this packet
func (bp *packet) GetValBuf() []byte {
return bp.valBuf
}

100
parser.go Normal file
View File

@ -0,0 +1,100 @@
package mq_coder
import (
"bytes"
"errors"
"fmt"
"io"
"git.hpds.cc/Component/mq_coder/encoding"
)
var (
ErrMalformed = errors.New("coder.ReadPacket: malformed")
)
// ReadPacket will try to read encoded packet from the reader
func ReadPacket(reader io.Reader) ([]byte, error) {
tag, err := readByte(reader)
if err != nil {
return nil, err
}
// buf will contain a complete encoded handshakeFrame
buf := bytes.Buffer{}
// the first byte is coder.Tag
// write coder.Tag bytes
buf.WriteByte(tag)
// read y3.Length bytes, a varInt format
lenBuf := bytes.Buffer{}
for {
b, e := readByte(reader)
if e != nil {
return nil, err
}
lenBuf.WriteByte(b)
if b&0x80 != 0x80 {
break
}
}
// parse to coder.Length
var length int32
codec := encoding.VarCodec{}
err = codec.DecodePVarInt32(lenBuf.Bytes(), &length)
if err != nil {
return nil, ErrMalformed
}
// validate len decoded from stream
if length < 0 {
return nil, fmt.Errorf("coder.ReadPacket() get lenBuf=(%# x), decode len=(%v)", lenBuf.Bytes(), length)
}
// write coder.Length bytes
buf.Write(lenBuf.Bytes())
// read next {len} bytes as coder.Value
valBuf := bytes.Buffer{}
// every batch read 512 bytes, if next reads < 512, read
var count int
for {
batchReadSize := 1024 * 1024
var tmpBuf []byte
if int(length)-count < batchReadSize {
tmpBuf = make([]byte, int(length)-count)
} else {
tmpBuf = make([]byte, batchReadSize)
}
p, e := reader.Read(tmpBuf)
count += p
if e != nil {
if e == io.EOF {
valBuf.Write(tmpBuf[:p])
break
}
return nil, fmt.Errorf("y3 parse valBuf error: %v", err)
}
valBuf.Write(tmpBuf[:p])
if count == int(length) {
break
}
}
if count < int(length) {
// return nil, fmt.Errorf("[y3] p should == len when getting y3 value buffer, len=%d, p=%d", length, count)
return nil, ErrMalformed
}
// write coder.Value bytes
buf.Write(valBuf.Bytes())
return buf.Bytes(), nil
}
func readByte(reader io.Reader) (byte, error) {
var b [1]byte
_, err := reader.Read(b[:])
return b[0], err
}

88
primitive_decoder.go Normal file
View File

@ -0,0 +1,88 @@
package mq_coder
import (
"bytes"
"errors"
"fmt"
"git.hpds.cc/Component/mq_coder/encoding"
)
// DecodeState represents the state of decoding
type DecodeState struct {
// ConsumedBytes is the bytes consumed by decoder
ConsumedBytes int
// SizeL is the bytes length of value
SizeL int
}
// DecodeToPrimitivePacket parse out whole buffer to a PrimitivePacket
//
// Examples:
// [0x01, 0x01, 0x01] -> Key=0x01, Value=0x01
// [0x41, 0x06, 0x03, 0x01, 0x61, 0x04, 0x01, 0x62] -> key=0x03, value=0x61; key=0x04, value=0x62
func DecodeToPrimitivePacket(buf []byte, p *PrimitivePacket) (*DecodeState, error) {
decoder := &DecodeState{
ConsumedBytes: 0,
SizeL: 0,
}
if buf == nil || len(buf) < primitivePacketBufferMinimalLength {
return decoder, errors.New("invalid packet minimal size")
}
p.packet = &packet{
valBuf: []byte{},
buf: &bytes.Buffer{},
}
var pos = 0
// first byte is `Tag`
p.tag = NewTag(buf[pos])
p.buf.WriteByte(buf[pos])
pos++
decoder.ConsumedBytes = pos
// read `Varint` from buf for `Length of value`
tmpBuf := buf[pos:]
var bufLen int32
codec := encoding.VarCodec{}
err := codec.DecodePVarInt32(tmpBuf, &bufLen)
if err != nil {
return decoder, err
}
if codec.Size < 1 {
return decoder, errors.New("malformed, size of Length can not smaller than 1")
}
// codec.Size describes how many bytes used to represent `Length`
p.buf.Write(buf[pos : pos+codec.Size])
pos += codec.Size
decoder.ConsumedBytes = pos
decoder.SizeL = codec.Size
// if length<0, error on decoding
if bufLen < 0 {
return decoder, errors.New("invalid packet, negative length")
}
// the length of value
p.length = int(bufLen)
if p.length == 0 {
p.valBuf = nil
return decoder, nil
}
// the next `p.length` bytes store value
endPos := pos + p.length
if pos > endPos || endPos > len(buf) || pos > len(buf) {
return decoder, fmt.Errorf("beyond the boundary, pos=%v, endPos=%v", pos, endPos)
}
p.valBuf = buf[pos:endPos]
p.buf.Write(buf[pos:endPos])
decoder.ConsumedBytes = endPos
return decoder, nil
}

112
primitive_encoder.go Normal file
View File

@ -0,0 +1,112 @@
package mq_coder
import (
"bytes"
"git.hpds.cc/Component/mq_coder/encoding"
)
// PrimitivePacketEncoder used for encode a primitive packet
type PrimitivePacketEncoder struct {
*encoder
}
// NewPrimitivePacketEncoder return an Encoder for primitive packet
func NewPrimitivePacketEncoder(sid byte) *PrimitivePacketEncoder {
prim := &PrimitivePacketEncoder{
encoder: &encoder{
isNode: false,
buf: new(bytes.Buffer),
},
}
prim.seqID = sid
return prim
}
// SetInt32Value encode int32 value
func (enc *PrimitivePacketEncoder) SetInt32Value(v int32) {
size := encoding.SizeOfNVarInt32(v)
codec := encoding.VarCodec{Size: size}
enc.valbuf = make([]byte, size)
err := codec.EncodeNVarInt32(enc.valbuf, v)
if err != nil {
panic(err)
}
}
// SetUInt32Value encode uint32 value
func (enc *PrimitivePacketEncoder) SetUInt32Value(v uint32) {
size := encoding.SizeOfNVarUInt32(v)
codec := encoding.VarCodec{Size: size}
enc.valbuf = make([]byte, size)
err := codec.EncodeNVarUInt32(enc.valbuf, v)
if err != nil {
panic(err)
}
}
// SetInt64Value encode int64 value
func (enc *PrimitivePacketEncoder) SetInt64Value(v int64) {
size := encoding.SizeOfNVarInt64(v)
codec := encoding.VarCodec{Size: size}
enc.valbuf = make([]byte, size)
err := codec.EncodeNVarInt64(enc.valbuf, v)
if err != nil {
panic(err)
}
}
// SetUInt64Value encode uint64 value
func (enc *PrimitivePacketEncoder) SetUInt64Value(v uint64) {
size := encoding.SizeOfNVarUInt64(v)
codec := encoding.VarCodec{Size: size}
enc.valbuf = make([]byte, size)
err := codec.EncodeNVarUInt64(enc.valbuf, v)
if err != nil {
panic(err)
}
}
// SetFloat32Value encode float32 value
func (enc *PrimitivePacketEncoder) SetFloat32Value(v float32) {
var size = encoding.SizeOfVarFloat32(v)
codec := encoding.VarCodec{Size: size}
enc.valbuf = make([]byte, size)
err := codec.EncodeVarFloat32(enc.valbuf, v)
if err != nil {
panic(err)
}
}
// SetFloat64Value encode float64 value
func (enc *PrimitivePacketEncoder) SetFloat64Value(v float64) {
var size = encoding.SizeOfVarFloat64(v)
codec := encoding.VarCodec{Size: size}
enc.valbuf = make([]byte, size)
err := codec.EncodeVarFloat64(enc.valbuf, v)
if err != nil {
panic(err)
}
}
// SetBoolValue encode bool value
func (enc *PrimitivePacketEncoder) SetBoolValue(v bool) {
var size = encoding.SizeOfPVarUInt32(uint32(1))
codec := encoding.VarCodec{Size: size}
enc.valbuf = make([]byte, size)
err := codec.EncodePVarBool(enc.valbuf, v)
if err != nil {
panic(err)
}
}
// SetStringValue encode string
func (enc *PrimitivePacketEncoder) SetStringValue(v string) {
enc.valbuf = []byte(v)
}
// SetBytesValue encode []byte
func (enc *PrimitivePacketEncoder) SetBytesValue(v []byte) {
enc.valbuf = v
}

100
primitive_packet.go Normal file
View File

@ -0,0 +1,100 @@
package mq_coder
import (
"git.hpds.cc/Component/mq_coder/encoding"
)
// 最小长度为2个字节
const primitivePacketBufferMinimalLength = 2
// PrimitivePacket describes primitive value type,
type PrimitivePacket struct {
*packet
}
// ToInt32 parse raw as int32 value
func (p *PrimitivePacket) ToInt32() (int32, error) {
var val int32
codec := encoding.VarCodec{Size: len(p.valBuf)}
err := codec.DecodeNVarInt32(p.packet.valBuf, &val)
if err != nil {
return 0, err
}
return val, nil
}
// ToUInt32 parse raw as uint32 value
func (p *PrimitivePacket) ToUInt32() (uint32, error) {
var val uint32
codec := encoding.VarCodec{Size: len(p.valBuf)}
err := codec.DecodeNVarUInt32(p.valBuf, &val)
if err != nil {
return 0, err
}
return val, nil
}
// ToInt64 parse raw as int64 value
func (p *PrimitivePacket) ToInt64() (int64, error) {
var val int64
codec := encoding.VarCodec{Size: len(p.valBuf)}
err := codec.DecodeNVarInt64(p.valBuf, &val)
if err != nil {
return 0, err
}
return val, nil
}
// ToUInt64 parse raw as uint64 value
func (p *PrimitivePacket) ToUInt64() (uint64, error) {
var val uint64
codec := encoding.VarCodec{Size: len(p.valBuf)}
err := codec.DecodeNVarUInt64(p.valBuf, &val)
if err != nil {
return 0, err
}
return val, nil
}
// ToFloat32 parse raw as float32 value
func (p *PrimitivePacket) ToFloat32() (float32, error) {
var val float32
codec := encoding.VarCodec{Size: len(p.valBuf)}
err := codec.DecodeVarFloat32(p.valBuf, &val)
if err != nil {
return 0, err
}
return val, nil
}
// ToFloat64 parse raw as float64 value
func (p *PrimitivePacket) ToFloat64() (float64, error) {
var val float64
codec := encoding.VarCodec{Size: len(p.valBuf)}
err := codec.DecodeVarFloat64(p.valBuf, &val)
if err != nil {
return 0, err
}
return val, nil
}
// ToBool parse raw as bool value
func (p *PrimitivePacket) ToBool() (bool, error) {
var val bool
codec := encoding.VarCodec{Size: len(p.valBuf)}
err := codec.DecodePVarBool(p.valBuf, &val)
if err != nil {
return false, err
}
return val, nil
}
// ToUTF8String parse raw data as string value
func (p *PrimitivePacket) ToUTF8String() (string, error) {
return string(p.valBuf), nil
}
// ToBytes returns raw buffer data
func (p *PrimitivePacket) ToBytes() []byte {
return p.valBuf
}

View File

@ -1,26 +0,0 @@
package spec
import (
"errors"
"io"
)
const (
maxSeqId = 0x3F
flagBitNode = 0x80
wipeFlagBits = 0x3F
msb = 0x80
)
var (
errInvalidSeqId = errors.New("coder.Builder: SeqId should >= 0 and =< 0x3F")
)
func readByte(reader io.Reader) (byte, error) {
var b [1]byte
n, err := reader.Read(b[:])
if n == 0 {
return 0x00, err
}
return b[0], err
}

View File

@ -1,27 +0,0 @@
@startuml
namespace spec {
class L << (S,Aquamarine) >> {
- buf []byte
- size int
- len int
+ Bytes() []byte
+ Size() int
+ VSize() int
}
class T << (S,Aquamarine) >> {
+ Sid() int
+ Bytes() []byte
+ IsNodeMode() bool
+ SetNodeMode(flag bool)
+ Size() int
}
class spec.T << (T, #FF7700) >> {
}
}
"__builtin__.byte" #.. "spec.T"
@enduml

View File

@ -1,54 +0,0 @@
package spec
import (
"io"
)
// T is the Tag in a TLV structure
type T byte
// NewT returns a T with sequenceID. If this packet contains other
// packets, this packet will be a "node packet", the T of this packet
// will set MSB to T.
func NewT(seqId int) (T, error) {
if seqId < 0 || seqId > maxSeqId {
return 0, errInvalidSeqId
}
return T(seqId), nil
}
// Sid returns the sequenceId of this packet.
func (t T) Sid() int {
return int(t & wipeFlagBits)
}
// Bytes returns raw bytes of T.
func (t T) Bytes() []byte {
return []byte{byte(t)}
}
// IsNodeMode will return true if this packet contains other packets.
// Otherwise, return false.
func (t T) IsNodeMode() bool {
return t&flagBitNode == flagBitNode
}
// SetNodeMode will set T to indicates this packet contains
// other packets.
func (t *T) SetNodeMode(flag bool) {
if flag {
*t |= flagBitNode
}
}
// Size return the size of T raw bytes.
func (t T) Size() int {
return 1
}
// ReadT read T from a bufio.Reader
func ReadT(rd io.Reader) (T, error) {
b, err := readByte(rd)
return T(b), err
}

View File

@ -1,89 +0,0 @@
package spec
import (
"bytes"
"errors"
"io"
"git.hpds.cc/Component/mq_coder/encoding"
)
// L is the Length in a TLV structure
type L struct {
buf []byte
size int
len int
}
// NewL will take an int type len as parameter and return L to
// represent the size of V in a TLV. an integer will be encoded as
// a PVarInt32 type to represent the value.
func NewL(len int) (L, error) {
var l = L{}
if len < -1 {
return l, errors.New("y3.L: len can't less than -1")
}
valLen := int32(len)
l.size = encoding.SizeOfPVarInt32(valLen)
codec := encoding.VarCodec{Size: l.size}
tmp := make([]byte, l.size)
err := codec.EncodePVarInt32(tmp, valLen)
if err != nil {
panic(err)
}
l.buf = make([]byte, l.size)
copy(l.buf, tmp)
l.len = len
return l, nil
}
// Bytes will return the raw bytes of L.
func (l L) Bytes() []byte {
return l.buf
}
// Size returns how many bytes used to represent this L.
func (l L) Size() int {
return l.size
}
// VSize returns the size of V.
func (l L) VSize() int {
return int(l.len)
}
// ReadL read L from bufio.Reader
func ReadL(r io.Reader) (*L, error) {
lenBuf := bytes.Buffer{}
for {
b, err := readByte(r)
if err != nil {
return nil, err
}
lenBuf.WriteByte(b)
if b&msb != msb {
break
}
}
buf := lenBuf.Bytes()
// decode to L
length, err := decodeL(buf)
if err != nil {
return nil, err
}
return &L{
buf: buf,
len: int(length),
size: len(buf),
}, nil
}
func decodeL(buf []byte) (length int32, err error) {
codec := encoding.VarCodec{}
err = codec.DecodePVarInt32(buf, &length)
return length, err
}

115
stream_decoder.go Normal file
View File

@ -0,0 +1,115 @@
package mq_coder
import (
"bytes"
"errors"
"fmt"
"io"
"git.hpds.cc/Component/mq_coder/encoding"
)
// StreamReader read an coder packet from io.Reader, and return
// the ValReader after decode out Tag and Len
type StreamReader struct {
src io.Reader
// Tag of a coder packet
Tag byte
// Len of a coder packet
Len int
// Val of a coder packet
Val io.Reader
}
// NewStreamParser create a new coder StreamReader
func NewStreamParser(reader io.Reader) *StreamReader {
return &StreamReader{
src: reader,
}
}
func (sr *StreamReader) GetValBuffer() ([]byte, error) {
buf, err := io.ReadAll(sr.Val)
return buf, err
}
// Do must run in a goroutine
func (sr *StreamReader) Do() error {
if sr.src == nil {
return errors.New("coder: nil source reader")
}
tag, err := readByte(sr.src)
if err != nil {
return err
}
// the first byte is coder.Tag
sr.Tag = tag
// read coder.Length bytes, a varInt format
lenBuf := bytes.Buffer{}
for {
b, e := readByte(sr.src)
if e != nil {
return err
}
lenBuf.WriteByte(b)
if b&0x80 != 0x80 {
break
}
}
// parse to coder.Length
var length int32
codec := encoding.VarCodec{}
err = codec.DecodePVarInt32(lenBuf.Bytes(), &length)
if err != nil {
return err
}
// validate len decoded from stream
if length < 0 {
return fmt.Errorf("coder: streamParse() get lenBuf=(%# x), decode len=(%v)", lenBuf.Bytes(), length)
}
sr.Len = int(length)
// read next {len} bytes as coder.Value
sr.Val = &valR{
length: int(length),
src: sr.src,
}
return nil
}
type valR struct {
length int
off int
src io.Reader
}
func (r *valR) Read(p []byte) (n int, err error) {
if r.src == nil {
return 0, nil
}
if r.off >= r.length {
return 0, io.EOF
}
bound := len(p)
if len(p) > r.length-r.off {
bound = r.length - r.off
}
// update read
r.off, err = r.src.Read(p[0:bound])
return r.off, err
}
func StreamReadPacket(reader io.Reader) (*StreamReader, error) {
sp := NewStreamParser(reader)
err := sp.Do()
return sp, err
}

138
stream_encoder.go Normal file
View File

@ -0,0 +1,138 @@
package mq_coder
import (
"bytes"
"io"
"git.hpds.cc/Component/mq_coder/encoding"
)
type StreamEncoder struct {
tag byte
buf *bytes.Buffer
pBuf *bytes.Buffer
len int
sLen int
reader *yR
}
func NewStreamEncoder(tag byte) *StreamEncoder {
var se = &StreamEncoder{
tag: tag,
buf: new(bytes.Buffer),
pBuf: new(bytes.Buffer),
}
return se
}
func (se *StreamEncoder) AddPacket(packet *PrimitivePacketEncoder) {
node := packet.Encode()
se.AddPacketBuffer(node)
}
func (se *StreamEncoder) AddPacketBuffer(buf []byte) {
se.pBuf.Write(buf)
se.growLen(len(se.pBuf.Bytes()))
}
func (se *StreamEncoder) AddStreamPacket(tag byte, length int, reader io.Reader) {
se.sLen = length
// s-Tag
se.pBuf.WriteByte(tag)
se.growLen(1)
// calculate s-Len
size := encoding.SizeOfPVarInt32(int32(length))
codec := encoding.VarCodec{Size: size}
tmp := make([]byte, size)
err := codec.EncodePVarInt32(tmp, int32(length))
if err != nil {
panic(err)
}
se.pBuf.Write(tmp)
se.growLen(size)
// total buf
se.buf.WriteByte(se.tag)
se.growLen(length)
// calculate total Len buf
size = encoding.SizeOfPVarInt32(int32(se.len))
codec = encoding.VarCodec{Size: size}
tmp = make([]byte, size)
err = codec.EncodePVarInt32(tmp, int32(se.len))
if err != nil {
panic(err)
}
se.buf.Write(tmp) //lenBuf
se.buf.Write(se.pBuf.Bytes())
se.growLen(size) // total length
se.growLen(1) // parent tag
se.reader = &yR{
buf: se.buf,
src: reader,
length: se.len,
sLen: se.sLen,
}
}
func (se *StreamEncoder) GetReader() io.Reader {
if se.reader != nil {
return se.reader
}
return new(bytes.Buffer)
}
// Pipe can pipe data to os.StdOut
func (se *StreamEncoder) Pipe(writer io.Writer) {
}
func (se *StreamEncoder) GetLen() int {
if se.reader != nil {
return se.len
}
return 0
}
func (se *StreamEncoder) growLen(step int) {
se.len += step
}
type yR struct {
src io.Reader
buf *bytes.Buffer
length int
off int
sLen int
}
func (r *yR) Read(p []byte) (n int, err error) {
if r.src == nil {
return 0, nil
}
if r.off >= r.length {
return 0, io.EOF
}
if r.off < r.length-r.sLen {
n, err = r.buf.Read(p)
r.off += n
if err != nil {
if err == io.EOF {
return n, nil
} else {
return 0, err
}
}
return n, nil
} else {
n, err = r.src.Read(p)
r.off += n
if err != nil {
return n, err
}
return n, nil
}
}

38
tag.go Normal file
View File

@ -0,0 +1,38 @@
package mq_coder
import (
"git.hpds.cc/Component/mq_coder/utils"
)
// Tag represents the Tag of TLV,
// MSB used to represent the packet type, 0x80 means a node packet, otherwise is a primitive packet.
// Low 7 bits represent Sequence ID, like `key` in JSON format
type Tag struct {
raw byte
}
// IsNode returns true is MSB is 1.
func (t *Tag) IsNode() bool {
return t.raw&utils.MSB == utils.MSB
}
// IsSlice determine if the current node is a Slice
func (t *Tag) IsSlice() bool {
return t.raw&utils.SliceFlag == utils.SliceFlag
}
// SeqId get the sequence ID, as key in JSON format
func (t *Tag) SeqId() byte {
//return t.raw & packet utils.DropMSB
return t.raw & utils.DropMSBArrayFlag
}
// NewTag create a NodePacket Tag field
func NewTag(b byte) *Tag {
return &Tag{raw: b}
}
// Raw return the original byte
func (t *Tag) Raw() byte {
return t.raw
}

18
utils/common.go Normal file
View File

@ -0,0 +1,18 @@
package utils
// MSB is `1000 0000` describes this is a node packet, otherwise, is a primitive packet
const MSB byte = 0x80
// DropMSB is `0111 1111`, used to remove MSB flag bit
const DropMSB byte = 0x3F
// SliceFlag is `0100 0000`, describes this packet is a Slice type
const SliceFlag byte = 0x40
// DropMSBArrayFlag is `0011 1111`, used to remove MSB and Slice flag bit
const DropMSBArrayFlag byte = 0x3F
// IsNodePacket returns true if the tag represents a node package
func IsNodePacket(tag byte) bool {
return tag&MSB == MSB
}