Compare commits

..

No commits in common. "master" and "v1.4" have entirely different histories.
master ... v1.4

23 changed files with 789 additions and 925 deletions

72
chunkedVReader.go Normal file
View File

@ -0,0 +1,72 @@
package mq_coder
import (
"bytes"
"io"
"io/ioutil"
)
type chunkVReader struct {
src io.Reader // the reader parts of V
buf *bytes.Buffer // the bytes part of V
totalSize int // size of whole buffer of this packet
off int // last read op
ChunkVSize int // the size of chunked V
}
// Read implement io.Reader interface
func (r *chunkVReader) Read(p []byte) (n int, err error) {
if r.src == nil {
return 0, nil
}
if r.off >= r.totalSize {
return 0, io.EOF
}
if r.off < r.totalSize-r.ChunkVSize {
n, err := r.buf.Read(p)
r.off += n
if err != nil {
if err == io.EOF {
return n, nil
} else {
return 0, err
}
}
return n, nil
}
n, err = r.src.Read(p)
r.off += n
if err != nil {
return n, err
}
return n, nil
}
// WriteTo implement io.WriteTo interface
func (r *chunkVReader) WriteTo(w io.Writer) (n int64, err error) {
if r.src == nil {
return 0, nil
}
// first, write existed buffer
m, err := w.Write(r.buf.Bytes())
if err != nil {
return 0, err
}
n += int64(m)
// last, write from reader
buf, err := ioutil.ReadAll(r.src)
if err != nil && err != io.EOF {
return 0, errWriteFromReader
}
m, err = w.Write(buf)
if err != nil {
return 0, err
}
n += int64(m)
return n, nil
}

54
coder.go Normal file
View File

@ -0,0 +1,54 @@
package mq_coder
import (
"errors"
"io"
)
var (
errBuildIncomplete = errors.New("coder.Encoder: invalid structure of packet")
errInvalidAdding = errors.New("coder.Encoder: can not add this Packet after StreamPacket has been add")
errNonStreamPacket = errors.New("coder.Packet: this packet is not in node mode")
errWriteFromReader = errors.New("coder.streamV: write from reader error")
errNotNodeMode = errors.New("coder.Encoder: packet should be in node mode can be add other packets as child")
errNilReader = errors.New("coder.Decoder: nil source reader")
)
// Packet 编解码器包
type Packet interface {
// SeqId 返回此数据包的序列Id.
SeqId() int
// Size 返回整个数据包的大小
Size() int
// VSize 返回V的大小.
VSize() int
// Bytes 此数据包的全部字节.
Bytes() []byte
// Reader 返回 io.Reader 字节.
Reader() io.Reader
// VReader 返回 io.Reader 的 流字节.
VReader() io.Reader
// IsStreamMode 是否流模式
IsStreamMode() bool
// IsNodeMode 是否节点模式
IsNodeMode() bool
// BytesV 返回流字节
BytesV() []byte
// Utf8StringV 返回流的utf8字符串值
Utf8StringV() string
// Int32V 返回流的int32值
Int32V() (val int32, err error)
// UInt32V 返回流的uint32值
UInt32V() (val uint32, err error)
// Int64V 返回流的int64值
Int64V() (val int64, err error)
// UInt64V 返回流的uint64值
UInt64V() (val uint64, err error)
// Float32V 返回流的float32值
Float32V() (val float32, err error)
// Float64V 返回流的float64值
Float64V() (val float64, err error)
// BoolV 返回流的bool值
BoolV() (val bool, err error)
}

93
decoder.go Normal file
View File

@ -0,0 +1,93 @@
package mq_coder
import (
"bytes"
"io"
"git.hpds.cc/Component/mq_coder/spec"
)
// Decoder is the tool for decoding packet from stream
type Decoder struct {
tag spec.T
len *spec.L
rd io.Reader
}
// NewDecoder returns a Decoder from an io.Reader
func NewDecoder(reader io.Reader) *Decoder {
return &Decoder{
rd: reader,
}
}
// SeqId return the SequenceID of the decoding packet
func (d *Decoder) SeqId() int {
return d.tag.Sid()
}
// UnderlyingReader returns the reader this decoder using
func (d *Decoder) UnderlyingReader() io.Reader {
return d.rd
}
// ReadHeader will block until io.EOF or receive T and L of a packet.
func (d *Decoder) ReadHeader() error {
// only read T and L
return d.readTL()
}
// GetChunkedPacket will block until io.EOF or receive V of a packet in chunked mode.
func (d *Decoder) GetChunkedPacket() Packet {
return &StreamPacket{
t: d.tag,
l: *d.len,
vr: d.rd,
chunkMode: true,
chunkSize: d.len.VSize(),
}
}
// GetFullFilledPacket read full Packet from given io.Reader
func (d *Decoder) GetFullFilledPacket() (packet Packet, err error) {
// read V
buf := new(bytes.Buffer)
total := 0
for {
valBuf := make([]byte, d.len.VSize())
n, err := d.rd.Read(valBuf)
if n > 0 {
total += n
buf.Write(valBuf[:n])
}
if total >= d.len.VSize() || err != nil {
break
}
}
packet = &StreamPacket{
t: d.tag,
l: *d.len,
vBuf: buf.Bytes(),
chunkMode: false,
}
return packet, nil
}
func (d *Decoder) readTL() (err error) {
if d.rd == nil {
return errNilReader
}
// read T
d.tag, err = spec.ReadT(d.rd)
if err != nil {
return err
}
// read L
d.len, err = spec.ReadL(d.rd)
return err
}

View File

