This commit is contained in:
wangjian 2022-10-07 15:30:45 +08:00
commit 3680ef7a22
21 changed files with 1625 additions and 0 deletions

72
chunkedVReader.go Normal file
View File

@ -0,0 +1,72 @@
package mq_coder
import (
"bytes"
"io"
"io/ioutil"
)
type chunkVReader struct {
src io.Reader // the reader parts of V
buf *bytes.Buffer // the bytes part of V
totalSize int // size of whole buffer of this packet
off int // last read op
ChunkVSize int // the size of chunked V
}
// Read implement io.Reader interface
func (r *chunkVReader) Read(p []byte) (n int, err error) {
if r.src == nil {
return 0, nil
}
if r.off >= r.totalSize {
return 0, io.EOF
}
if r.off < r.totalSize-r.ChunkVSize {
n, err := r.buf.Read(p)
r.off += n
if err != nil {
if err == io.EOF {
return n, nil
} else {
return 0, err
}
}
return n, nil
}
n, err = r.src.Read(p)
r.off += n
if err != nil {
return n, err
}
return n, nil
}
// WriteTo implement io.WriteTo interface
func (r *chunkVReader) WriteTo(w io.Writer) (n int64, err error) {
if r.src == nil {
return 0, nil
}
// first, write existed buffer
m, err := w.Write(r.buf.Bytes())
if err != nil {
return 0, err
}
n += int64(m)
// last, write from reader
buf, err := ioutil.ReadAll(r.src)
if err != nil && err != io.EOF {
return 0, errWriteFromReader
}
m, err = w.Write(buf)
if err != nil {
return 0, err
}
n += int64(m)
return n, nil
}

54
coder.go Normal file
View File

@ -0,0 +1,54 @@
package mq_coder
import (
"errors"
"io"
)
var (
errBuildIncomplete = errors.New("coder.Encoder: invalid structure of packet")
errInvalidAdding = errors.New("coder.Encoder: can not add this Packet after StreamPacket has been add")
errNonStreamPacket = errors.New("coder.Packet: this packet is not in node mode")
errWriteFromReader = errors.New("coder.streamV: write from reader error")
errNotNodeMode = errors.New("coder.Encoder: packet should be in node mode can be add other packets as child")
errNilReader = errors.New("coder.Decoder: nil source reader")
)
// Packet 编解码器包
type Packet interface {
// SeqId 返回此数据包的序列Id.
SeqId() int
// Size 返回整个数据包的大小
Size() int
// VSize 返回V的大小.
VSize() int
// Bytes 此数据包的全部字节.
Bytes() []byte
// Reader 返回 io.Reader 字节.
Reader() io.Reader
// VReader 返回 io.Reader 的 流字节.
VReader() io.Reader
// IsStreamMode 是否流模式
IsStreamMode() bool
// IsNodeMode 是否节点模式
IsNodeMode() bool
// BytesV 返回流字节
BytesV() []byte
// UTF8StringV 返回流的utf8字符串值
UTF8StringV() string
// Int32V 返回流的int32值
Int32V() (val int32, err error)
// UInt32V 返回流的uint32值
UInt32V() (val uint32, err error)
// Int64V 返回流的int64值
Int64V() (val int64, err error)
// UInt64V 返回流的uint64值
UInt64V() (val uint64, err error)
// Float32V 返回流的float32值
Float32V() (val float32, err error)
// Float64V 返回流的float64值
Float64V() (val float64, err error)
// BoolV 返回流的bool值
BoolV() (val bool, err error)
}

93
decoder.go Normal file
View File

@ -0,0 +1,93 @@
package mq_coder
import (
"bytes"
"io"
"git.hpds.cc/Component/mq_coder/spec"
)
// Decoder is the tool for decoding packet from stream
type Decoder struct {
tag spec.T
len *spec.L
rd io.Reader
}
// NewDecoder returns a Decoder from an io.Reader
func NewDecoder(reader io.Reader) *Decoder {
return &Decoder{
rd: reader,
}
}
// SeqID return the SequenceID of the decoding packet
func (d *Decoder) SeqID() int {
return d.tag.Sid()
}
// UnderlyingReader returns the reader this decoder using
func (d *Decoder) UnderlyingReader() io.Reader {
return d.rd
}
// ReadHeader will block until io.EOF or receive T and L of a packet.
func (d *Decoder) ReadHeader() error {
// only read T and L
return d.readTL()
}
// GetChunkedPacket will block until io.EOF or receive V of a packet in chunked mode.
func (d *Decoder) GetChunkedPacket() Packet {
return &StreamPacket{
t: d.tag,
l: *d.len,
vr: d.rd,
chunkMode: true,
chunkSize: d.len.VSize(),
}
}
// GetFullFilledPacket read full Packet from given io.Reader
func (d *Decoder) GetFullFilledPacket() (packet Packet, err error) {
// read V
buf := new(bytes.Buffer)
total := 0
for {
valBuf := make([]byte, d.len.VSize())
n, err := d.rd.Read(valBuf)
if n > 0 {
total += n
buf.Write(valBuf[:n])
}
if total >= d.len.VSize() || err != nil {
break
}
}
packet = &StreamPacket{
t: d.tag,
l: *d.len,
vbuf: buf.Bytes(),
chunkMode: false,
}
return packet, nil
}
func (d *Decoder) readTL() (err error) {
if d.rd == nil {
return errNilReader
}
// read T
d.tag, err = spec.ReadT(d.rd)
if err != nil {
return err
}
// read L
d.len, err = spec.ReadL(d.rd)
return err
}

158
encoder.go Normal file
View File

