commit 3680ef7a223646b70aeb353d6c14f0ec639aac2e Author: wangjian Date: Fri Oct 7 15:30:45 2022 +0800 init diff --git a/chunkedVReader.go b/chunkedVReader.go new file mode 100644 index 0000000..33be7c6 --- /dev/null +++ b/chunkedVReader.go @@ -0,0 +1,72 @@ +package mq_coder + +import ( + "bytes" + "io" + "io/ioutil" +) + +type chunkVReader struct { + src io.Reader // the reader parts of V + buf *bytes.Buffer // the bytes part of V + totalSize int // size of whole buffer of this packet + off int // last read op + ChunkVSize int // the size of chunked V +} + +// Read implement io.Reader interface +func (r *chunkVReader) Read(p []byte) (n int, err error) { + if r.src == nil { + return 0, nil + } + + if r.off >= r.totalSize { + return 0, io.EOF + } + + if r.off < r.totalSize-r.ChunkVSize { + n, err := r.buf.Read(p) + r.off += n + if err != nil { + if err == io.EOF { + return n, nil + } else { + return 0, err + } + } + return n, nil + } + n, err = r.src.Read(p) + r.off += n + if err != nil { + return n, err + } + return n, nil +} + +// WriteTo implement io.WriteTo interface +func (r *chunkVReader) WriteTo(w io.Writer) (n int64, err error) { + if r.src == nil { + return 0, nil + } + + // first, write existed buffer + m, err := w.Write(r.buf.Bytes()) + if err != nil { + return 0, err + } + n += int64(m) + + // last, write from reader + buf, err := ioutil.ReadAll(r.src) + if err != nil && err != io.EOF { + return 0, errWriteFromReader + } + m, err = w.Write(buf) + if err != nil { + return 0, err + } + + n += int64(m) + return n, nil +} diff --git a/coder.go b/coder.go new file mode 100644 index 0000000..06557c5 --- /dev/null +++ b/coder.go @@ -0,0 +1,54 @@ +package mq_coder + +import ( + "errors" + "io" +) + +var ( + errBuildIncomplete = errors.New("coder.Encoder: invalid structure of packet") + errInvalidAdding = errors.New("coder.Encoder: can not add this Packet after StreamPacket has been add") + errNonStreamPacket = errors.New("coder.Packet: this packet is not in node mode") + errWriteFromReader = errors.New("coder.streamV: write from reader error") + errNotNodeMode = errors.New("coder.Encoder: packet should be in node mode can be add other packets as child") + errNilReader = errors.New("coder.Decoder: nil source reader") +) + +// Packet 编解码器包 +type Packet interface { + // SeqId 返回此数据包的序列Id. + SeqId() int + // Size 返回整个数据包的大小 + Size() int + // VSize 返回V的大小. + VSize() int + // Bytes 此数据包的全部字节. + Bytes() []byte + // Reader 返回 io.Reader 字节. + Reader() io.Reader + // VReader 返回 io.Reader 的 流字节. + VReader() io.Reader + // IsStreamMode 是否流模式 + IsStreamMode() bool + // IsNodeMode 是否节点模式 + IsNodeMode() bool + + // BytesV 返回流字节 + BytesV() []byte + // UTF8StringV 返回流的utf8字符串值 + UTF8StringV() string + // Int32V 返回流的int32值 + Int32V() (val int32, err error) + // UInt32V 返回流的uint32值 + UInt32V() (val uint32, err error) + // Int64V 返回流的int64值 + Int64V() (val int64, err error) + // UInt64V 返回流的uint64值 + UInt64V() (val uint64, err error) + // Float32V 返回流的float32值 + Float32V() (val float32, err error) + // Float64V 返回流的float64值 + Float64V() (val float64, err error) + // BoolV 返回流的bool值 + BoolV() (val bool, err error) +} diff --git a/decoder.go b/decoder.go new file mode 100644 index 0000000..70c6470 --- /dev/null +++ b/decoder.go @@ -0,0 +1,93 @@ +package mq_coder + +import ( + "bytes" + "io" + + "git.hpds.cc/Component/mq_coder/spec" +) + +// Decoder is the tool for decoding packet from stream +type Decoder struct { + tag spec.T + len *spec.L + rd io.Reader +} + +// NewDecoder returns a Decoder from an io.Reader +func NewDecoder(reader io.Reader) *Decoder { + return &Decoder{ + rd: reader, + } +} + +// SeqID return the SequenceID of the decoding packet +func (d *Decoder) SeqID() int { + return d.tag.Sid() +} + +// UnderlyingReader returns the reader this decoder using +func (d *Decoder) UnderlyingReader() io.Reader { + return d.rd +} + +// ReadHeader will block until io.EOF or receive T and L of a packet. +func (d *Decoder) ReadHeader() error { + // only read T and L + return d.readTL() +} + +// GetChunkedPacket will block until io.EOF or receive V of a packet in chunked mode. +func (d *Decoder) GetChunkedPacket() Packet { + return &StreamPacket{ + t: d.tag, + l: *d.len, + vr: d.rd, + chunkMode: true, + chunkSize: d.len.VSize(), + } +} + +// GetFullFilledPacket read full Packet from given io.Reader +func (d *Decoder) GetFullFilledPacket() (packet Packet, err error) { + // read V + buf := new(bytes.Buffer) + total := 0 + for { + valBuf := make([]byte, d.len.VSize()) + n, err := d.rd.Read(valBuf) + if n > 0 { + total += n + buf.Write(valBuf[:n]) + } + if total >= d.len.VSize() || err != nil { + break + } + } + + packet = &StreamPacket{ + t: d.tag, + l: *d.len, + vbuf: buf.Bytes(), + chunkMode: false, + } + + return packet, nil +} + +func (d *Decoder) readTL() (err error) { + if d.rd == nil { + return errNilReader + } + + // read T + d.tag, err = spec.ReadT(d.rd) + if err != nil { + return err + } + + // read L + d.len, err = spec.ReadL(d.rd) + + return err +} diff --git a/encoder.go b/encoder.go new file mode 100644 index 0000000..66eaf30 --- /dev/null +++ b/encoder.go @@ -0,0 +1,158 @@ +package mq_coder + +import ( + "bytes" + "io" + + "git.hpds.cc/Component/mq_coder/spec" +) + +// Encoder is the tool for creating a packet easily +type Encoder struct { + tag spec.T + len *spec.L + valReader io.Reader + valReaderSize int + nodes map[int]Packet + state int + size int32 // size of value + isStreamMode bool + valBuf *bytes.Buffer + done bool + seqID int + isNodeMode bool +} + +// SetSeqId set sequenceId of a packet, if this packet contains other +// packets, isNode should set to true +func (b *Encoder) SetSeqId(seqId int, isNode bool) { + // init + b.valBuf = new(bytes.Buffer) + b.nodes = make(map[int]Packet) + // set seqID + b.seqID = seqId + b.isNodeMode = isNode +} + +// SetBytesV set bytes type as V +func (b *Encoder) SetBytesV(buf []byte) { + b.size += int32(len(buf)) + b.valBuf.Write(buf) + b.isStreamMode = false + b.state |= 0x04 +} + +// SetReaderV set io.Reader type as V +func (b *Encoder) SetReaderV(r io.Reader, size int) { + b.isStreamMode = true + b.valReader = r + b.state |= 0x04 + b.size += int32(size) + b.valReaderSize = size +} + +// AddPacket add a Packet child to this packet, this packet must be NodeMode +func (b *Encoder) AddPacket(child Packet) error { + // only packet is in node mode can add other packets + if !b.isNodeMode { + return errNotNodeMode + } + + if b.done { + return errInvalidAdding + } + b.nodes[child.SeqId()] = child + buf := child.Bytes() + b.SetBytesV(buf) + return nil +} + +// AddStreamPacket will put a StreamPacket in chunked mode to current packet. +func (b *Encoder) AddStreamPacket(child Packet) (err error) { + // if this packet is in stream mode, can not add any packets + if b.done { + return errInvalidAdding + } + + // only accept packet in stream mode + if !child.IsStreamMode() { + return errNonStreamPacket + } + + // set the valReader of this packet to the child's + b.valReader = child.VReader() + + // valReaderSize will be the same as child's + b.valReaderSize = child.VSize() + // add this child packet + b.nodes[child.SeqId()] = child + // add the size of child's V to L of this packet + b.size += int32(child.Size()) + // put the bytes of child to valBuf + buf := child.Bytes() + b.valBuf.Write(buf) + // update state + b.state |= 0x04 + b.isStreamMode = true + b.done = true + return nil +} + +// Packet return a Packet instance. +func (b *Encoder) Packet() (Packet, error) { + err := b.generateT() + if err != nil { + return nil, err + } + + err = b.generateL() + if err != nil { + return nil, err + } + + if b.state != 0x07 { + return nil, errBuildIncomplete + } + + if b.isStreamMode { + return &StreamPacket{ + t: b.tag, + l: *b.len, + vr: b.valReader, + vbuf: b.valBuf.Bytes(), + chunkMode: true, + chunkSize: b.valReaderSize, + }, err + } + + // not streaming mode + return &StreamPacket{ + t: b.tag, + l: *b.len, + vbuf: b.valBuf.Bytes(), + chunkMode: false, + }, err +} + +// will generate T of a TLV. +func (b *Encoder) generateT() error { + t, err := spec.NewT(b.seqID) + t.SetNodeMode(b.isNodeMode) + if err != nil { + return err + } + b.tag = t + b.state |= 0x01 + return nil +} + +// will generate L of a TLV. +func (b *Encoder) generateL() error { + l, err := spec.NewL(int(b.size)) + if err != nil { + return err + } + b.len = &l + b.state |= 0x02 + return nil +} diff --git a/encoder_sugar.go b/encoder_sugar.go new file mode 100644 index 0000000..dcba977 --- /dev/null +++ b/encoder_sugar.go @@ -0,0 +1,98 @@ +package mq_coder + +import ( + "git.hpds.cc/Component/mq_coder/encoding" +) + +// SetUTF8StringV set utf-8 string type value as V +func (b *Encoder) SetUTF8StringV(v string) { + buf := []byte(v) + b.SetBytesV(buf) +} + +// SetInt32V set an int32 type value as V +func (b *Encoder) SetInt32V(v int32) error { + size := encoding.SizeOfNVarInt32(v) + codec := encoding.VarCodec{Size: size} + buf := make([]byte, size) + err := codec.EncodeNVarInt32(buf, v) + if err != nil { + return err + } + b.SetBytesV(buf) + return nil +} + +// SetUInt32V set an uint32 type value as V +func (b *Encoder) SetUInt32V(v uint32) error { + size := encoding.SizeOfNVarUInt32(v) + codec := encoding.VarCodec{Size: size} + buf := make([]byte, size) + err := codec.EncodeNVarUInt32(buf, v) + if err != nil { + return err + } + b.SetBytesV(buf) + return nil +} + +// SetInt64V set an int64 type value as V +func (b *Encoder) SetInt64V(v int64) error { + size := encoding.SizeOfNVarInt64(v) + codec := encoding.VarCodec{Size: size} + buf := make([]byte, size) + err := codec.EncodeNVarInt64(buf, v) + if err != nil { + return err + } + b.SetBytesV(buf) + return nil +} + +// SetUInt64V set an uint64 type value as V +func (b *Encoder) SetUInt64V(v uint64) error { + size := encoding.SizeOfNVarUInt64(v) + codec := encoding.VarCodec{Size: size} + buf := make([]byte, size) + err := codec.EncodeNVarUInt64(buf, v) + if err != nil { + return err + } + b.SetBytesV(buf) + return nil +} + +// SetFloat32V set a float32 type value as V +func (b *Encoder) SetFloat32V(v float32) error { + size := encoding.SizeOfVarFloat32(v) + codec := encoding.VarCodec{Size: size} + buf := make([]byte, size) + err := codec.EncodeVarFloat32(buf, v) + if err != nil { + return err + } + b.SetBytesV(buf) + return nil +} + +// SetFloat64V set a float64 type value as V +func (b *Encoder) SetFloat64V(v float64) error { + size := encoding.SizeOfVarFloat64(v) + codec := encoding.VarCodec{Size: size} + buf := make([]byte, size) + err := codec.EncodeVarFloat64(buf, v) + if err != nil { + return err + } + b.SetBytesV(buf) + return nil +} + +// SetBoolV set bool type value as V +func (b *Encoder) SetBoolV(v bool) { + var size = encoding.SizeOfPVarUInt32(uint32(1)) + codec := encoding.VarCodec{Size: size} + buf := make([]byte, size) + _ = codec.EncodePVarBool(buf, v) + b.SetBytesV(buf) +} diff --git a/encoding/encoding.puml b/encoding/encoding.puml new file mode 100644 index 0000000..e2c9be8 --- /dev/null +++ b/encoding/encoding.puml @@ -0,0 +1,42 @@ +@startuml +namespace encoding { + class VarCodec << (S,Aquamarine) >> { + + Ptr int + + Size int + + - encodeNVarInt(buffer []byte, value int64) error + - decodeNVarInt(buffer []byte, value *int64) error + - encodePVarInt(buffer []byte, value int64) error + - decodePVarInt(buffer []byte, value *int64) error + - encodeVarFloat(buffer []byte, bits uint64, width int) error + - decodeVarFloat(buffer []byte, bits *uint64, width int) error + - sizeOfGap(width int) (int, int) + + + EncodeNVarInt32(buffer []byte, value int32) error + + DecodeNVarInt32(buffer []byte, value *int32) error + + EncodeNVarUInt32(buffer []byte, value uint32) error + + DecodeNVarUInt32(buffer []byte, value *uint32) error + + EncodeNVarInt64(buffer []byte, value int64) error + + DecodeNVarInt64(buffer []byte, value *int64) error + + EncodeNVarUInt64(buffer []byte, value uint64) error + + DecodeNVarUInt64(buffer []byte, value *uint64) error + + EncodePVarBool(buffer []byte, value bool) error + + DecodePVarBool(buffer []byte, value *bool) error + + EncodePVarInt32(buffer []byte, value int32) error + + DecodePVarInt32(buffer []byte, value *int32) error + + EncodePVarUInt32(buffer []byte, value uint32) error + + DecodePVarUInt32(buffer []byte, value *uint32) error + + EncodePVarInt64(buffer []byte, value int64) error + + DecodePVarInt64(buffer []byte, value *int64) error + + EncodePVarUInt64(buffer []byte, value uint64) error + + DecodePVarUInt64(buffer []byte, value *uint64) error + + EncodeVarFloat32(buffer []byte, value float32) error + + DecodeVarFloat32(buffer []byte, value *float32) error + + EncodeVarFloat64(buffer []byte, value float64) error + + DecodeVarFloat64(buffer []byte, value *float64) error + + } +} + + +@enduml \ No newline at end of file diff --git a/encoding/nvarint.go b/encoding/nvarint.go new file mode 100644 index 0000000..2b3552c --- /dev/null +++ b/encoding/nvarint.go @@ -0,0 +1,132 @@ +package encoding + +import ( + "errors" +) + +// SizeOfNVarInt32 return the buffer size after encoding value as NVarInt32 +func SizeOfNVarInt32(value int32) int { + return sizeOfNVarInt(int64(value), 32) +} + +// EncodeNVarInt32 encode value as NVarInt32 to buffer +func (codec *VarCodec) EncodeNVarInt32(buffer []byte, value int32) error { + return codec.encodeNVarInt(buffer, int64(value)) +} + +// DecodeNVarInt32 decode to value as NVarInt32 from buffer +func (codec *VarCodec) DecodeNVarInt32(buffer []byte, value *int32) error { + var val = int64(*value) + var err = codec.decodeNVarInt(buffer, &val) + *value = int32(val) + return err +} + +// SizeOfNVarUInt32 return the buffer size after encoding value as NVarUInt32 +func SizeOfNVarUInt32(value uint32) int { + return sizeOfNVarInt(int64(int32(value)), 32) +} + +// EncodeNVarUInt32 encode value as NVarUInt32 to buffer +func (codec *VarCodec) EncodeNVarUInt32(buffer []byte, value uint32) error { + return codec.encodeNVarInt(buffer, int64(int32(value))) +} + +// DecodeNVarUInt32 decode to value as NVarUInt32 from buffer +func (codec *VarCodec) DecodeNVarUInt32(buffer []byte, value *uint32) error { + var val = int64(int32(*value)) + var err = codec.decodeNVarInt(buffer, &val) + *value = uint32(val) + return err +} + +// SizeOfNVarInt64 return the buffer size after encoding value as NVarInt64 +func SizeOfNVarInt64(value int64) int { + return sizeOfNVarInt(value, 64) +} + +// EncodeNVarInt64 encode value as NVarInt64 to buffer +func (codec *VarCodec) EncodeNVarInt64(buffer []byte, value int64) error { + return codec.encodeNVarInt(buffer, value) +} + +// DecodeNVarInt64 decode to value as NVarInt64 from buffer +func (codec *VarCodec) DecodeNVarInt64(buffer []byte, value *int64) error { + return codec.decodeNVarInt(buffer, value) +} + +// SizeOfNVarUInt64 return the buffer size after encoding value as NVarUInt64 +func SizeOfNVarUInt64(value uint64) int { + return sizeOfNVarInt(int64(value), 64) +} + +// EncodeNVarUInt64 encode value as NVarUInt64 to buffer +func (codec *VarCodec) EncodeNVarUInt64(buffer []byte, value uint64) error { + return codec.encodeNVarInt(buffer, int64(value)) +} + +// DecodeNVarUInt64 decode to value as NVarUInt64 from buffer +func (codec *VarCodec) DecodeNVarUInt64(buffer []byte, value *uint64) error { + var val = int64(*value) + var err = codec.decodeNVarInt(buffer, &val) + *value = uint64(val) + return err +} + +// SizeOfNVarInt return the buffer size after encoding value as NVarInt +func sizeOfNVarInt(value int64, width int) int { + const unit = 8 // bit width of encoding unit + + var lead = value >> (width - 1) + for size := width / unit - 1; size > 0; size-- { + var lookAhead = value >> (size*unit - 1) + if lookAhead != lead { + return size + 1 + } + } + return 1 +} + +func (codec *VarCodec) encodeNVarInt(buffer []byte, value int64) error { + if codec == nil || codec.Size == 0 { + return errors.New("nothing to encode") + } + + const unit = 8 // bit width of encoding unit + for codec.Size > 0 { + if codec.Ptr >= len(buffer) { + return ErrBufferInsufficient + } + + codec.Size-- + buffer[codec.Ptr] = byte(value >> (codec.Size * unit)) + codec.Ptr++ + } + return nil +} + +func (codec *VarCodec) decodeNVarInt(buffer []byte, value *int64) error { + if codec == nil || codec.Size == 0 { + return errors.New("nothing to decode") + } + + const unit = 8 // bit width of encoding unit + if codec.Size > 0 { // initialize sign bit + if codec.Ptr >= len(buffer) { + return ErrBufferInsufficient + } + *value = int64(int8(buffer[codec.Ptr]) >> (unit - 1)) + codec.Size = -codec.Size + } + + for codec.Size < 0 { + if codec.Ptr >= len(buffer) { + return ErrBufferInsufficient + } + + codec.Size++ + *value = (*value << unit) | int64(buffer[codec.Ptr]) + codec.Ptr++ + } + return nil +} diff --git a/encoding/nvarint_test.go b/encoding/nvarint_test.go new file mode 100644 index 0000000..3a7a04c --- /dev/null +++ b/encoding/nvarint_test.go @@ -0,0 +1,108 @@ +package encoding + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNInt32(t *testing.T) { + testNVarInt32(t, -1, []byte{0xFF}) + testNVarInt32(t, -5, []byte{0xFB}) + testNVarInt32(t, 63, []byte{0x3F}) + testNVarInt32(t, -65, []byte{0xBF}) + testNVarInt32(t, 127, []byte{0x7F}) + testNVarInt32(t, 255, []byte{0x00, 0xFF}) + testNVarInt32(t, -4097, []byte{0xEF, 0xFF}) + testNVarInt32(t, -8193, []byte{0xDF, 0xFF}) + testNVarInt32(t, -2097152, []byte{0xE0, 0x00, 0x00}) + testNVarInt32(t, -134217729, []byte{0xF7, 0xFF, 0xFF, 0xFF}) + testNVarInt32(t, -2147483648, []byte{0x80, 0x00, 0x00, 0x00}) +} + +func TestNUInt32(t *testing.T) { + testNVarUInt32(t, 1, []byte{0x01}) + testNVarUInt32(t, 127, []byte{0x7F}) + testNVarUInt32(t, 128, []byte{0x00, 0x80}) + testNVarUInt32(t, 130, []byte{0x00, 0x82}) + testNVarUInt32(t, 1048576, []byte{0x10, 0x00, 0x00}) + testNVarUInt32(t, 134217728, []byte{0x08, 0x00, 0x00, 0x00}) + testNVarUInt32(t, 4294967295, []byte{0xFF}) +} + +func TestNInt64(t *testing.T) { + testNVarInt64(t, 0, []byte{0x00}) + testNVarInt64(t, 1, []byte{0x01}) + testNVarInt64(t, -1, []byte{0xFF}) +} + +func TestNUInt64(t *testing.T) { + testNVarUInt64(t, 0, []byte{0x00}) + testNVarUInt64(t, 1, []byte{0x01}) + testNVarUInt64(t, 18446744073709551615, []byte{0xFF}) +} + +func testNVarInt32(t *testing.T, value int32, bytes []byte) { + var msg = fmt.Sprintf("tester %v (%X): %X", value, uint32(value), bytes) + var size = SizeOfNVarInt32(value) + assert.Equal(t, len(bytes), size, msg) + + buffer := make([]byte, len(bytes)) + codec := VarCodec{Size: size} + assert.Nil(t, codec.EncodeNVarInt32(buffer, value), msg) + assert.Equal(t, bytes, buffer, msg) + + var val int32 + codec = VarCodec{Size: len(bytes)} + assert.Nil(t, codec.DecodeNVarInt32(bytes, &val), msg) + assert.Equal(t, value, val, msg) +} + +func testNVarUInt32(t *testing.T, value uint32, bytes []byte) { + var msg = fmt.Sprintf("tester %v (%X): %X", value, value, bytes) + var size = SizeOfNVarUInt32(value) + assert.Equal(t, len(bytes), size, msg) + + buffer := make([]byte, len(bytes)) + codec := VarCodec{Size: size} + assert.Nil(t, codec.EncodeNVarUInt32(buffer, value), msg) + assert.Equal(t, bytes, buffer, msg) + + var val uint32 + codec = VarCodec{Size: len(bytes)} + assert.Nil(t, codec.DecodeNVarUInt32(bytes, &val), msg) + assert.Equal(t, value, val, msg) +} + +func testNVarInt64(t *testing.T, value int64, bytes []byte) { + var msg = fmt.Sprintf("tester %v (%X): %X", value, uint64(value), bytes) + var size = SizeOfNVarInt64(value) + assert.Equal(t, len(bytes), size, msg) + + buffer := make([]byte, len(bytes)) + codec := VarCodec{Size: size} + assert.Nil(t, codec.EncodeNVarInt64(buffer, value), msg) + assert.Equal(t, bytes, buffer, msg) + + var val int64 + codec = VarCodec{Size: len(bytes)} + assert.Nil(t, codec.DecodeNVarInt64(bytes, &val), msg) + assert.Equal(t, value, val, msg) +} + +func testNVarUInt64(t *testing.T, value uint64, bytes []byte) { + var msg = fmt.Sprintf("tester %v (%X): %X", value, value, bytes) + var size = SizeOfNVarUInt64(value) + assert.Equal(t, len(bytes), size, msg) + + buffer := make([]byte, len(bytes)) + codec := VarCodec{Size: size} + assert.Nil(t, codec.EncodeNVarUInt64(buffer, value), msg) + assert.Equal(t, bytes, buffer, msg) + + var val uint64 + codec = VarCodec{Size: len(bytes)} + assert.Nil(t, codec.DecodeNVarUInt64(bytes, &val), msg) + assert.Equal(t, value, val) +} diff --git a/encoding/pvarbool.go b/encoding/pvarbool.go new file mode 100644 index 0000000..58fbdb7 --- /dev/null +++ b/encoding/pvarbool.go @@ -0,0 +1,27 @@ +package encoding + +// EncodePVarBool encode value as PVarBool to buffer +func (codec *VarCodec) EncodePVarBool(buffer []byte, value bool) error { + tmp := int64(1) + if !value { + tmp = int64(0) + } + return codec.encodePVarInt(buffer, tmp) +} + +// DecodePVarBool decode to value as PVarBool from buffer +func (codec *VarCodec) DecodePVarBool(buffer []byte, value *bool) error { + if len(buffer) == 0 { + *value = false + return nil + } + + var tmp int64 + var err = codec.decodePVarInt(buffer, &tmp) + if tmp == 1 { + *value = true + } else { + *value = false + } + return err +} diff --git a/encoding/pvarbool_test.go b/encoding/pvarbool_test.go new file mode 100644 index 0000000..29a3289 --- /dev/null +++ b/encoding/pvarbool_test.go @@ -0,0 +1,29 @@ +package encoding + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPVarBool(t *testing.T) { + testPVarBool(t, true, []byte{0x01}) + testPVarBool(t, false, []byte{0x00}) +} + +func testPVarBool(t *testing.T, value bool, bytes []byte) { + var msg = fmt.Sprintf("tester %v (%v): %X", value, value, bytes) + var size = SizeOfPVarUInt32(uint32(1)) + assert.Equal(t, len(bytes), size, msg) + + buffer := make([]byte, len(bytes)) + codec := VarCodec{Size: size} + assert.Nil(t, codec.EncodePVarBool(buffer, value), msg) + assert.Equal(t, bytes, buffer, msg) + + var val bool + codec = VarCodec{} + assert.Nil(t, codec.DecodePVarBool(bytes, &val), msg) + assert.Equal(t, value, val, msg) +} diff --git a/encoding/pvarint.go b/encoding/pvarint.go new file mode 100644 index 0000000..ca1171e --- /dev/null +++ b/encoding/pvarint.go @@ -0,0 +1,147 @@ +package encoding + +import ( + "errors" +) + +// SizeOfPVarInt32 return the buffer size after encoding value as PVarInt32 +func SizeOfPVarInt32(value int32) int { + return sizeOfPVarInt(int64(value), 32) +} + +// EncodePVarInt32 encode value as PVarInt32 to buffer +func (codec *VarCodec) EncodePVarInt32(buffer []byte, value int32) error { + return codec.encodePVarInt(buffer, int64(value)) +} + +// DecodePVarInt32 decode to value as PVarInt32 from buffer +func (codec *VarCodec) DecodePVarInt32(buffer []byte, value *int32) error { + var val = int64(*value) + var err = codec.decodePVarInt(buffer, &val) + *value = int32(val) + return err +} + +// SizeOfPVarUInt32 return the buffer size after encoding value as PVarUInt32 +func SizeOfPVarUInt32(value uint32) int { + return sizeOfPVarInt(int64(int32(value)), 32) +} + +// EncodePVarUInt32 encode value as PVarUInt32 to buffer +func (codec *VarCodec) EncodePVarUInt32(buffer []byte, value uint32) error { + return codec.encodePVarInt(buffer, int64(int32(value))) +} + +// DecodePVarUInt32 decode to value as PVarUInt32 from buffer +func (codec *VarCodec) DecodePVarUInt32(buffer []byte, value *uint32) error { + var val = int64(int32(*value)) + var err = codec.decodePVarInt(buffer, &val) + *value = uint32(val) + return err +} + +// SizeOfPVarInt64 return the buffer size after encoding value as PVarInt64 +func SizeOfPVarInt64(value int64) int { + return sizeOfPVarInt(value, 64) +} + +// EncodePVarInt64 encode value as PVarInt64 to buffer +func (codec *VarCodec) EncodePVarInt64(buffer []byte, value int64) error { + return codec.encodePVarInt(buffer, value) +} + +// DecodePVarInt64 decode to value as PVarInt64 from buffer +func (codec *VarCodec) DecodePVarInt64(buffer []byte, value *int64) error { + return codec.decodePVarInt(buffer, value) +} + +// SizeOfPVarUInt64 return the buffer size after encoding value as PVarUInt64 +func SizeOfPVarUInt64(value uint64) int { + return sizeOfPVarInt(int64(value), 64) +} + +// EncodePVarUInt64 encode value as PVarUInt64 to buffer +func (codec *VarCodec) EncodePVarUInt64(buffer []byte, value uint64) error { + return codec.encodePVarInt(buffer, int64(value)) +} + +// DecodePVarUInt64 decode to value as PVarUInt64 from buffer +func (codec *VarCodec) DecodePVarUInt64(buffer []byte, value *uint64) error { + var val = int64(*value) + var err = codec.decodePVarInt(buffer, &val) + *value = uint64(val) + return err +} + +func sizeOfPVarInt(value int64, width int) int { + const unit = 7 // bit width of encoding unit + + var lead = value >> (width - 1) + for size := width / unit; size > 0; size-- { + var lookAhead = value >> (size*unit - 1) + if lookAhead != lead { + return size + 1 + } + } + return 1 +} + +func (codec *VarCodec) encodePVarInt(buffer []byte, value int64) error { + if codec == nil || codec.Size == 0 { + return errors.New("nothing to encode") + } + if codec.Ptr >= len(buffer) { + return ErrBufferInsufficient + } + + const unit = 7 // bit width of encoding unit + const more = -1 << 7 // continuation bits + for codec.Size > 1 { + codec.Size-- + var part = value >> (codec.Size * unit) + + buffer[codec.Ptr] = byte(part | more) + codec.Ptr++ + + if codec.Ptr >= len(buffer) { + return ErrBufferInsufficient + } + } + + const mask = -1 ^ (-1 << unit) // mask for encoding unit + codec.Size = 0 + buffer[codec.Ptr] = byte(value & mask) + codec.Ptr++ + return nil +} + +func (codec *VarCodec) decodePVarInt(buffer []byte, value *int64) error { + if codec == nil { + return errors.New("nothing to decode") + } + if codec.Ptr >= len(buffer) { + return ErrBufferInsufficient + } + + const unit = 7 // bit width of encoding unit + if codec.Size == 0 { // initialize sign bit + const flag = 8 - unit // bit width for non-encoding bits + *value = int64(int8(buffer[codec.Ptr]) << flag >> unit) + } + + const mask = -1 ^ (-1 << unit) // mask for encoding unit + for { + var part = int8(buffer[codec.Ptr]) + codec.Ptr++ + + codec.Size++ + *value = (*value << unit) | int64(mask & part) + + if part >= 0 { // it's the last byte + return nil + } + if codec.Ptr >= len(buffer) { + return ErrBufferInsufficient + } + } +} diff --git a/encoding/pvarint_test.go b/encoding/pvarint_test.go new file mode 100644 index 0000000..6c992df --- /dev/null +++ b/encoding/pvarint_test.go @@ -0,0 +1,108 @@ +package encoding + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPInt32(t *testing.T) { + testPVarInt32(t, -1, []byte{0x7F}) + testPVarInt32(t, -5, []byte{0x7B}) + testPVarInt32(t, 63, []byte{0x3F}) + testPVarInt32(t, -65, []byte{0xFF, 0x3F}) + testPVarInt32(t, 127, []byte{0x80, 0x7F}) + testPVarInt32(t, 255, []byte{0x81, 0x7F}) + testPVarInt32(t, -4097, []byte{0xDF, 0x7F}) + testPVarInt32(t, -8193, []byte{0xFF, 0xBF, 0x7F}) + testPVarInt32(t, -2097152, []byte{0xFF, 0x80, 0x80, 0x00}) + testPVarInt32(t, -134217729, []byte{0xFF, 0xBF, 0xFF, 0xFF, 0x7F}) + testPVarInt32(t, -2147483648, []byte{0xF8, 0x80, 0x80, 0x80, 0x00}) +} + +func TestPUInt32(t *testing.T) { + testPVarUInt32(t, 1, []byte{0x01}) + testPVarUInt32(t, 127, []byte{0x80, 0x7F}) + testPVarUInt32(t, 128, []byte{0x81, 0x00}) + testPVarUInt32(t, 130, []byte{0x81, 0x02}) + testPVarUInt32(t, 1048576, []byte{0x80, 0xC0, 0x80, 0x00}) + testPVarUInt32(t, 134217728, []byte{0x80, 0xC0, 0x80, 0x80, 0x00}) + testPVarUInt32(t, 4294967295, []byte{0x7F}) +} + +func TestPInt64(t *testing.T) { + testPVarInt64(t, 0, []byte{0x00}) + testPVarInt64(t, 1, []byte{0x01}) + testPVarInt64(t, -1, []byte{0x7F}) +} + +func TestPUInt64(t *testing.T) { + testPVarUInt64(t, 0, []byte{0x00}) + testPVarUInt64(t, 1, []byte{0x01}) + testPVarUInt64(t, 18446744073709551615, []byte{0x7F}) +} + +func testPVarInt32(t *testing.T, value int32, bytes []byte) { + var msg = fmt.Sprintf("tester %v (%X): %X", value, uint32(value), bytes) + var size = SizeOfPVarInt32(value) + assert.Equal(t, len(bytes), size, msg) + + buffer := make([]byte, len(bytes)) + codec := VarCodec{Size: size} + assert.Nil(t, codec.EncodePVarInt32(buffer, value), msg) + assert.Equal(t, bytes, buffer, msg) + + var val int32 + codec = VarCodec{} + assert.Nil(t, codec.DecodePVarInt32(bytes, &val), msg) + assert.Equal(t, value, val, msg) +} + +func testPVarUInt32(t *testing.T, value uint32, bytes []byte) { + var msg = fmt.Sprintf("tester %v (%X): %X", value, value, bytes) + var size = SizeOfPVarUInt32(value) + assert.Equal(t, len(bytes), size, msg) + + buffer := make([]byte, len(bytes)) + codec := VarCodec{Size: size} + assert.Nil(t, codec.EncodePVarUInt32(buffer, value), msg) + assert.Equal(t, bytes, buffer, msg) + + var val uint32 + codec = VarCodec{} + assert.Nil(t, codec.DecodePVarUInt32(bytes, &val), msg) + assert.Equal(t, value, val, msg) +} + +func testPVarInt64(t *testing.T, value int64, bytes []byte) { + var msg = fmt.Sprintf("tester %v (%X): %X", value, uint64(value), bytes) + var size = SizeOfPVarInt64(value) + assert.Equal(t, len(bytes), size, msg) + + buffer := make([]byte, len(bytes)) + codec := VarCodec{Size: size} + assert.Nil(t, codec.EncodePVarInt64(buffer, value), msg) + assert.Equal(t, bytes, buffer, msg) + + var val int64 + codec = VarCodec{} + assert.Nil(t, codec.DecodePVarInt64(bytes, &val), msg) + assert.Equal(t, value, val, msg) +} + +func testPVarUInt64(t *testing.T, value uint64, bytes []byte) { + var msg = fmt.Sprintf("tester %v (%X): %X", value, value, bytes) + var size = SizeOfPVarUInt64(value) + assert.Equal(t, len(bytes), size, msg) + + buffer := make([]byte, len(bytes)) + codec := VarCodec{Size: size} + assert.Nil(t, codec.EncodePVarUInt64(buffer, value), msg) + assert.Equal(t, bytes, buffer, msg) + + var val uint64 + codec = VarCodec{} + assert.Nil(t, codec.DecodePVarUInt64(bytes, &val), msg) + assert.Equal(t, value, val) +} diff --git a/encoding/varcodec.go b/encoding/varcodec.go new file mode 100644 index 0000000..01fffbd --- /dev/null +++ b/encoding/varcodec.go @@ -0,0 +1,17 @@ +package encoding + +import ( + "errors" +) + +// ErrBufferInsufficient describes error when encode/decode malformed VarInt +var ErrBufferInsufficient = errors.New("buffer insufficient") + +// VarCodec for encode/decode VarInt +type VarCodec struct { + // next ptr in buf + Ptr int + // Encoder: bytes are to be written, + // Decoder: bytes have been consumed + Size int +} diff --git a/encoding/varfloat.go b/encoding/varfloat.go new file mode 100644 index 0000000..e281980 --- /dev/null +++ b/encoding/varfloat.go @@ -0,0 +1,118 @@ +package encoding + +import ( + "errors" + "math" + mbit "math/bits" +) + +// SizeOfVarFloat32 return the buffer size after encoding value as VarFloat32 +func SizeOfVarFloat32(value float32) int { + return sizeOfVarFloat(uint64(math.Float32bits(value)), 4) +} + +// EncodeVarFloat32 encode value as VarFloat32 to buffer +func (codec *VarCodec) EncodeVarFloat32(buffer []byte, value float32) error { + return codec.encodeVarFloat(buffer, uint64(math.Float32bits(value)), 4) +} + +// DecodeVarFloat32 decode to value as VarFloat32 from buffer +func (codec *VarCodec) DecodeVarFloat32(buffer []byte, value *float32) error { + var bits = uint64(math.Float32bits(*value)) + var err = codec.decodeVarFloat(buffer, &bits, 4) + *value = math.Float32frombits(uint32(bits)) + return err +} + +// SizeOfVarFloat64 return the buffer size after encoding value as VarFloat32 +func SizeOfVarFloat64(value float64) int { + return sizeOfVarFloat(math.Float64bits(value), 8) +} + +// EncodeVarFloat64 encode value as VarFloat64 to buffer +func (codec *VarCodec) EncodeVarFloat64(buffer []byte, value float64) error { + return codec.encodeVarFloat(buffer, math.Float64bits(value), 8) +} + +// DecodeVarFloat64 decode to value as VarFloat64 from buffer +func (codec *VarCodec) DecodeVarFloat64(buffer []byte, value *float64) error { + var bits = math.Float64bits(*value) + var err = codec.decodeVarFloat(buffer, &bits, 8) + *value = math.Float64frombits(bits) + return err +} + +func sizeOfVarFloat(bits uint64, width int) int { + const unit = 8 // bit width of encoding unit + const mask = uint64(0xFF) // mask of encoding unit + + for s := 0; width > 1; s += unit { + if bits & (mask << s) != 0 { + return width + } + width-- + } + return 1 +} + +func (codec *VarCodec) encodeVarFloat(buffer []byte, bits uint64, width int) error { + if codec == nil || codec.Size == 0 { + return errors.New("nothing to encode") + } + + const unit = 8 // bit width of encoding unit + var gap, mask = codec.sizeOfGap(width) + + for (codec.Size & mask) > 0 { + if codec.Ptr >= len(buffer) { + return ErrBufferInsufficient + } + codec.Size-- + buffer[codec.Ptr] = byte(bits >> ((codec.Size & mask + gap) * unit)) + codec.Ptr++ + } + + codec.Size = 0 + return nil +} + +func (codec *VarCodec) decodeVarFloat(buffer []byte, bits *uint64, width int) error { + if codec == nil || codec.Size == 0 { + return errors.New("nothing to decode") + } + + const unit = 8 // bit width of encoding unit + var gap, mask = codec.sizeOfGap(width) + + for (codec.Size & mask) > 0 { + if codec.Ptr >= len(buffer) { + return ErrBufferInsufficient + } + codec.Size-- + *bits = (*bits << unit) | uint64(buffer[codec.Ptr]) + codec.Ptr++ + } + + *bits <<= gap * unit + codec.Size = 0 + return nil +} + +func (codec *VarCodec) sizeOfGap(width int) (int, int) { + var ms = mbit.OnesCount(^uint(0)) // machine bit width for an int + var size = ms - 8 // bit width of effective size + var mask = -1 ^ (-1 << size) // mask of effective size + + var gap = 0 // gap between encoded size and decoded size + if codec.Size > 0 { + if width > codec.Size { + gap = width - codec.Size + } + var sign = -1 << (ms - 1) // single sign bit for an int + codec.Size = sign | (gap << size) | (codec.Size & mask) + } else { + gap = (codec.Size >> size) & 0x7F + } + + return gap, mask +} diff --git a/encoding/varfloat_test.go b/encoding/varfloat_test.go new file mode 100644 index 0000000..1b8ff13 --- /dev/null +++ b/encoding/varfloat_test.go @@ -0,0 +1,61 @@ +package encoding + +import ( + "fmt" + "math" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFloat32(t *testing.T) { + testVarFloat32(t, 0, []byte{0x00}) + testVarFloat32(t, 1, []byte{0x3F, 0x80}) + testVarFloat32(t, 25, []byte{0x41, 0xC8}) + testVarFloat32(t, -2, []byte{0xC0}) + testVarFloat32(t, 0.25, []byte{0x3E, 0x80}) + testVarFloat32(t, 0.375, []byte{0x3E, 0xC0}) + testVarFloat32(t, 12.375, []byte{0x41, 0x46}) + testVarFloat32(t, 68.123, []byte{0x42, 0x88, 0x3E, 0xFA}) +} + +func TestFloat64(t *testing.T) { + testVarFloat64(t, 0, []byte{0x00}) + testVarFloat64(t, 1, []byte{0x3F, 0xF0}) + testVarFloat64(t, 2, []byte{0x40}) + testVarFloat64(t, 23, []byte{0x40, 0x37}) + testVarFloat64(t, -2, []byte{0xC0}) + testVarFloat64(t, 0.01171875, []byte{0x3F, 0x88}) +} + +func testVarFloat32(t *testing.T, value float32, bytes []byte) { + var msg = fmt.Sprintf("tester %v (%X): %X", value, math.Float32bits(value), bytes) + var size = SizeOfVarFloat32(value) + assert.Equal(t, len(bytes), size, msg) + + buffer := make([]byte, len(bytes)) + codec := VarCodec{Size: size} + assert.Nil(t, codec.EncodeVarFloat32(buffer, value), msg) + assert.Equal(t, bytes, buffer, msg) + + var val float32 + codec = VarCodec{Size: len(bytes)} + assert.Nil(t, codec.DecodeVarFloat32(bytes, &val), msg) + assert.Equal(t, value, val, msg) +} + +func testVarFloat64(t *testing.T, value float64, bytes []byte) { + var msg = fmt.Sprintf("tester %v (%X): %X", value, math.Float64bits(value), bytes) + var size = SizeOfVarFloat64(value) + assert.Equal(t, len(bytes), size, msg) + + buffer := make([]byte, len(bytes)) + codec := VarCodec{Size: size} + assert.Nil(t, codec.EncodeVarFloat64(buffer, value), msg) + assert.Equal(t, bytes, buffer, msg) + + var val float64 + codec = VarCodec{Size: len(bytes)} + assert.Nil(t, codec.DecodeVarFloat64(bytes, &val), msg) + assert.Equal(t, value, val, msg) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d9c2d83 --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module mq_coder + +go 1.18 + +replace ( + git.hpds.cc/Component/mq_coder/encoding latest => target mq_coder/encoding + git.hpds.cc/Component/mq_coder/spec latest => target mq_coder/spec +) \ No newline at end of file diff --git a/packet.go b/packet.go new file mode 100644 index 0000000..4a3df2f --- /dev/null +++ b/packet.go @@ -0,0 +1,156 @@ +package mq_coder + +import ( + "bytes" + "io" + + "git.hpds.cc/Component/mq_coder/encoding" + "git.hpds.cc/Component/mq_coder/spec" +) + +// StreamPacket implement the Packet interface. +type StreamPacket struct { + t spec.T + l spec.L + vbuf []byte + vr io.Reader + chunkMode bool + chunkSize int +} + +var _ Packet = &StreamPacket{} + +// SeqId returns the sequenceId of this packet +func (p *StreamPacket) SeqId() int { return p.t.Sid() } + +// Size returns the size of whole packet. +func (p *StreamPacket) Size() int { + // T.Size + L.Size + V.Size + return p.t.Size() + p.l.Size() + p.l.VSize() +} + +// VSize returns the size of V. +func (p *StreamPacket) VSize() int { return p.l.VSize() } + +// Bytes return the raw bytes of this packet. V will be absent if +// is in chunked mode +func (p *StreamPacket) Bytes() []byte { + buf := new(bytes.Buffer) + // the raw bytes of T and L + p.writeTL(buf) + // p.valbuf stores the raw bytes of V + buf.Write(p.vbuf) + + return buf.Bytes() +} + +// VReader return an io.Reader which can be read as the content of V. +func (p *StreamPacket) VReader() io.Reader { + if !p.chunkMode { + return bytes.NewReader(p.vbuf) + } + return p.vr +} + +// Reader return an io.Reader which can be read as the whole bytes of +// this packet. This function only available if this V of packet is in +// chunked mode. +func (p *StreamPacket) Reader() io.Reader { + if !p.chunkMode { + buf := new(bytes.Buffer) + buf.Write(p.t.Bytes()) + buf.Write(p.l.Bytes()) + buf.Write(p.vbuf) + return buf + } + + buf := new(bytes.Buffer) + // T and L of this packet + p.writeTL(buf) + // V of this packet + buf.Write(p.vbuf) + + return &chunkVReader{ + buf: buf, + src: p.vr, + totalSize: p.Size(), + ChunkVSize: p.VSize(), + } +} + +// IsStreamMode returns a bool value indicates if the V of +// this packet is in stream mode +func (p *StreamPacket) IsStreamMode() bool { + return p.chunkMode +} + +// IsNodeMode returns a bool value indicates if this packet +// is node mode +func (p *StreamPacket) IsNodeMode() bool { + return p.t.IsNodeMode() +} + +// write the raw bytes of T and L to given buf +func (p *StreamPacket) writeTL(buf *bytes.Buffer) { + buf.Write(p.t.Bytes()) + buf.Write(p.l.Bytes()) +} + +// BytesV return V as bytes +func (p *StreamPacket) BytesV() []byte { + return p.vbuf +} + +// UTF8StringV return V as utf-8 string +func (p *StreamPacket) UTF8StringV() string { + return string(p.vbuf) +} + +// Int32V return V as int32 +func (p *StreamPacket) Int32V() (val int32, err error) { + codec := encoding.VarCodec{Size: len(p.vbuf)} + err = codec.DecodeNVarInt32(p.vbuf, &val) + return val, err +} + +// UInt32V return V as uint32 +func (p *StreamPacket) UInt32V() (val uint32, err error) { + codec := encoding.VarCodec{Size: len(p.vbuf)} + err = codec.DecodeNVarUInt32(p.vbuf, &val) + return val, err +} + +// Int64V return V as int64 +func (p *StreamPacket) Int64V() (val int64, err error) { + codec := encoding.VarCodec{Size: len(p.vbuf)} + err = codec.DecodeNVarInt64(p.vbuf, &val) + return val, err +} + +// UInt64V return V as uint64 +func (p *StreamPacket) UInt64V() (val uint64, err error) { + codec := encoding.VarCodec{Size: len(p.vbuf)} + err = codec.DecodeNVarUInt64(p.vbuf, &val) + return val, err +} + +// Float32V return V as float32 +func (p *StreamPacket) Float32V() (val float32, err error) { + codec := encoding.VarCodec{Size: len(p.vbuf)} + err = codec.DecodeVarFloat32(p.vbuf, &val) + return val, err +} + +// Float64V return V as float64 +func (p *StreamPacket) Float64V() (val float64, err error) { + codec := encoding.VarCodec{Size: len(p.vbuf)} + err = codec.DecodeVarFloat64(p.vbuf, &val) + return val, err +} + +// BoolV return V as bool +func (p *StreamPacket) BoolV() (val bool, err error) { + codec := encoding.VarCodec{Size: len(p.vbuf)} + err = codec.DecodePVarBool(p.vbuf, &val) + return val, err +} diff --git a/spec/spec.go b/spec/spec.go new file mode 100644 index 0000000..0258b7d --- /dev/null +++ b/spec/spec.go @@ -0,0 +1,26 @@ +package spec + +import ( + "errors" + "io" +) + +const ( + maxSeqID = 0x3F + flagBitNode = 0x80 + wipeFlagBits = 0x3F + msb = 0x80 +) + +var ( + errInvalidSeqID = errors.New("y3.Builder: SeqID should >= 0 and =< 0x3F") +) + +func readByte(reader io.Reader) (byte, error) { + var b [1]byte + n, err := reader.Read(b[:]) + if n == 0 { + return 0x00, err + } + return b[0], err +} diff --git a/spec/spec.puml b/spec/spec.puml new file mode 100644 index 0000000..ab6a65c --- /dev/null +++ b/spec/spec.puml @@ -0,0 +1,27 @@ +@startuml +namespace spec { + class L << (S,Aquamarine) >> { + - buf []byte + - size int + - len int + + + Bytes() []byte + + Size() int + + VSize() int + + } + class T << (S,Aquamarine) >> { + + Sid() int + + Bytes() []byte + + IsNodeMode() bool + + SetNodeMode(flag bool) + + Size() int + + } + class spec.T << (T, #FF7700) >> { + } +} + + +"__builtin__.byte" #.. "spec.T" +@enduml \ No newline at end of file diff --git a/spec/tlv.t.go b/spec/tlv.t.go new file mode 100644 index 0000000..84671eb --- /dev/null +++ b/spec/tlv.t.go @@ -0,0 +1,54 @@ +package spec + +import ( + "io" +) + +// T is the Tag in a TLV structure +type T byte + +// NewT returns a T with sequenceID. If this packet contains other +// packets, this packet will be a "node packet", the T of this packet +// will set MSB to T. +func NewT(seqID int) (T, error) { + if seqID < 0 || seqID > maxSeqID { + return 0, errInvalidSeqID + } + + return T(seqID), nil +} + +// Sid returns the sequenceID of this packet. +func (t T) Sid() int { + return int(t & wipeFlagBits) +} + +// Bytes returns raw bytes of T. +func (t T) Bytes() []byte { + return []byte{byte(t)} +} + +// IsNodeMode will return true if this packet contains other packets. +// Otherwise return flase. +func (t T) IsNodeMode() bool { + return t&flagBitNode == flagBitNode +} + +// SetNodeMode will set T to indicates this packet contains +// other packets. +func (t *T) SetNodeMode(flag bool) { + if flag { + *t |= flagBitNode + } +} + +// Size return the size of T raw bytes. +func (t T) Size() int { + return 1 +} + +// ReadT read T from a bufio.Reader +func ReadT(rd io.Reader) (T, error) { + b, err := readByte(rd) + return T(b), err +} diff --git a/spec/tvl.l.go b/spec/tvl.l.go new file mode 100644 index 0000000..8ce8281 --- /dev/null +++ b/spec/tvl.l.go @@ -0,0 +1,90 @@ +package spec + +import ( + "bytes" + "errors" + "io" + + //"git.hpds.cc/Component/mq_coder/encoding" + "mq_coder/encoding" +) + +// L is the Length in a TLV structure +type L struct { + buf []byte + size int + len int +} + +// NewL will take an int type len as parameter and return L to +// represent the size of V in a TLV. an integer will be encoded as +// a PVarInt32 type to represent the value. +func NewL(len int) (L, error) { + var l = L{} + if len < -1 { + return l, errors.New("y3.L: len can't less than -1") + } + + valLen := int32(len) + l.size = encoding.SizeOfPVarInt32(valLen) + codec := encoding.VarCodec{Size: l.size} + tmp := make([]byte, l.size) + err := codec.EncodePVarInt32(tmp, valLen) + if err != nil { + panic(err) + } + l.buf = make([]byte, l.size) + copy(l.buf, tmp) + l.len = len + return l, nil +} + +// Bytes will return the raw bytes of L. +func (l L) Bytes() []byte { + return l.buf +} + +// Size returns how many bytes used to represent this L. +func (l L) Size() int { + return l.size +} + +// VSize returns the size of V. +func (l L) VSize() int { + return int(l.len) +} + +// ReadL read L from bufio.Reader +func ReadL(r io.Reader) (*L, error) { + lenBuf := bytes.Buffer{} + for { + b, err := readByte(r) + if err != nil { + return nil, err + } + lenBuf.WriteByte(b) + if b&msb != msb { + break + } + } + + buf := lenBuf.Bytes() + + // decode to L + length, err := decodeL(buf) + if err != nil { + return nil, err + } + + return &L{ + buf: buf, + len: int(length), + size: len(buf), + }, nil +} + +func decodeL(buf []byte) (length int32, err error) { + codec := encoding.VarCodec{} + err = codec.DecodePVarInt32(buf, &length) + return length, err +}