@ -2,76 +2,157 @@ package mq_coder
import (
"bytes"
"fmt"
"git.hpds.cc/Component/mq_coder/encoding"
"io"
"git.hpds.cc/Component/mq_coder/spec"
)
// Encoder will encode object to encoding
type encoder struct {
seqID byte
valBuf []byte
isNode bool
isArray bool
buf *bytes.Buffer
complete bool
// Encoder is the tool for creating a packet easily
type Encoder struct {
tag spec.T
len *spec.L
valReader io.Reader
valReaderSize int
nodes map[int]Packet
state int
size int32 // size of value
isStreamMode bool
valBuf *bytes.Buffer
done bool
seqId int
isNodeMode bool
}
type iEncoder interface {
Encode() []byte
// SetSeqId set sequenceId of a packet, if this packet contains other
// packets, isNode should set to true
func (b *Encoder) SetSeqId(seqId int, isNode bool) {
// init
b.valBuf = new(bytes.Buffer)
b.nodes = make(map[int]Packet)
// set seqId
b.seqId = seqId
b.isNodeMode = isNode
}
func (enc *encoder) GetValBuf() []byte {
return enc.valBuf
// SetBytesV set bytes type as V
func (b *Encoder) SetBytesV(buf []byte) {
b.size += int32(len(buf))
b.valBuf.Write(buf)
b.isStreamMode = false
b.state |= 0x04
}
func (enc *encoder) IsEmpty() bool {
return len(enc.valBuf) == 0
// SetReaderV set io.Reader type as V
func (b *Encoder) SetReaderV(r io.Reader, size int) {
b.isStreamMode = true
b.valReader = r
b.state |= 0x04
b.size += int32(size)
b.valReaderSize = size
}
func (enc *encoder) AddBytes(buf []byte) {
enc.valBuf = append(enc.valBuf, buf...)
}
func (enc *encoder) addRawPacket(en iEncoder) {
enc.valBuf = append(enc.valBuf, en.Encode()...)
}
// setTag write tag as seqID
func (enc *encoder) writeTag() {
if enc.seqID > 0x3F {
panic(fmt.Errorf("sid should be in [0..0x3F]"))
// AddPacket add a Packet child to this packet, this packet must be NodeMode
func (b *Encoder) AddPacket(child Packet) error {
// only packet is in node mode can add other packets
if !b.isNodeMode {
return errNotNodeMode
}
if enc.isNode {
enc.seqID = enc.seqID | 0x80
if b.done {
return errInvalidAdding
}
if enc.isArray {
enc.seqID = enc.seqID | 0x40
}
enc.buf.WriteByte(enc.seqID)
b.nodes[child.SeqId()] = child
buf := child.Bytes()
b.SetBytesV(buf)
return nil
}
func (enc *encoder) writeLengthBuf() {
valLen := len(enc.valBuf)
size := encoding.SizeOfPVarInt32(int32(valLen))
codec := encoding.VarCodec{Size: size}
tmp := make([]byte, size)
err := codec.EncodePVarInt32(tmp, int32(valLen))
// AddStreamPacket will put a StreamPacket in chunked mode to current packet.
func (b *Encoder) AddStreamPacket(child Packet) (err error) {
// if this packet is in stream mode, can not add any packets
if b.done {
return errInvalidAdding
}
// only accept packet in stream mode
if !child.IsStreamMode() {
return errNonStreamPacket
}
// set the valReader of this packet to the child's
b.valReader = child.VReader()
// valReaderSize will be the same as child's
b.valReaderSize = child.VSize()
// add this child packet
b.nodes[child.SeqId()] = child
// add the size of child's V to L of this packet
b.size += int32(child.Size())
// put the bytes of child to valBuf
buf := child.Bytes()
b.valBuf.Write(buf)
// update state
b.state |= 0x04
b.isStreamMode = true
b.done = true
return nil
}
// Packet return a Packet instance.
func (b *Encoder) Packet() (Packet, error) {
err := b.generateT()
if err != nil {
panic(err)
return nil, err
}
enc.buf.Write(tmp)
err = b.generateL()
if err != nil {
return nil, err
}
if b.state != 0x07 {
return nil, errBuildIncomplete
}
if b.isStreamMode {
return &StreamPacket{
t: b.tag,
l: *b.len,
vr: b.valReader,
vBuf: b.valBuf.Bytes(),
chunkMode: true,
chunkSize: b.valReaderSize,
}, err
}
// not streaming mode
return &StreamPacket{
t: b.tag,
l: *b.len,
vBuf: b.valBuf.Bytes(),
chunkMode: false,
}, err
}
// Encode returns a final Y3 encoded byte slice
func (enc *encoder) Encode() []byte {
if !enc.complete {
// Tag
enc.writeTag()
// Len
enc.writeLengthBuf()
// Val
enc.buf.Write(enc.valBuf)
enc.complete = true
// will generate T of a TLV.
func (b *Encoder) generateT() error {
t, err := spec.NewT(b.seqId)
t.SetNodeMode(b.isNodeMode)
if err != nil {
return err
}
return enc.buf.Bytes()
b.tag = t
b.state |= 0x01
return nil
}
// will generate L of a TLV.
func (b *Encoder) generateL() error {
l, err := spec.NewL(int(b.size))
if err != nil {
return err
}
b.len = &l
b.state |= 0x02
return nil
}

98
encoder_sugar.go Normal file
View File

@ -0,0 +1,98 @@
package mq_coder
import (
"git.hpds.cc/Component/mq_coder/encoding"
)
// SetUtf8StringV set utf-8 string type value as V
func (b *Encoder) SetUtf8StringV(v string) {
buf := []byte(v)
b.SetBytesV(buf)
}
// SetInt32V set an int32 type value as V
func (b *Encoder) SetInt32V(v int32) error {
size := encoding.SizeOfNVarInt32(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeNVarInt32(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetUInt32V set an uint32 type value as V
func (b *Encoder) SetUInt32V(v uint32) error {
size := encoding.SizeOfNVarUInt32(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeNVarUInt32(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetInt64V set an int64 type value as V
func (b *Encoder) SetInt64V(v int64) error {
size := encoding.SizeOfNVarInt64(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeNVarInt64(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetUInt64V set an uint64 type value as V
func (b *Encoder) SetUInt64V(v uint64) error {
size := encoding.SizeOfNVarUInt64(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeNVarUInt64(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetFloat32V set a float32 type value as V
func (b *Encoder) SetFloat32V(v float32) error {
size := encoding.SizeOfVarFloat32(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeVarFloat32(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetFloat64V set a float64 type value as V
func (b *Encoder) SetFloat64V(v float64) error {
size := encoding.SizeOfVarFloat64(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeVarFloat64(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetBoolV set bool type value as V
func (b *Encoder) SetBoolV(v bool) {
var size = encoding.SizeOfPVarUInt32(uint32(1))
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
_ = codec.EncodePVarBool(buf, v)
b.SetBytesV(buf)
}

View File

@ -60,7 +60,7 @@ func (codec *VarCodec) encodeVarFloat(buffer []byte, bits uint64, width int) err
return errors.New("nothing to encode")
}
const unit = 8 // the bit width of encoding unit
const unit = 8 // bit width of encoding unit
var gap, mask = codec.sizeOfGap(width)
for (codec.Size & mask) > 0 {
@ -81,7 +81,7 @@ func (codec *VarCodec) decodeVarFloat(buffer []byte, bits *uint64, width int) er
return errors.New("nothing to decode")
}
const unit = 8 // the bit width of encoding unit
const unit = 8 // bit width of encoding unit
var gap, mask = codec.sizeOfGap(width)
for (codec.Size & mask) > 0 {

2
go.mod
View File

@ -1,6 +1,6 @@
module git.hpds.cc/Component/mq_coder
go 1.19
go 1.18
require github.com/stretchr/testify v1.8.0

View File

@ -1,96 +0,0 @@
package mq_coder
import (
"bytes"
"errors"
"git.hpds.cc/Component/mq_coder/encoding"
"git.hpds.cc/Component/mq_coder/utils"
)
func parsePayload(b []byte) (consumedBytes int, ifNodePacket bool, np *NodePacket, pp *PrimitivePacket, err error) {
if len(b) == 0 {
return 0, false, nil, nil, errors.New("parsePacket params can not be nil")
}
pos := 0
// NodePacket
if ok := utils.IsNodePacket(b[pos]); ok {
np = &NodePacket{}
endPos, err := DecodeToNodePacket(b, np)
return endPos, true, np, nil, err
}
pp = &PrimitivePacket{}
state, err := DecodeToPrimitivePacket(b, pp)
return state.ConsumedBytes, false, nil, pp, err
}
// DecodeToNodePacket parse out whole buffer to a NodePacket
func DecodeToNodePacket(buf []byte, pct *NodePacket) (consumedBytes int, err error) {
if len(buf) == 0 {
return 0, errors.New("empty buf")
}
pct.packet = &packet{
valBuf: buf,
buf: &bytes.Buffer{},
}
pct.NodePackets = map[byte]NodePacket{}
pct.PrimitivePackets = map[byte]PrimitivePacket{}
pos := 0
// `Tag`
tag := NewTag(buf[pos])
pct.packet.tag = tag
pct.buf.WriteByte(buf[pos])
pos++
// `Length`: the type is `varInt`
tmpBuf := buf[pos:]
var valLen int32
codec := encoding.VarCodec{}
err = codec.DecodePVarInt32(tmpBuf, &valLen)
if err != nil {
return 0, err
}
pct.packet.length = int(valLen)
pct.buf.Write(buf[pos : pos+codec.Size])
pos += codec.Size
// if `Length` is 0, means empty node packet
if valLen == 0 {
return pos, nil
}
// `Value`
// `raw` is pct.Length() length
vl := int(valLen)
if vl < 0 {
return pos, errors.New("found L of V smaller than 0")
}
endPos := pos + vl
pct.packet.valBuf = buf[pos:endPos]
pct.buf.Write(buf[pos:endPos])
// Parse value to Packet
for {
if pos >= endPos || pos >= len(buf) {
break
}
_p, isNode, np, pp, err := parsePayload(buf[pos:endPos])
pos += _p
if err != nil {
return 0, err
}
if isNode {
pct.NodePackets[np.packet.tag.SeqId()] = *np
} else {
pct.PrimitivePackets[pp.SeqId()] = *pp
}
}
consumedBytes = endPos
return consumedBytes, nil
}

View File

@ -1,33 +0,0 @@
package mq_coder
import (
"bytes"
)
// NodePacketEncoder used for encode a node packet
type NodePacketEncoder struct {
*encoder
}
// NewNodePacketEncoder returns an Encoder for node packet
func NewNodePacketEncoder(sid byte) *NodePacketEncoder {
nodeEnc := &NodePacketEncoder{
encoder: &encoder{
isNode: true,
buf: new(bytes.Buffer),
},
}
nodeEnc.seqID = sid
return nodeEnc
}
// AddNodePacket add new node to this node
func (enc *NodePacketEncoder) AddNodePacket(np *NodePacketEncoder) {
enc.addRawPacket(np)
}
// AddPrimitivePacket add new primitive to this node
func (enc *NodePacketEncoder) AddPrimitivePacket(np *PrimitivePacketEncoder) {
enc.addRawPacket(np)
}

View File

@ -1,10 +0,0 @@
package mq_coder
// NodePacket describes complex values
type NodePacket struct {
*packet
// NodePackets store all the node packets
NodePackets map[byte]NodePacket
// PrimitivePackets store all the primitive packets
PrimitivePackets map[byte]PrimitivePacket
}

160
packet.go
View File

@ -2,37 +2,155 @@ package mq_coder
import (
"bytes"
"io"
"git.hpds.cc/Component/mq_coder/encoding"
"git.hpds.cc/Component/mq_coder/spec"
)
// packet is the base type of the NodePacket and PrimitivePacket
type packet struct {
tag *Tag
length int
valBuf []byte
buf *bytes.Buffer
// StreamPacket implement the Packet interface.
type StreamPacket struct {
t spec.T
l spec.L
vBuf []byte
vr io.Reader
chunkMode bool
chunkSize int
}
// GetRawBytes get all raw bytes of this packet
func (bp *packet) GetRawBytes() []byte {
return bp.buf.Bytes()
var _ Packet = &StreamPacket{}
// SeqId returns the sequenceId of this packet
func (p *StreamPacket) SeqId() int { return p.t.Sid() }
// Size returns the size of whole packet.
func (p *StreamPacket) Size() int {
// T.Size + L.Size + V.Size
return p.t.Size() + p.l.Size() + p.l.VSize()
}
// Length return the length of Val this packet
func (bp *packet) Length() int {
return bp.length
// VSize returns the size of V.
func (p *StreamPacket) VSize() int { return p.l.VSize() }
// Bytes return the raw bytes of this packet. V will be absent if
// is in chunked mode
func (p *StreamPacket) Bytes() []byte {
buf := new(bytes.Buffer)
// the raw bytes of T and L
p.writeTL(buf)
// p.valBuf stores the raw bytes of V
buf.Write(p.vBuf)
return buf.Bytes()
}
// SeqId returns Tag of this packet
func (bp *packet) SeqId() byte {
return bp.tag.SeqId()
// VReader return an io.Reader which can be read as the content of V.
func (p *StreamPacket) VReader() io.Reader {
if !p.chunkMode {
return bytes.NewReader(p.vBuf)
}
return p.vr
}
// IsSlice determine if the current node is a Slice
func (bp *packet) IsSlice() bool {
return bp.tag.IsSlice()
// Reader return an io.Reader which can be read as the whole bytes of
// this packet. This function only available if this V of packet is in
// chunked mode.
func (p *StreamPacket) Reader() io.Reader {
if !p.chunkMode {
buf := new(bytes.Buffer)
buf.Write(p.t.Bytes())
buf.Write(p.l.Bytes())
buf.Write(p.vBuf)
return buf
}
buf := new(bytes.Buffer)
// T and L of this packet
p.writeTL(buf)
// V of this packet
buf.Write(p.vBuf)
return &chunkVReader{
buf: buf,
src: p.vr,
totalSize: p.Size(),
ChunkVSize: p.VSize(),
}
}
// GetValBuf get raw buffer of Val of this packet
func (bp *packet) GetValBuf() []byte {
return bp.valBuf
// IsStreamMode returns a bool value indicates if the V of
// this packet is in stream mode
func (p *StreamPacket) IsStreamMode() bool {
return p.chunkMode
}
// IsNodeMode returns a bool value indicates if this packet
// is node mode
func (p *StreamPacket) IsNodeMode() bool {
return p.t.IsNodeMode()
}
// write the raw bytes of T and L to given buf
func (p *StreamPacket) writeTL(buf *bytes.Buffer) {
buf.Write(p.t.Bytes())
buf.Write(p.l.Bytes())
}
// BytesV return V as bytes
func (p *StreamPacket) BytesV() []byte {
return p.vBuf
}
// UTF8StringV return V as utf-8 string
func (p *StreamPacket) Utf8StringV() string {
return string(p.vBuf)
}
// Int32V return V as int32
func (p *StreamPacket) Int32V() (val int32, err error) {
codec := encoding.VarCodec{Size: len(p.vBuf)}
err = codec.DecodeNVarInt32(p.vBuf, &val)
return val, err
}
// UInt32V return V as uint32
func (p *StreamPacket) UInt32V() (val uint32, err error) {
codec := encoding.VarCodec{Size: len(p.vBuf)}
err = codec.DecodeNVarUInt32(p.vBuf, &val)
return val, err
}
// Int64V return V as int64
func (p *StreamPacket) Int64V() (val int64, err error) {
codec := encoding.VarCodec{Size: len(p.vBuf)}
err = codec.DecodeNVarInt64(p.vBuf, &val)
return val, err
}
// UInt64V return V as uint64
func (p *StreamPacket) UInt64V() (val uint64, err error) {
codec := encoding.VarCodec{Size: len(p.vBuf)}
err = codec.DecodeNVarUInt64(p.vBuf, &val)
return val, err
}
// Float32V return V as float32
func (p *StreamPacket) Float32V() (val float32, err error) {
codec := encoding.VarCodec{Size: len(p.vBuf)}
err = codec.DecodeVarFloat32(p.vBuf, &val)
return val, err
}
// Float64V return V as float64
func (p *StreamPacket) Float64V() (val float64, err error) {
codec := encoding.VarCodec{Size: len(p.vBuf)}
err = codec.DecodeVarFloat64(p.vBuf, &val)
return val, err
}
// BoolV return V as bool
func (p *StreamPacket) BoolV() (val bool, err error) {
codec := encoding.VarCodec{Size: len(p.vBuf)}
err = codec.DecodePVarBool(p.vBuf, &val)
return val, err
}

100
parser.go
View File

@ -1,100 +0,0 @@
package mq_coder
import (
"bytes"
"errors"
"fmt"
"io"
"git.hpds.cc/Component/mq_coder/encoding"
)
var (
ErrMalformed = errors.New("coder.ReadPacket: malformed")
)
// ReadPacket will try to read encoded packet from the reader
func ReadPacket(reader io.Reader) ([]byte, error) {
tag, err := readByte(reader)
if err != nil {
return nil, err
}
// buf will contain a complete encoded handshakeFrame
buf := bytes.Buffer{}
// the first byte is coder.Tag
// write coder.Tag bytes
buf.WriteByte(tag)
// read y3.Length bytes, a varInt format
lenBuf := bytes.Buffer{}
for {
b, e := readByte(reader)
if e != nil {
return nil, err
}
lenBuf.WriteByte(b)
if b&0x80 != 0x80 {
break
}
}
// parse to coder.Length
var length int32
codec := encoding.VarCodec{}
err = codec.DecodePVarInt32(lenBuf.Bytes(), &length)
if err != nil {
return nil, ErrMalformed
}
// validate len decoded from stream
if length < 0 {
return nil, fmt.Errorf("coder.ReadPacket() get lenBuf=(%# x), decode len=(%v)", lenBuf.Bytes(), length)
}
// write coder.Length bytes
buf.Write(lenBuf.Bytes())
// read next {len} bytes as coder.Value
valBuf := bytes.Buffer{}
// every batch read 512 bytes, if next reads < 512, read
var count int
for {
batchReadSize := 1024 * 1024
var tmpBuf []byte
if int(length)-count < batchReadSize {
tmpBuf = make([]byte, int(length)-count)
} else {
tmpBuf = make([]byte, batchReadSize)
}
p, e := reader.Read(tmpBuf)
count += p
if e != nil {
if e == io.EOF {
valBuf.Write(tmpBuf[:p])
break
}
return nil, fmt.Errorf("y3 parse valBuf error: %v", err)
}
valBuf.Write(tmpBuf[:p])
if count == int(length) {
break
}
}
if count < int(length) {
// return nil, fmt.Errorf("[y3] p should == len when getting y3 value buffer, len=%d, p=%d", length, count)
return nil, ErrMalformed
}
// write coder.Value bytes
buf.Write(valBuf.Bytes())
return buf.Bytes(), nil
}
func readByte(reader io.Reader) (byte, error) {
var b [1]byte
_, err := reader.Read(b[:])
return b[0], err
}

View File

@ -1,88 +0,0 @@
package mq_coder
import (
"bytes"
"errors"
"fmt"
"git.hpds.cc/Component/mq_coder/encoding"
)
// DecodeState represents the state of decoding
type DecodeState struct {
// ConsumedBytes is the bytes consumed by decoder
ConsumedBytes int
// SizeL is the bytes length of value
SizeL int
}
// DecodeToPrimitivePacket parse out whole buffer to a PrimitivePacket
//
// Examples:
// [0x01, 0x01, 0x01] -> Key=0x01, Value=0x01
// [0x41, 0x06, 0x03, 0x01, 0x61, 0x04, 0x01, 0x62] -> key=0x03, value=0x61; key=0x04, value=0x62
func DecodeToPrimitivePacket(buf []byte, p *PrimitivePacket) (*DecodeState, error) {
decoder := &DecodeState{
ConsumedBytes: 0,
SizeL: 0,
}
if buf == nil || len(buf) < primitivePacketBufferMinimalLength {
return decoder, errors.New("invalid packet minimal size")
}
p.packet = &packet{
valBuf: []byte{},
buf: &bytes.Buffer{},
}
var pos = 0
// first byte is `Tag`
p.tag = NewTag(buf[pos])
p.buf.WriteByte(buf[pos])
pos++
decoder.ConsumedBytes = pos
// read `Variant` from buf for `Length of value`
tmpBuf := buf[pos:]
var bufLen int32
codec := encoding.VarCodec{}
err := codec.DecodePVarInt32(tmpBuf, &bufLen)
if err != nil {
return decoder, err
}
if codec.Size < 1 {
return decoder, errors.New("malformed, size of Length can not smaller than 1")
}
// codec.Size describes how many bytes used to represent `Length`
p.buf.Write(buf[pos : pos+codec.Size])
pos += codec.Size
decoder.ConsumedBytes = pos
decoder.SizeL = codec.Size
// if length<0, error on decoding
if bufLen < 0 {
return decoder, errors.New("invalid packet, negative length")
}
// the length of value
p.length = int(bufLen)
if p.length == 0 {
p.valBuf = nil
return decoder, nil
}
// the next `p.length` bytes store value
endPos := pos + p.length
if pos > endPos || endPos > len(buf) || pos > len(buf) {
return decoder, fmt.Errorf("beyond the boundary, pos=%v, endPos=%v", pos, endPos)
}
p.valBuf = buf[pos:endPos]
p.buf.Write(buf[pos:endPos])
decoder.ConsumedBytes = endPos
return decoder, nil
}

View File

@ -1,112 +0,0 @@
package mq_coder
import (
"bytes"
"git.hpds.cc/Component/mq_coder/encoding"
)
// PrimitivePacketEncoder used for encode a primitive packet
type PrimitivePacketEncoder struct {
*encoder
}
// NewPrimitivePacketEncoder return an Encoder for primitive packet
func NewPrimitivePacketEncoder(sid byte) *PrimitivePacketEncoder {
prim := &PrimitivePacketEncoder{
encoder: &encoder{
isNode: false,
buf: new(bytes.Buffer),
},
}
prim.seqID = sid
return prim
}
// SetInt32Value encode int32 value
func (enc *PrimitivePacketEncoder) SetInt32Value(v int32) {
size := encoding.SizeOfNVarInt32(v)
codec := encoding.VarCodec{Size: size}
enc.valBuf = make([]byte, size)
err := codec.EncodeNVarInt32(enc.valBuf, v)
if err != nil {
panic(err)
}
}
// SetUInt32Value encode uint32 value
func (enc *PrimitivePacketEncoder) SetUInt32Value(v uint32) {
size := encoding.SizeOfNVarUInt32(v)
codec := encoding.VarCodec{Size: size}
enc.valBuf = make([]byte, size)
err := codec.EncodeNVarUInt32(enc.valBuf, v)
if err != nil {
panic(err)
}
}
// SetInt64Value encode int64 value
func (enc *PrimitivePacketEncoder) SetInt64Value(v int64) {
size := encoding.SizeOfNVarInt64(v)
codec := encoding.VarCodec{Size: size}
enc.valBuf = make([]byte, size)
err := codec.EncodeNVarInt64(enc.valBuf, v)
if err != nil {
panic(err)
}
}
// SetUInt64Value encode uint64 value
func (enc *PrimitivePacketEncoder) SetUInt64Value(v uint64) {
size := encoding.SizeOfNVarUInt64(v)
codec := encoding.VarCodec{Size: size}
enc.valBuf = make([]byte, size)
err := codec.EncodeNVarUInt64(enc.valBuf, v)
if err != nil {
panic(err)
}
}
// SetFloat32Value encode float32 value
func (enc *PrimitivePacketEncoder) SetFloat32Value(v float32) {
var size = encoding.SizeOfVarFloat32(v)
codec := encoding.VarCodec{Size: size}
enc.valBuf = make([]byte, size)
err := codec.EncodeVarFloat32(enc.valBuf, v)
if err != nil {
panic(err)
}
}
// SetFloat64Value encode float64 value
func (enc *PrimitivePacketEncoder) SetFloat64Value(v float64) {
var size = encoding.SizeOfVarFloat64(v)
codec := encoding.VarCodec{Size: size}
enc.valBuf = make([]byte, size)
err := codec.EncodeVarFloat64(enc.valBuf, v)
if err != nil {
panic(err)
}
}
// SetBoolValue encode bool value
func (enc *PrimitivePacketEncoder) SetBoolValue(v bool) {
var size = encoding.SizeOfPVarUInt32(uint32(1))
codec := encoding.VarCodec{Size: size}
enc.valBuf = make([]byte, size)
err := codec.EncodePVarBool(enc.valBuf, v)
if err != nil {
panic(err)
}
}
// SetStringValue encode string
func (enc *PrimitivePacketEncoder) SetStringValue(v string) {
enc.valBuf = []byte(v)
}
// SetBytesValue encode []byte
func (enc *PrimitivePacketEncoder) SetBytesValue(v []byte) {
enc.valBuf = v
}

View File

@ -1,100 +0,0 @@
package mq_coder
import (
"git.hpds.cc/Component/mq_coder/encoding"
)
// 最小长度为2个字节
const primitivePacketBufferMinimalLength = 2
// PrimitivePacket describes primitive value type,
type PrimitivePacket struct {
*packet
}
// ToInt32 parse raw as int32 value
func (p *PrimitivePacket) ToInt32() (int32, error) {
var val int32
codec := encoding.VarCodec{Size: len(p.valBuf)}
err := codec.DecodeNVarInt32(p.packet.valBuf, &val)
if err != nil {
return 0, err
}
return val, nil
}
// ToUInt32 parse raw as uint32 value
func (p *PrimitivePacket) ToUInt32() (uint32, error) {
var val uint32
codec := encoding.VarCodec{Size: len(p.valBuf)}
err := codec.DecodeNVarUInt32(p.valBuf, &val)
if err != nil {
return 0, err
}
return val, nil
}
// ToInt64 parse raw as int64 value
func (p *PrimitivePacket) ToInt64() (int64, error) {
var val int64
codec := encoding.VarCodec{Size: len(p.valBuf)}
err := codec.DecodeNVarInt64(p.valBuf, &val)
if err != nil {
return 0, err
}
return val, nil
}
// ToUInt64 parse raw as uint64 value
func (p *PrimitivePacket) ToUInt64() (uint64, error) {
var val uint64
codec := encoding.VarCodec{Size: len(p.valBuf)}
err := codec.DecodeNVarUInt64(p.valBuf, &val)
if err != nil {
return 0, err
}
return val, nil
}
// ToFloat32 parse raw as float32 value
func (p *PrimitivePacket) ToFloat32() (float32, error) {
var val float32
codec := encoding.VarCodec{Size: len(p.valBuf)}
err := codec.DecodeVarFloat32(p.valBuf, &val)
if err != nil {
return 0, err
}
return val, nil
}
// ToFloat64 parse raw as float64 value
func (p *PrimitivePacket) ToFloat64() (float64, error) {
var val float64
codec := encoding.VarCodec{Size: len(p.valBuf)}
err := codec.DecodeVarFloat64(p.valBuf, &val)
if err != nil {
return 0, err
}
return val, nil
}
// ToBool parse raw as bool value
func (p *PrimitivePacket) ToBool() (bool, error) {
var val bool
codec := encoding.VarCodec{Size: len(p.valBuf)}
err := codec.DecodePVarBool(p.valBuf, &val)
if err != nil {
return false, err
}
return val, nil
}
// ToUTF8String parse raw data as string value
func (p *PrimitivePacket) ToUTF8String() (string, error) {
return string(p.valBuf), nil
}
// ToBytes returns raw buffer data
func (p *PrimitivePacket) ToBytes() []byte {
return p.valBuf
}

26
spec/spec.go Normal file
View File

@ -0,0 +1,26 @@
package spec
import (
"errors"
"io"
)
const (
maxSeqId = 0x3F
flagBitNode = 0x80
wipeFlagBits = 0x3F
msb = 0x80
)
var (
errInvalidSeqId = errors.New("coder.Builder: SeqId should >= 0 and =< 0x3F")
)
func readByte(reader io.Reader) (byte, error) {
var b [1]byte
n, err := reader.Read(b[:])
if n == 0 {
return 0x00, err
}
return b[0], err
}

27
spec/spec.puml Normal file
View File

@ -0,0 +1,27 @@
@startuml
namespace spec {
class L << (S,Aquamarine) >> {
- buf []byte
- size int
- len int
+ Bytes() []byte
+ Size() int
+ VSize() int
}
class T << (S,Aquamarine) >> {
+ Sid() int
+ Bytes() []byte
+ IsNodeMode() bool
+ SetNodeMode(flag bool)
+ Size() int
}
class spec.T << (T, #FF7700) >> {
}
}
"__builtin__.byte" #.. "spec.T"
@enduml

54
spec/tlv.t.go Normal file
View File

@ -0,0 +1,54 @@
package spec
import (
"io"
)
// T is the Tag in a TLV structure
type T byte
// NewT returns a T with sequenceID. If this packet contains other
// packets, this packet will be a "node packet", the T of this packet
// will set MSB to T.
func NewT(seqId int) (T, error) {
if seqId < 0 || seqId > maxSeqId {
return 0, errInvalidSeqId
}
return T(seqId), nil
}
// Sid returns the sequenceId of this packet.
func (t T) Sid() int {
return int(t & wipeFlagBits)
}
// Bytes returns raw bytes of T.
func (t T) Bytes() []byte {
return []byte{byte(t)}
}
// IsNodeMode will return true if this packet contains other packets.
// Otherwise, return false.
func (t T) IsNodeMode() bool {
return t&flagBitNode == flagBitNode
}
// SetNodeMode will set T to indicates this packet contains
// other packets.
func (t *T) SetNodeMode(flag bool) {
if flag {
*t |= flagBitNode
}
}
// Size return the size of T raw bytes.
func (t T) Size() int {
return 1
}
// ReadT read T from a bufio.Reader
func ReadT(rd io.Reader) (T, error) {
b, err := readByte(rd)
return T(b), err
}

89
spec/tvl.l.go Normal file
View File

@ -0,0 +1,89 @@
package spec
import (
"bytes"
"errors"
"io"
"git.hpds.cc/Component/mq_coder/encoding"
)
// L is the Length in a TLV structure
type L struct {
buf []byte
size int
len int
}
// NewL will take an int type len as parameter and return L to
// represent the size of V in a TLV. an integer will be encoded as
// a PVarInt32 type to represent the value.
func NewL(len int) (L, error) {
var l = L{}
if len < -1 {
return l, errors.New("y3.L: len can't less than -1")
}
valLen := int32(len)
l.size = encoding.SizeOfPVarInt32(valLen)
codec := encoding.VarCodec{Size: l.size}
tmp := make([]byte, l.size)
err := codec.EncodePVarInt32(tmp, valLen)
if err != nil {
panic(err)
}
l.buf = make([]byte, l.size)
copy(l.buf, tmp)
l.len = len
return l, nil
}
// Bytes will return the raw bytes of L.
func (l L) Bytes() []byte {
return l.buf
}
// Size returns how many bytes used to represent this L.
func (l L) Size() int {
return l.size
}
// VSize returns the size of V.
func (l L) VSize() int {
return int(l.len)
}
// ReadL read L from bufio.Reader
func ReadL(r io.Reader) (*L, error) {
lenBuf := bytes.Buffer{}
for {
b, err := readByte(r)
if err != nil {
return nil, err
}
lenBuf.WriteByte(b)
if b&msb != msb {
break
}
}
buf := lenBuf.Bytes()
// decode to L
length, err := decodeL(buf)
if err != nil {
return nil, err
}
return &L{
buf: buf,
len: int(length),
size: len(buf),
}, nil
}
func decodeL(buf []byte) (length int32, err error) {
codec := encoding.VarCodec{}
err = codec.DecodePVarInt32(buf, &length)
return length, err
}

View File

@ -1,115 +0,0 @@
package mq_coder
import (
"bytes"
"errors"
"fmt"
"io"
"git.hpds.cc/Component/mq_coder/encoding"
)
// StreamReader read an coder packet from io.Reader, and return
// the ValReader after decode out Tag and Len
type StreamReader struct {
src io.Reader
// Tag of a coder packet
Tag byte
// Len of a coder packet
Len int
// Val of a coder packet
Val io.Reader
}
// NewStreamParser create a new coder StreamReader
func NewStreamParser(reader io.Reader) *StreamReader {
return &StreamReader{
src: reader,
}
}
func (sr *StreamReader) GetValBuffer() ([]byte, error) {
buf, err := io.ReadAll(sr.Val)
return buf, err
}
// Do must run in a goroutine
func (sr *StreamReader) Do() error {
if sr.src == nil {
return errors.New("coder: nil source reader")
}
tag, err := readByte(sr.src)
if err != nil {
return err
}
// the first byte is coder.Tag
sr.Tag = tag
// read coder.Length bytes, a varInt format
lenBuf := bytes.Buffer{}
for {
b, e := readByte(sr.src)
if e != nil {
return err
}
lenBuf.WriteByte(b)
if b&0x80 != 0x80 {
break
}
}
// parse to coder.Length
var length int32
codec := encoding.VarCodec{}
err = codec.DecodePVarInt32(lenBuf.Bytes(), &length)
if err != nil {
return err
}
// validate len decoded from stream
if length < 0 {
return fmt.Errorf("coder: streamParse() get lenBuf=(%# x), decode len=(%v)", lenBuf.Bytes(), length)
}
sr.Len = int(length)
// read next {len} bytes as coder.Value
sr.Val = &valR{
length: int(length),
src: sr.src,
}
return nil
}
type valR struct {
length int
off int
src io.Reader
}
func (r *valR) Read(p []byte) (n int, err error) {
if r.src == nil {
return 0, nil
}
if r.off >= r.length {
return 0, io.EOF
}
bound := len(p)
if len(p) > r.length-r.off {
bound = r.length - r.off
}
// update read
r.off, err = r.src.Read(p[0:bound])
return r.off, err
}
func StreamReadPacket(reader io.Reader) (*StreamReader, error) {
sp := NewStreamParser(reader)
err := sp.Do()
return sp, err
}

View File

@ -1,138 +0,0 @@
package mq_coder
import (
"bytes"
"io"
"git.hpds.cc/Component/mq_coder/encoding"
)
type StreamEncoder struct {
tag byte
buf *bytes.Buffer
pBuf *bytes.Buffer
len int
sLen int
reader *yR
}
func NewStreamEncoder(tag byte) *StreamEncoder {
var se = &StreamEncoder{
tag: tag,
buf: new(bytes.Buffer),
pBuf: new(bytes.Buffer),
}
return se
}
func (se *StreamEncoder) AddPacket(packet *PrimitivePacketEncoder) {
node := packet.Encode()
se.AddPacketBuffer(node)
}
func (se *StreamEncoder) AddPacketBuffer(buf []byte) {
se.pBuf.Write(buf)
se.growLen(len(se.pBuf.Bytes()))
}
func (se *StreamEncoder) AddStreamPacket(tag byte, length int, reader io.Reader) {
se.sLen = length
// s-Tag
se.pBuf.WriteByte(tag)
se.growLen(1)
// calculate s-Len
size := encoding.SizeOfPVarInt32(int32(length))
codec := encoding.VarCodec{Size: size}
tmp := make([]byte, size)
err := codec.EncodePVarInt32(tmp, int32(length))
if err != nil {
panic(err)
}
se.pBuf.Write(tmp)
se.growLen(size)
// total buf
se.buf.WriteByte(se.tag)
se.growLen(length)
// calculate total Len buf
size = encoding.SizeOfPVarInt32(int32(se.len))
codec = encoding.VarCodec{Size: size}
tmp = make([]byte, size)
err = codec.EncodePVarInt32(tmp, int32(se.len))
if err != nil {
panic(err)
}
se.buf.Write(tmp) //lenBuf
se.buf.Write(se.pBuf.Bytes())
se.growLen(size) // total length
se.growLen(1) // parent tag
se.reader = &yR{
buf: se.buf,
src: reader,
length: se.len,
sLen: se.sLen,
}
}
func (se *StreamEncoder) GetReader() io.Reader {
if se.reader != nil {
return se.reader
}
return new(bytes.Buffer)
}
// Pipe can pipe data to os.StdOut
func (se *StreamEncoder) Pipe(writer io.Writer) {
}
func (se *StreamEncoder) GetLen() int {
if se.reader != nil {
return se.len
}
return 0
}
func (se *StreamEncoder) growLen(step int) {
se.len += step
}
type yR struct {
src io.Reader
buf *bytes.Buffer
length int
off int
sLen int
}
func (r *yR) Read(p []byte) (n int, err error) {
if r.src == nil {
return 0, nil
}
if r.off >= r.length {
return 0, io.EOF
}
if r.off < r.length-r.sLen {
n, err = r.buf.Read(p)
r.off += n
if err != nil {
if err == io.EOF {
return n, nil
} else {
return 0, err
}
}
return n, nil
} else {
n, err = r.src.Read(p)
r.off += n
if err != nil {
return n, err
}
return n, nil
}
}

38
tag.go
View File

@ -1,38 +0,0 @@
package mq_coder
import (
"git.hpds.cc/Component/mq_coder/utils"
)
// Tag represents the Tag of TLV,
// MSB used to represent the packet type, 0x80 means a node packet, otherwise is a primitive packet.
// Low 7 bits represent Sequence ID, like `key` in JSON format
type Tag struct {
raw byte
}
// IsNode returns true is MSB is 1.
func (t *Tag) IsNode() bool {
return t.raw&utils.MSB == utils.MSB
}
// IsSlice determine if the current node is a Slice
func (t *Tag) IsSlice() bool {
return t.raw&utils.SliceFlag == utils.SliceFlag
}
// SeqId get the sequence ID, as key in JSON format
func (t *Tag) SeqId() byte {
//return t.raw & packet utils.DropMSB
return t.raw & utils.DropMSBArrayFlag
}
// NewTag create a NodePacket Tag field
func NewTag(b byte) *Tag {
return &Tag{raw: b}
}
// Raw return the original byte
func (t *Tag) Raw() byte {
return t.raw
}

View File

@ -1,18 +0,0 @@
package utils
// MSB is `1000 0000` describes this is a node packet, otherwise, is a primitive packet
const MSB byte = 0x80
// DropMSB is `0111 1111`, used to remove MSB flag bit
const DropMSB byte = 0x3F
// SliceFlag is `0100 0000`, describes this packet is a Slice type
const SliceFlag byte = 0x40
// DropMSBArrayFlag is `0011 1111`, used to remove MSB and Slice flag bit
const DropMSBArrayFlag byte = 0x3F
// IsNodePacket returns true if the tag represents a node package
func IsNodePacket(tag byte) bool {
return tag&MSB == MSB
}