@ -0,0 +1,158 @@
package mq_coder
import (
"bytes"
"io"
"git.hpds.cc/Component/mq_coder/spec"
)
// Encoder is the tool for creating a packet easily
type Encoder struct {
tag spec.T
len *spec.L
valReader io.Reader
valReaderSize int
nodes map[int]Packet
state int
size int32 // size of value
isStreamMode bool
valBuf *bytes.Buffer
done bool
seqID int
isNodeMode bool
}
// SetSeqId set sequenceId of a packet, if this packet contains other
// packets, isNode should set to true
func (b *Encoder) SetSeqId(seqId int, isNode bool) {
// init
b.valBuf = new(bytes.Buffer)
b.nodes = make(map[int]Packet)
// set seqID
b.seqID = seqId
b.isNodeMode = isNode
}
// SetBytesV set bytes type as V
func (b *Encoder) SetBytesV(buf []byte) {
b.size += int32(len(buf))
b.valBuf.Write(buf)
b.isStreamMode = false
b.state |= 0x04
}
// SetReaderV set io.Reader type as V
func (b *Encoder) SetReaderV(r io.Reader, size int) {
b.isStreamMode = true
b.valReader = r
b.state |= 0x04
b.size += int32(size)
b.valReaderSize = size
}
// AddPacket add a Packet child to this packet, this packet must be NodeMode
func (b *Encoder) AddPacket(child Packet) error {
// only packet is in node mode can add other packets
if !b.isNodeMode {
return errNotNodeMode
}
if b.done {
return errInvalidAdding
}
b.nodes[child.SeqId()] = child
buf := child.Bytes()
b.SetBytesV(buf)
return nil
}
// AddStreamPacket will put a StreamPacket in chunked mode to current packet.
func (b *Encoder) AddStreamPacket(child Packet) (err error) {
// if this packet is in stream mode, can not add any packets
if b.done {
return errInvalidAdding
}
// only accept packet in stream mode
if !child.IsStreamMode() {
return errNonStreamPacket
}
// set the valReader of this packet to the child's
b.valReader = child.VReader()
// valReaderSize will be the same as child's
b.valReaderSize = child.VSize()
// add this child packet
b.nodes[child.SeqId()] = child
// add the size of child's V to L of this packet
b.size += int32(child.Size())
// put the bytes of child to valBuf
buf := child.Bytes()
b.valBuf.Write(buf)
// update state
b.state |= 0x04
b.isStreamMode = true
b.done = true
return nil
}
// Packet return a Packet instance.
func (b *Encoder) Packet() (Packet, error) {
err := b.generateT()
if err != nil {
return nil, err
}
err = b.generateL()
if err != nil {
return nil, err
}
if b.state != 0x07 {
return nil, errBuildIncomplete
}
if b.isStreamMode {
return &StreamPacket{
t: b.tag,
l: *b.len,
vr: b.valReader,
vbuf: b.valBuf.Bytes(),
chunkMode: true,
chunkSize: b.valReaderSize,
}, err
}
// not streaming mode
return &StreamPacket{
t: b.tag,
l: *b.len,
vbuf: b.valBuf.Bytes(),
chunkMode: false,
}, err
}
// will generate T of a TLV.
func (b *Encoder) generateT() error {
t, err := spec.NewT(b.seqID)
t.SetNodeMode(b.isNodeMode)
if err != nil {
return err
}
b.tag = t
b.state |= 0x01
return nil
}
// will generate L of a TLV.
func (b *Encoder) generateL() error {
l, err := spec.NewL(int(b.size))
if err != nil {
return err
}
b.len = &l
b.state |= 0x02
return nil
}

98
encoder_sugar.go Normal file
View File

@ -0,0 +1,98 @@
package mq_coder
import (
"git.hpds.cc/Component/mq_coder/encoding"
)
// SetUTF8StringV set utf-8 string type value as V
func (b *Encoder) SetUTF8StringV(v string) {
buf := []byte(v)
b.SetBytesV(buf)
}
// SetInt32V set an int32 type value as V
func (b *Encoder) SetInt32V(v int32) error {
size := encoding.SizeOfNVarInt32(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeNVarInt32(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetUInt32V set an uint32 type value as V
func (b *Encoder) SetUInt32V(v uint32) error {
size := encoding.SizeOfNVarUInt32(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeNVarUInt32(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetInt64V set an int64 type value as V
func (b *Encoder) SetInt64V(v int64) error {
size := encoding.SizeOfNVarInt64(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeNVarInt64(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetUInt64V set an uint64 type value as V
func (b *Encoder) SetUInt64V(v uint64) error {
size := encoding.SizeOfNVarUInt64(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeNVarUInt64(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetFloat32V set a float32 type value as V
func (b *Encoder) SetFloat32V(v float32) error {
size := encoding.SizeOfVarFloat32(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeVarFloat32(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetFloat64V set a float64 type value as V
func (b *Encoder) SetFloat64V(v float64) error {
size := encoding.SizeOfVarFloat64(v)
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
err := codec.EncodeVarFloat64(buf, v)
if err != nil {
return err
}
b.SetBytesV(buf)
return nil
}
// SetBoolV set bool type value as V
func (b *Encoder) SetBoolV(v bool) {
var size = encoding.SizeOfPVarUInt32(uint32(1))
codec := encoding.VarCodec{Size: size}
buf := make([]byte, size)
_ = codec.EncodePVarBool(buf, v)
b.SetBytesV(buf)
}

42
encoding/encoding.puml Normal file
View File

@ -0,0 +1,42 @@
@startuml
namespace encoding {
class VarCodec << (S,Aquamarine) >> {
+ Ptr int
+ Size int
- encodeNVarInt(buffer []byte, value int64) error
- decodeNVarInt(buffer []byte, value *int64) error
- encodePVarInt(buffer []byte, value int64) error
- decodePVarInt(buffer []byte, value *int64) error
- encodeVarFloat(buffer []byte, bits uint64, width int) error
- decodeVarFloat(buffer []byte, bits *uint64, width int) error
- sizeOfGap(width int) (int, int)
+ EncodeNVarInt32(buffer []byte, value int32) error
+ DecodeNVarInt32(buffer []byte, value *int32) error
+ EncodeNVarUInt32(buffer []byte, value uint32) error
+ DecodeNVarUInt32(buffer []byte, value *uint32) error
+ EncodeNVarInt64(buffer []byte, value int64) error
+ DecodeNVarInt64(buffer []byte, value *int64) error
+ EncodeNVarUInt64(buffer []byte, value uint64) error
+ DecodeNVarUInt64(buffer []byte, value *uint64) error
+ EncodePVarBool(buffer []byte, value bool) error
+ DecodePVarBool(buffer []byte, value *bool) error
+ EncodePVarInt32(buffer []byte, value int32) error
+ DecodePVarInt32(buffer []byte, value *int32) error
+ EncodePVarUInt32(buffer []byte, value uint32) error
+ DecodePVarUInt32(buffer []byte, value *uint32) error
+ EncodePVarInt64(buffer []byte, value int64) error
+ DecodePVarInt64(buffer []byte, value *int64) error
+ EncodePVarUInt64(buffer []byte, value uint64) error
+ DecodePVarUInt64(buffer []byte, value *uint64) error
+ EncodeVarFloat32(buffer []byte, value float32) error
+ DecodeVarFloat32(buffer []byte, value *float32) error
+ EncodeVarFloat64(buffer []byte, value float64) error
+ DecodeVarFloat64(buffer []byte, value *float64) error
}
}
@enduml

132
encoding/nvarint.go Normal file
View File

@ -0,0 +1,132 @@
package encoding
import (
"errors"
)
// SizeOfNVarInt32 return the buffer size after encoding value as NVarInt32
func SizeOfNVarInt32(value int32) int {
return sizeOfNVarInt(int64(value), 32)
}
// EncodeNVarInt32 encode value as NVarInt32 to buffer
func (codec *VarCodec) EncodeNVarInt32(buffer []byte, value int32) error {
return codec.encodeNVarInt(buffer, int64(value))
}
// DecodeNVarInt32 decode to value as NVarInt32 from buffer
func (codec *VarCodec) DecodeNVarInt32(buffer []byte, value *int32) error {
var val = int64(*value)
var err = codec.decodeNVarInt(buffer, &val)
*value = int32(val)
return err
}
// SizeOfNVarUInt32 return the buffer size after encoding value as NVarUInt32
func SizeOfNVarUInt32(value uint32) int {
return sizeOfNVarInt(int64(int32(value)), 32)
}
// EncodeNVarUInt32 encode value as NVarUInt32 to buffer
func (codec *VarCodec) EncodeNVarUInt32(buffer []byte, value uint32) error {
return codec.encodeNVarInt(buffer, int64(int32(value)))
}
// DecodeNVarUInt32 decode to value as NVarUInt32 from buffer
func (codec *VarCodec) DecodeNVarUInt32(buffer []byte, value *uint32) error {
var val = int64(int32(*value))
var err = codec.decodeNVarInt(buffer, &val)
*value = uint32(val)
return err
}
// SizeOfNVarInt64 return the buffer size after encoding value as NVarInt64
func SizeOfNVarInt64(value int64) int {
return sizeOfNVarInt(value, 64)
}
// EncodeNVarInt64 encode value as NVarInt64 to buffer
func (codec *VarCodec) EncodeNVarInt64(buffer []byte, value int64) error {
return codec.encodeNVarInt(buffer, value)
}
// DecodeNVarInt64 decode to value as NVarInt64 from buffer
func (codec *VarCodec) DecodeNVarInt64(buffer []byte, value *int64) error {
return codec.decodeNVarInt(buffer, value)
}
// SizeOfNVarUInt64 return the buffer size after encoding value as NVarUInt64
func SizeOfNVarUInt64(value uint64) int {
return sizeOfNVarInt(int64(value), 64)
}
// EncodeNVarUInt64 encode value as NVarUInt64 to buffer
func (codec *VarCodec) EncodeNVarUInt64(buffer []byte, value uint64) error {
return codec.encodeNVarInt(buffer, int64(value))
}
// DecodeNVarUInt64 decode to value as NVarUInt64 from buffer
func (codec *VarCodec) DecodeNVarUInt64(buffer []byte, value *uint64) error {
var val = int64(*value)
var err = codec.decodeNVarInt(buffer, &val)
*value = uint64(val)
return err
}
// SizeOfNVarInt return the buffer size after encoding value as NVarInt
func sizeOfNVarInt(value int64, width int) int {
const unit = 8 // bit width of encoding unit
var lead = value >> (width - 1)
for size := width / unit - 1; size > 0; size-- {
var lookAhead = value >> (size*unit - 1)
if lookAhead != lead {
return size + 1
}
}
return 1
}
func (codec *VarCodec) encodeNVarInt(buffer []byte, value int64) error {
if codec == nil || codec.Size == 0 {
return errors.New("nothing to encode")
}
const unit = 8 // bit width of encoding unit
for codec.Size > 0 {
if codec.Ptr >= len(buffer) {
return ErrBufferInsufficient
}
codec.Size--
buffer[codec.Ptr] = byte(value >> (codec.Size * unit))
codec.Ptr++
}
return nil
}
func (codec *VarCodec) decodeNVarInt(buffer []byte, value *int64) error {
if codec == nil || codec.Size == 0 {
return errors.New("nothing to decode")
}
const unit = 8 // bit width of encoding unit
if codec.Size > 0 { // initialize sign bit
if codec.Ptr >= len(buffer) {
return ErrBufferInsufficient
}
*value = int64(int8(buffer[codec.Ptr]) >> (unit - 1))
codec.Size = -codec.Size
}
for codec.Size < 0 {
if codec.Ptr >= len(buffer) {
return ErrBufferInsufficient
}
codec.Size++
*value = (*value << unit) | int64(buffer[codec.Ptr])
codec.Ptr++
}
return nil
}

108
encoding/nvarint_test.go Normal file
View File

@ -0,0 +1,108 @@
package encoding
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestNInt32(t *testing.T) {
testNVarInt32(t, -1, []byte{0xFF})
testNVarInt32(t, -5, []byte{0xFB})
testNVarInt32(t, 63, []byte{0x3F})
testNVarInt32(t, -65, []byte{0xBF})
testNVarInt32(t, 127, []byte{0x7F})
testNVarInt32(t, 255, []byte{0x00, 0xFF})
testNVarInt32(t, -4097, []byte{0xEF, 0xFF})
testNVarInt32(t, -8193, []byte{0xDF, 0xFF})
testNVarInt32(t, -2097152, []byte{0xE0, 0x00, 0x00})
testNVarInt32(t, -134217729, []byte{0xF7, 0xFF, 0xFF, 0xFF})
testNVarInt32(t, -2147483648, []byte{0x80, 0x00, 0x00, 0x00})
}
func TestNUInt32(t *testing.T) {
testNVarUInt32(t, 1, []byte{0x01})
testNVarUInt32(t, 127, []byte{0x7F})
testNVarUInt32(t, 128, []byte{0x00, 0x80})
testNVarUInt32(t, 130, []byte{0x00, 0x82})
testNVarUInt32(t, 1048576, []byte{0x10, 0x00, 0x00})
testNVarUInt32(t, 134217728, []byte{0x08, 0x00, 0x00, 0x00})
testNVarUInt32(t, 4294967295, []byte{0xFF})
}
func TestNInt64(t *testing.T) {
testNVarInt64(t, 0, []byte{0x00})
testNVarInt64(t, 1, []byte{0x01})
testNVarInt64(t, -1, []byte{0xFF})
}
func TestNUInt64(t *testing.T) {
testNVarUInt64(t, 0, []byte{0x00})
testNVarUInt64(t, 1, []byte{0x01})
testNVarUInt64(t, 18446744073709551615, []byte{0xFF})
}
func testNVarInt32(t *testing.T, value int32, bytes []byte) {
var msg = fmt.Sprintf("tester %v (%X): %X", value, uint32(value), bytes)
var size = SizeOfNVarInt32(value)
assert.Equal(t, len(bytes), size, msg)
buffer := make([]byte, len(bytes))
codec := VarCodec{Size: size}
assert.Nil(t, codec.EncodeNVarInt32(buffer, value), msg)
assert.Equal(t, bytes, buffer, msg)
var val int32
codec = VarCodec{Size: len(bytes)}
assert.Nil(t, codec.DecodeNVarInt32(bytes, &val), msg)
assert.Equal(t, value, val, msg)
}
func testNVarUInt32(t *testing.T, value uint32, bytes []byte) {
var msg = fmt.Sprintf("tester %v (%X): %X", value, value, bytes)
var size = SizeOfNVarUInt32(value)
assert.Equal(t, len(bytes), size, msg)
buffer := make([]byte, len(bytes))
codec := VarCodec{Size: size}
assert.Nil(t, codec.EncodeNVarUInt32(buffer, value), msg)
assert.Equal(t, bytes, buffer, msg)
var val uint32
codec = VarCodec{Size: len(bytes)}
assert.Nil(t, codec.DecodeNVarUInt32(bytes, &val), msg)
assert.Equal(t, value, val, msg)
}
func testNVarInt64(t *testing.T, value int64, bytes []byte) {
var msg = fmt.Sprintf("tester %v (%X): %X", value, uint64(value), bytes)
var size = SizeOfNVarInt64(value)
assert.Equal(t, len(bytes), size, msg)
buffer := make([]byte, len(bytes))
codec := VarCodec{Size: size}
assert.Nil(t, codec.EncodeNVarInt64(buffer, value), msg)
assert.Equal(t, bytes, buffer, msg)
var val int64
codec = VarCodec{Size: len(bytes)}
assert.Nil(t, codec.DecodeNVarInt64(bytes, &val), msg)
assert.Equal(t, value, val, msg)
}
func testNVarUInt64(t *testing.T, value uint64, bytes []byte) {
var msg = fmt.Sprintf("tester %v (%X): %X", value, value, bytes)
var size = SizeOfNVarUInt64(value)
assert.Equal(t, len(bytes), size, msg)
buffer := make([]byte, len(bytes))
codec := VarCodec{Size: size}
assert.Nil(t, codec.EncodeNVarUInt64(buffer, value), msg)
assert.Equal(t, bytes, buffer, msg)
var val uint64
codec = VarCodec{Size: len(bytes)}
assert.Nil(t, codec.DecodeNVarUInt64(bytes, &val), msg)
assert.Equal(t, value, val)
}

27
encoding/pvarbool.go Normal file
View File

@ -0,0 +1,27 @@
package encoding
// EncodePVarBool encode value as PVarBool to buffer
func (codec *VarCodec) EncodePVarBool(buffer []byte, value bool) error {
tmp := int64(1)
if !value {
tmp = int64(0)
}
return codec.encodePVarInt(buffer, tmp)
}
// DecodePVarBool decode to value as PVarBool from buffer
func (codec *VarCodec) DecodePVarBool(buffer []byte, value *bool) error {
if len(buffer) == 0 {
*value = false
return nil
}
var tmp int64
var err = codec.decodePVarInt(buffer, &tmp)
if tmp == 1 {
*value = true
} else {
*value = false
}
return err
}

29
encoding/pvarbool_test.go Normal file
View File

@ -0,0 +1,29 @@
package encoding
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestPVarBool(t *testing.T) {
testPVarBool(t, true, []byte{0x01})
testPVarBool(t, false, []byte{0x00})
}
func testPVarBool(t *testing.T, value bool, bytes []byte) {
var msg = fmt.Sprintf("tester %v (%v): %X", value, value, bytes)
var size = SizeOfPVarUInt32(uint32(1))
assert.Equal(t, len(bytes), size, msg)
buffer := make([]byte, len(bytes))
codec := VarCodec{Size: size}
assert.Nil(t, codec.EncodePVarBool(buffer, value), msg)
assert.Equal(t, bytes, buffer, msg)
var val bool
codec = VarCodec{}
assert.Nil(t, codec.DecodePVarBool(bytes, &val), msg)
assert.Equal(t, value, val, msg)
}

147
encoding/pvarint.go Normal file
View File

@ -0,0 +1,147 @@
package encoding
import (
"errors"
)
// SizeOfPVarInt32 return the buffer size after encoding value as PVarInt32
func SizeOfPVarInt32(value int32) int {
return sizeOfPVarInt(int64(value), 32)
}
// EncodePVarInt32 encode value as PVarInt32 to buffer
func (codec *VarCodec) EncodePVarInt32(buffer []byte, value int32) error {
return codec.encodePVarInt(buffer, int64(value))
}
// DecodePVarInt32 decode to value as PVarInt32 from buffer
func (codec *VarCodec) DecodePVarInt32(buffer []byte, value *int32) error {
var val = int64(*value)
var err = codec.decodePVarInt(buffer, &val)
*value = int32(val)
return err
}
// SizeOfPVarUInt32 return the buffer size after encoding value as PVarUInt32
func SizeOfPVarUInt32(value uint32) int {
return sizeOfPVarInt(int64(int32(value)), 32)
}
// EncodePVarUInt32 encode value as PVarUInt32 to buffer
func (codec *VarCodec) EncodePVarUInt32(buffer []byte, value uint32) error {
return codec.encodePVarInt(buffer, int64(int32(value)))
}
// DecodePVarUInt32 decode to value as PVarUInt32 from buffer
func (codec *VarCodec) DecodePVarUInt32(buffer []byte, value *uint32) error {
var val = int64(int32(*value))
var err = codec.decodePVarInt(buffer, &val)
*value = uint32(val)
return err
}
// SizeOfPVarInt64 return the buffer size after encoding value as PVarInt64
func SizeOfPVarInt64(value int64) int {
return sizeOfPVarInt(value, 64)
}
// EncodePVarInt64 encode value as PVarInt64 to buffer
func (codec *VarCodec) EncodePVarInt64(buffer []byte, value int64) error {
return codec.encodePVarInt(buffer, value)
}
// DecodePVarInt64 decode to value as PVarInt64 from buffer
func (codec *VarCodec) DecodePVarInt64(buffer []byte, value *int64) error {
return codec.decodePVarInt(buffer, value)
}
// SizeOfPVarUInt64 return the buffer size after encoding value as PVarUInt64
func SizeOfPVarUInt64(value uint64) int {
return sizeOfPVarInt(int64(value), 64)
}
// EncodePVarUInt64 encode value as PVarUInt64 to buffer
func (codec *VarCodec) EncodePVarUInt64(buffer []byte, value uint64) error {
return codec.encodePVarInt(buffer, int64(value))
}
// DecodePVarUInt64 decode to value as PVarUInt64 from buffer
func (codec *VarCodec) DecodePVarUInt64(buffer []byte, value *uint64) error {
var val = int64(*value)
var err = codec.decodePVarInt(buffer, &val)
*value = uint64(val)
return err
}
func sizeOfPVarInt(value int64, width int) int {
const unit = 7 // bit width of encoding unit
var lead = value >> (width - 1)
for size := width / unit; size > 0; size-- {
var lookAhead = value >> (size*unit - 1)
if lookAhead != lead {
return size + 1
}
}
return 1
}
func (codec *VarCodec) encodePVarInt(buffer []byte, value int64) error {
if codec == nil || codec.Size == 0 {
return errors.New("nothing to encode")
}
if codec.Ptr >= len(buffer) {
return ErrBufferInsufficient
}
const unit = 7 // bit width of encoding unit
const more = -1 << 7 // continuation bits
for codec.Size > 1 {
codec.Size--
var part = value >> (codec.Size * unit)
buffer[codec.Ptr] = byte(part | more)
codec.Ptr++
if codec.Ptr >= len(buffer) {
return ErrBufferInsufficient
}
}
const mask = -1 ^ (-1 << unit) // mask for encoding unit
codec.Size = 0
buffer[codec.Ptr] = byte(value & mask)
codec.Ptr++
return nil
}
func (codec *VarCodec) decodePVarInt(buffer []byte, value *int64) error {
if codec == nil {
return errors.New("nothing to decode")
}
if codec.Ptr >= len(buffer) {
return ErrBufferInsufficient
}
const unit = 7 // bit width of encoding unit
if codec.Size == 0 { // initialize sign bit
const flag = 8 - unit // bit width for non-encoding bits
*value = int64(int8(buffer[codec.Ptr]) << flag >> unit)
}
const mask = -1 ^ (-1 << unit) // mask for encoding unit
for {
var part = int8(buffer[codec.Ptr])
codec.Ptr++
codec.Size++
*value = (*value << unit) | int64(mask & part)
if part >= 0 { // it's the last byte
return nil
}
if codec.Ptr >= len(buffer) {
return ErrBufferInsufficient
}
}
}

108
encoding/pvarint_test.go Normal file
View File

@ -0,0 +1,108 @@
package encoding
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestPInt32(t *testing.T) {
testPVarInt32(t, -1, []byte{0x7F})
testPVarInt32(t, -5, []byte{0x7B})
testPVarInt32(t, 63, []byte{0x3F})
testPVarInt32(t, -65, []byte{0xFF, 0x3F})
testPVarInt32(t, 127, []byte{0x80, 0x7F})
testPVarInt32(t, 255, []byte{0x81, 0x7F})
testPVarInt32(t, -4097, []byte{0xDF, 0x7F})
testPVarInt32(t, -8193, []byte{0xFF, 0xBF, 0x7F})
testPVarInt32(t, -2097152, []byte{0xFF, 0x80, 0x80, 0x00})
testPVarInt32(t, -134217729, []byte{0xFF, 0xBF, 0xFF, 0xFF, 0x7F})
testPVarInt32(t, -2147483648, []byte{0xF8, 0x80, 0x80, 0x80, 0x00})
}
func TestPUInt32(t *testing.T) {
testPVarUInt32(t, 1, []byte{0x01})
testPVarUInt32(t, 127, []byte{0x80, 0x7F})
testPVarUInt32(t, 128, []byte{0x81, 0x00})
testPVarUInt32(t, 130, []byte{0x81, 0x02})
testPVarUInt32(t, 1048576, []byte{0x80, 0xC0, 0x80, 0x00})
testPVarUInt32(t, 134217728, []byte{0x80, 0xC0, 0x80, 0x80, 0x00})
testPVarUInt32(t, 4294967295, []byte{0x7F})
}
func TestPInt64(t *testing.T) {
testPVarInt64(t, 0, []byte{0x00})
testPVarInt64(t, 1, []byte{0x01})
testPVarInt64(t, -1, []byte{0x7F})
}
func TestPUInt64(t *testing.T) {
testPVarUInt64(t, 0, []byte{0x00})
testPVarUInt64(t, 1, []byte{0x01})
testPVarUInt64(t, 18446744073709551615, []byte{0x7F})
}
func testPVarInt32(t *testing.T, value int32, bytes []byte) {
var msg = fmt.Sprintf("tester %v (%X): %X", value, uint32(value), bytes)
var size = SizeOfPVarInt32(value)
assert.Equal(t, len(bytes), size, msg)
buffer := make([]byte, len(bytes))
codec := VarCodec{Size: size}
assert.Nil(t, codec.EncodePVarInt32(buffer, value), msg)
assert.Equal(t, bytes, buffer, msg)
var val int32
codec = VarCodec{}
assert.Nil(t, codec.DecodePVarInt32(bytes, &val), msg)
assert.Equal(t, value, val, msg)
}
func testPVarUInt32(t *testing.T, value uint32, bytes []byte) {
var msg = fmt.Sprintf("tester %v (%X): %X", value, value, bytes)
var size = SizeOfPVarUInt32(value)
assert.Equal(t, len(bytes), size, msg)
buffer := make([]byte, len(bytes))
codec := VarCodec{Size: size}
assert.Nil(t, codec.EncodePVarUInt32(buffer, value), msg)
assert.Equal(t, bytes, buffer, msg)
var val uint32
codec = VarCodec{}
assert.Nil(t, codec.DecodePVarUInt32(bytes, &val), msg)
assert.Equal(t, value, val, msg)
}
func testPVarInt64(t *testing.T, value int64, bytes []byte) {
var msg = fmt.Sprintf("tester %v (%X): %X", value, uint64(value), bytes)
var size = SizeOfPVarInt64(value)
assert.Equal(t, len(bytes), size, msg)
buffer := make([]byte, len(bytes))
codec := VarCodec{Size: size}
assert.Nil(t, codec.EncodePVarInt64(buffer, value), msg)
assert.Equal(t, bytes, buffer, msg)
var val int64
codec = VarCodec{}
assert.Nil(t, codec.DecodePVarInt64(bytes, &val), msg)
assert.Equal(t, value, val, msg)
}
func testPVarUInt64(t *testing.T, value uint64, bytes []byte) {
var msg = fmt.Sprintf("tester %v (%X): %X", value, value, bytes)
var size = SizeOfPVarUInt64(value)
assert.Equal(t, len(bytes), size, msg)
buffer := make([]byte, len(bytes))
codec := VarCodec{Size: size}
assert.Nil(t, codec.EncodePVarUInt64(buffer, value), msg)
assert.Equal(t, bytes, buffer, msg)
var val uint64
codec = VarCodec{}
assert.Nil(t, codec.DecodePVarUInt64(bytes, &val), msg)
assert.Equal(t, value, val)
}

17
encoding/varcodec.go Normal file
View File

@ -0,0 +1,17 @@
package encoding
import (
"errors"
)
// ErrBufferInsufficient describes error when encode/decode malformed VarInt
var ErrBufferInsufficient = errors.New("buffer insufficient")
// VarCodec for encode/decode VarInt
type VarCodec struct {
// next ptr in buf
Ptr int
// Encoder: bytes are to be written,
// Decoder: bytes have been consumed
Size int
}

118
encoding/varfloat.go Normal file
View File

@ -0,0 +1,118 @@
package encoding
import (
"errors"
"math"
mbit "math/bits"
)
// SizeOfVarFloat32 return the buffer size after encoding value as VarFloat32
func SizeOfVarFloat32(value float32) int {
return sizeOfVarFloat(uint64(math.Float32bits(value)), 4)
}
// EncodeVarFloat32 encode value as VarFloat32 to buffer
func (codec *VarCodec) EncodeVarFloat32(buffer []byte, value float32) error {
return codec.encodeVarFloat(buffer, uint64(math.Float32bits(value)), 4)
}
// DecodeVarFloat32 decode to value as VarFloat32 from buffer
func (codec *VarCodec) DecodeVarFloat32(buffer []byte, value *float32) error {
var bits = uint64(math.Float32bits(*value))
var err = codec.decodeVarFloat(buffer, &bits, 4)
*value = math.Float32frombits(uint32(bits))
return err
}
// SizeOfVarFloat64 return the buffer size after encoding value as VarFloat32
func SizeOfVarFloat64(value float64) int {
return sizeOfVarFloat(math.Float64bits(value), 8)
}
// EncodeVarFloat64 encode value as VarFloat64 to buffer
func (codec *VarCodec) EncodeVarFloat64(buffer []byte, value float64) error {
return codec.encodeVarFloat(buffer, math.Float64bits(value), 8)
}
// DecodeVarFloat64 decode to value as VarFloat64 from buffer
func (codec *VarCodec) DecodeVarFloat64(buffer []byte, value *float64) error {
var bits = math.Float64bits(*value)
var err = codec.decodeVarFloat(buffer, &bits, 8)
*value = math.Float64frombits(bits)
return err
}
func sizeOfVarFloat(bits uint64, width int) int {
const unit = 8 // bit width of encoding unit
const mask = uint64(0xFF) // mask of encoding unit
for s := 0; width > 1; s += unit {
if bits & (mask << s) != 0 {
return width
}
width--
}
return 1
}
func (codec *VarCodec) encodeVarFloat(buffer []byte, bits uint64, width int) error {
if codec == nil || codec.Size == 0 {
return errors.New("nothing to encode")
}
const unit = 8 // bit width of encoding unit
var gap, mask = codec.sizeOfGap(width)
for (codec.Size & mask) > 0 {
if codec.Ptr >= len(buffer) {
return ErrBufferInsufficient
}
codec.Size--
buffer[codec.Ptr] = byte(bits >> ((codec.Size & mask + gap) * unit))
codec.Ptr++
}
codec.Size = 0
return nil
}
func (codec *VarCodec) decodeVarFloat(buffer []byte, bits *uint64, width int) error {
if codec == nil || codec.Size == 0 {
return errors.New("nothing to decode")
}
const unit = 8 // bit width of encoding unit
var gap, mask = codec.sizeOfGap(width)
for (codec.Size & mask) > 0 {
if codec.Ptr >= len(buffer) {
return ErrBufferInsufficient
}
codec.Size--
*bits = (*bits << unit) | uint64(buffer[codec.Ptr])
codec.Ptr++
}
*bits <<= gap * unit
codec.Size = 0
return nil
}
func (codec *VarCodec) sizeOfGap(width int) (int, int) {
var ms = mbit.OnesCount(^uint(0)) // machine bit width for an int
var size = ms - 8 // bit width of effective size
var mask = -1 ^ (-1 << size) // mask of effective size
var gap = 0 // gap between encoded size and decoded size
if codec.Size > 0 {
if width > codec.Size {
gap = width - codec.Size
}
var sign = -1 << (ms - 1) // single sign bit for an int
codec.Size = sign | (gap << size) | (codec.Size & mask)
} else {
gap = (codec.Size >> size) & 0x7F
}
return gap, mask
}

61
encoding/varfloat_test.go Normal file
View File

@ -0,0 +1,61 @@
package encoding
import (
"fmt"
"math"
"testing"
"github.com/stretchr/testify/assert"
)
func TestFloat32(t *testing.T) {
testVarFloat32(t, 0, []byte{0x00})
testVarFloat32(t, 1, []byte{0x3F, 0x80})
testVarFloat32(t, 25, []byte{0x41, 0xC8})
testVarFloat32(t, -2, []byte{0xC0})
testVarFloat32(t, 0.25, []byte{0x3E, 0x80})
testVarFloat32(t, 0.375, []byte{0x3E, 0xC0})
testVarFloat32(t, 12.375, []byte{0x41, 0x46})
testVarFloat32(t, 68.123, []byte{0x42, 0x88, 0x3E, 0xFA})
}
func TestFloat64(t *testing.T) {
testVarFloat64(t, 0, []byte{0x00})
testVarFloat64(t, 1, []byte{0x3F, 0xF0})
testVarFloat64(t, 2, []byte{0x40})
testVarFloat64(t, 23, []byte{0x40, 0x37})
testVarFloat64(t, -2, []byte{0xC0})
testVarFloat64(t, 0.01171875, []byte{0x3F, 0x88})
}
func testVarFloat32(t *testing.T, value float32, bytes []byte) {
var msg = fmt.Sprintf("tester %v (%X): %X", value, math.Float32bits(value), bytes)
var size = SizeOfVarFloat32(value)
assert.Equal(t, len(bytes), size, msg)
buffer := make([]byte, len(bytes))
codec := VarCodec{Size: size}
assert.Nil(t, codec.EncodeVarFloat32(buffer, value), msg)
assert.Equal(t, bytes, buffer, msg)
var val float32
codec = VarCodec{Size: len(bytes)}
assert.Nil(t, codec.DecodeVarFloat32(bytes, &val), msg)
assert.Equal(t, value, val, msg)
}
func testVarFloat64(t *testing.T, value float64, bytes []byte) {
var msg = fmt.Sprintf("tester %v (%X): %X", value, math.Float64bits(value), bytes)
var size = SizeOfVarFloat64(value)
assert.Equal(t, len(bytes), size, msg)
buffer := make([]byte, len(bytes))
codec := VarCodec{Size: size}
assert.Nil(t, codec.EncodeVarFloat64(buffer, value), msg)
assert.Equal(t, bytes, buffer, msg)
var val float64
codec = VarCodec{Size: len(bytes)}
assert.Nil(t, codec.DecodeVarFloat64(bytes, &val), msg)
assert.Equal(t, value, val, msg)
}

8
go.mod Normal file
View File

@ -0,0 +1,8 @@
module mq_coder
go 1.18
replace (
git.hpds.cc/Component/mq_coder/encoding latest => target mq_coder/encoding
git.hpds.cc/Component/mq_coder/spec latest => target mq_coder/spec
)

156
packet.go Normal file
View File

@ -0,0 +1,156 @@
package mq_coder
import (
"bytes"
"io"
"git.hpds.cc/Component/mq_coder/encoding"
"git.hpds.cc/Component/mq_coder/spec"
)
// StreamPacket implement the Packet interface.
type StreamPacket struct {
t spec.T
l spec.L
vbuf []byte
vr io.Reader
chunkMode bool
chunkSize int
}
var _ Packet = &StreamPacket{}
// SeqId returns the sequenceId of this packet
func (p *StreamPacket) SeqId() int { return p.t.Sid() }
// Size returns the size of whole packet.
func (p *StreamPacket) Size() int {
// T.Size + L.Size + V.Size
return p.t.Size() + p.l.Size() + p.l.VSize()
}
// VSize returns the size of V.
func (p *StreamPacket) VSize() int { return p.l.VSize() }
// Bytes return the raw bytes of this packet. V will be absent if
// is in chunked mode
func (p *StreamPacket) Bytes() []byte {
buf := new(bytes.Buffer)
// the raw bytes of T and L
p.writeTL(buf)
// p.valbuf stores the raw bytes of V
buf.Write(p.vbuf)
return buf.Bytes()
}
// VReader return an io.Reader which can be read as the content of V.
func (p *StreamPacket) VReader() io.Reader {
if !p.chunkMode {
return bytes.NewReader(p.vbuf)
}
return p.vr
}
// Reader return an io.Reader which can be read as the whole bytes of
// this packet. This function only available if this V of packet is in
// chunked mode.
func (p *StreamPacket) Reader() io.Reader {
if !p.chunkMode {
buf := new(bytes.Buffer)
buf.Write(p.t.Bytes())
buf.Write(p.l.Bytes())
buf.Write(p.vbuf)
return buf
}
buf := new(bytes.Buffer)
// T and L of this packet
p.writeTL(buf)
// V of this packet
buf.Write(p.vbuf)
return &chunkVReader{
buf: buf,
src: p.vr,
totalSize: p.Size(),
ChunkVSize: p.VSize(),
}
}
// IsStreamMode returns a bool value indicates if the V of
// this packet is in stream mode
func (p *StreamPacket) IsStreamMode() bool {
return p.chunkMode
}
// IsNodeMode returns a bool value indicates if this packet
// is node mode
func (p *StreamPacket) IsNodeMode() bool {
return p.t.IsNodeMode()
}
// write the raw bytes of T and L to given buf
func (p *StreamPacket) writeTL(buf *bytes.Buffer) {
buf.Write(p.t.Bytes())
buf.Write(p.l.Bytes())
}
// BytesV return V as bytes
func (p *StreamPacket) BytesV() []byte {
return p.vbuf
}
// UTF8StringV return V as utf-8 string
func (p *StreamPacket) UTF8StringV() string {
return string(p.vbuf)
}
// Int32V return V as int32
func (p *StreamPacket) Int32V() (val int32, err error) {
codec := encoding.VarCodec{Size: len(p.vbuf)}
err = codec.DecodeNVarInt32(p.vbuf, &val)
return val, err
}
// UInt32V return V as uint32
func (p *StreamPacket) UInt32V() (val uint32, err error) {
codec := encoding.VarCodec{Size: len(p.vbuf)}
err = codec.DecodeNVarUInt32(p.vbuf, &val)
return val, err
}
// Int64V return V as int64
func (p *StreamPacket) Int64V() (val int64, err error) {
codec := encoding.VarCodec{Size: len(p.vbuf)}
err = codec.DecodeNVarInt64(p.vbuf, &val)
return val, err
}
// UInt64V return V as uint64
func (p *StreamPacket) UInt64V() (val uint64, err error) {
codec := encoding.VarCodec{Size: len(p.vbuf)}
err = codec.DecodeNVarUInt64(p.vbuf, &val)
return val, err
}
// Float32V return V as float32
func (p *StreamPacket) Float32V() (val float32, err error) {
codec := encoding.VarCodec{Size: len(p.vbuf)}
err = codec.DecodeVarFloat32(p.vbuf, &val)
return val, err
}
// Float64V return V as float64
func (p *StreamPacket) Float64V() (val float64, err error) {
codec := encoding.VarCodec{Size: len(p.vbuf)}
err = codec.DecodeVarFloat64(p.vbuf, &val)
return val, err
}
// BoolV return V as bool
func (p *StreamPacket) BoolV() (val bool, err error) {
codec := encoding.VarCodec{Size: len(p.vbuf)}
err = codec.DecodePVarBool(p.vbuf, &val)
return val, err
}

26
spec/spec.go Normal file
View File

@ -0,0 +1,26 @@
package spec
import (
"errors"
"io"
)
const (
maxSeqID = 0x3F
flagBitNode = 0x80
wipeFlagBits = 0x3F
msb = 0x80
)
var (
errInvalidSeqID = errors.New("y3.Builder: SeqID should >= 0 and =< 0x3F")
)
func readByte(reader io.Reader) (byte, error) {
var b [1]byte
n, err := reader.Read(b[:])
if n == 0 {
return 0x00, err
}
return b[0], err
}

27
spec/spec.puml Normal file
View File

@ -0,0 +1,27 @@
@startuml
namespace spec {
class L << (S,Aquamarine) >> {
- buf []byte
- size int
- len int
+ Bytes() []byte
+ Size() int
+ VSize() int
}
class T << (S,Aquamarine) >> {
+ Sid() int
+ Bytes() []byte
+ IsNodeMode() bool
+ SetNodeMode(flag bool)
+ Size() int
}
class spec.T << (T, #FF7700) >> {
}
}
"__builtin__.byte" #.. "spec.T"
@enduml

54
spec/tlv.t.go Normal file
View File

@ -0,0 +1,54 @@
package spec
import (
"io"
)
// T is the Tag in a TLV structure
type T byte
// NewT returns a T with sequenceID. If this packet contains other
// packets, this packet will be a "node packet", the T of this packet
// will set MSB to T.
func NewT(seqID int) (T, error) {
if seqID < 0 || seqID > maxSeqID {
return 0, errInvalidSeqID
}
return T(seqID), nil
}
// Sid returns the sequenceID of this packet.
func (t T) Sid() int {
return int(t & wipeFlagBits)
}
// Bytes returns raw bytes of T.
func (t T) Bytes() []byte {
return []byte{byte(t)}
}
// IsNodeMode will return true if this packet contains other packets.
// Otherwise return flase.
func (t T) IsNodeMode() bool {
return t&flagBitNode == flagBitNode
}
// SetNodeMode will set T to indicates this packet contains
// other packets.
func (t *T) SetNodeMode(flag bool) {
if flag {
*t |= flagBitNode
}
}
// Size return the size of T raw bytes.
func (t T) Size() int {
return 1
}
// ReadT read T from a bufio.Reader
func ReadT(rd io.Reader) (T, error) {
b, err := readByte(rd)
return T(b), err
}

90
spec/tvl.l.go Normal file
View File

@ -0,0 +1,90 @@
package spec
import (
"bytes"
"errors"
"io"
//"git.hpds.cc/Component/mq_coder/encoding"
"mq_coder/encoding"
)
// L is the Length in a TLV structure
type L struct {
buf []byte
size int
len int
}
// NewL will take an int type len as parameter and return L to
// represent the size of V in a TLV. an integer will be encoded as
// a PVarInt32 type to represent the value.
func NewL(len int) (L, error) {
var l = L{}
if len < -1 {
return l, errors.New("y3.L: len can't less than -1")
}
valLen := int32(len)
l.size = encoding.SizeOfPVarInt32(valLen)
codec := encoding.VarCodec{Size: l.size}
tmp := make([]byte, l.size)
err := codec.EncodePVarInt32(tmp, valLen)
if err != nil {
panic(err)
}
l.buf = make([]byte, l.size)
copy(l.buf, tmp)
l.len = len
return l, nil
}
// Bytes will return the raw bytes of L.
func (l L) Bytes() []byte {
return l.buf
}
// Size returns how many bytes used to represent this L.
func (l L) Size() int {
return l.size
}
// VSize returns the size of V.
func (l L) VSize() int {
return int(l.len)
}
// ReadL read L from bufio.Reader
func ReadL(r io.Reader) (*L, error) {
lenBuf := bytes.Buffer{}
for {
b, err := readByte(r)
if err != nil {
return nil, err
}
lenBuf.WriteByte(b)
if b&msb != msb {
break
}
}
buf := lenBuf.Bytes()
// decode to L
length, err := decodeL(buf)
if err != nil {
return nil, err
}
return &L{
buf: buf,
len: int(length),
size: len(buf),
}, nil
}
func decodeL(buf []byte) (length int32, err error) {
codec := encoding.VarCodec{}
err = codec.DecodePVarInt32(buf, &length)
return length, err
}