Compare commits
2 Commits
Author | SHA1 | Date |
---|---|---|
wangjian | afc76d7d15 | |
wangjian | 174ae7ae33 |
|
@ -1,72 +0,0 @@
|
|||
package mq_coder
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
type chunkVReader struct {
|
||||
src io.Reader // the reader parts of V
|
||||
buf *bytes.Buffer // the bytes part of V
|
||||
totalSize int // size of whole buffer of this packet
|
||||
off int // last read op
|
||||
ChunkVSize int // the size of chunked V
|
||||
}
|
||||
|
||||
// Read implement io.Reader interface
|
||||
func (r *chunkVReader) Read(p []byte) (n int, err error) {
|
||||
if r.src == nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
if r.off >= r.totalSize {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
if r.off < r.totalSize-r.ChunkVSize {
|
||||
n, err := r.buf.Read(p)
|
||||
r.off += n
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return n, nil
|
||||
} else {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
n, err = r.src.Read(p)
|
||||
r.off += n
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// WriteTo implement io.WriteTo interface
|
||||
func (r *chunkVReader) WriteTo(w io.Writer) (n int64, err error) {
|
||||
if r.src == nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// first, write existed buffer
|
||||
m, err := w.Write(r.buf.Bytes())
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
n += int64(m)
|
||||
|
||||
// last, write from reader
|
||||
buf, err := ioutil.ReadAll(r.src)
|
||||
if err != nil && err != io.EOF {
|
||||
return 0, errWriteFromReader
|
||||
}
|
||||
m, err = w.Write(buf)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
n += int64(m)
|
||||
return n, nil
|
||||
}
|
54
coder.go
54
coder.go
|
@ -1,54 +0,0 @@
|
|||
package mq_coder
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
)
|
||||
|
||||
var (
|
||||
errBuildIncomplete = errors.New("coder.Encoder: invalid structure of packet")
|
||||
errInvalidAdding = errors.New("coder.Encoder: can not add this Packet after StreamPacket has been add")
|
||||
errNonStreamPacket = errors.New("coder.Packet: this packet is not in node mode")
|
||||
errWriteFromReader = errors.New("coder.streamV: write from reader error")
|
||||
errNotNodeMode = errors.New("coder.Encoder: packet should be in node mode can be add other packets as child")
|
||||
errNilReader = errors.New("coder.Decoder: nil source reader")
|
||||
)
|
||||
|
||||
// Packet 编解码器包
|
||||
type Packet interface {
|
||||
// SeqId 返回此数据包的序列Id.
|
||||
SeqId() int
|
||||
// Size 返回整个数据包的大小
|
||||
Size() int
|
||||
// VSize 返回V的大小.
|
||||
VSize() int
|
||||
// Bytes 此数据包的全部字节.
|
||||
Bytes() []byte
|
||||
// Reader 返回 io.Reader 字节.
|
||||
Reader() io.Reader
|
||||
// VReader 返回 io.Reader 的 流字节.
|
||||
VReader() io.Reader
|
||||
// IsStreamMode 是否流模式
|
||||
IsStreamMode() bool
|
||||
// IsNodeMode 是否节点模式
|
||||
IsNodeMode() bool
|
||||
|
||||
// BytesV 返回流字节
|
||||
BytesV() []byte
|
||||
// Utf8StringV 返回流的utf8字符串值
|
||||
Utf8StringV() string
|
||||
// Int32V 返回流的int32值
|
||||
Int32V() (val int32, err error)
|
||||
// UInt32V 返回流的uint32值
|
||||
UInt32V() (val uint32, err error)
|
||||
// Int64V 返回流的int64值
|
||||
Int64V() (val int64, err error)
|
||||
// UInt64V 返回流的uint64值
|
||||
UInt64V() (val uint64, err error)
|
||||
// Float32V 返回流的float32值
|
||||
Float32V() (val float32, err error)
|
||||
// Float64V 返回流的float64值
|
||||
Float64V() (val float64, err error)
|
||||
// BoolV 返回流的bool值
|
||||
BoolV() (val bool, err error)
|
||||
}
|
93
decoder.go
93
decoder.go
|
@ -1,93 +0,0 @@
|
|||
package mq_coder
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
|
||||
"git.hpds.cc/Component/mq_coder/spec"
|
||||
)
|
||||
|
||||
// Decoder is the tool for decoding packet from stream
|
||||
type Decoder struct {
|
||||
tag spec.T
|
||||
len *spec.L
|
||||
rd io.Reader
|
||||
}
|
||||
|
||||
// NewDecoder returns a Decoder from an io.Reader
|
||||
func NewDecoder(reader io.Reader) *Decoder {
|
||||
return &Decoder{
|
||||
rd: reader,
|
||||
}
|
||||
}
|
||||
|
||||
// SeqId return the SequenceID of the decoding packet
|
||||
func (d *Decoder) SeqId() int {
|
||||
return d.tag.Sid()
|
||||
}
|
||||
|
||||
// UnderlyingReader returns the reader this decoder using
|
||||
func (d *Decoder) UnderlyingReader() io.Reader {
|
||||
return d.rd
|
||||
}
|
||||
|
||||
// ReadHeader will block until io.EOF or receive T and L of a packet.
|
||||
func (d *Decoder) ReadHeader() error {
|
||||
// only read T and L
|
||||
return d.readTL()
|
||||
}
|
||||
|
||||
// GetChunkedPacket will block until io.EOF or receive V of a packet in chunked mode.
|
||||
func (d *Decoder) GetChunkedPacket() Packet {
|
||||
return &StreamPacket{
|
||||
t: d.tag,
|
||||
l: *d.len,
|
||||
vr: d.rd,
|
||||
chunkMode: true,
|
||||
chunkSize: d.len.VSize(),
|
||||
}
|
||||
}
|
||||
|
||||
// GetFullFilledPacket read full Packet from given io.Reader
|
||||
func (d *Decoder) GetFullFilledPacket() (packet Packet, err error) {
|
||||
// read V
|
||||
buf := new(bytes.Buffer)
|
||||
total := 0
|
||||
for {
|
||||
valBuf := make([]byte, d.len.VSize())
|
||||
n, err := d.rd.Read(valBuf)
|
||||
if n > 0 {
|
||||
total += n
|
||||
buf.Write(valBuf[:n])
|
||||
}
|
||||
if total >= d.len.VSize() || err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
packet = &StreamPacket{
|
||||
t: d.tag,
|
||||
l: *d.len,
|
||||
vBuf: buf.Bytes(),
|
||||
chunkMode: false,
|
||||
}
|
||||
|
||||
return packet, nil
|
||||
}
|
||||
|
||||
func (d *Decoder) readTL() (err error) {
|
||||
if d.rd == nil {
|
||||
return errNilReader
|
||||
}
|
||||
|
||||
// read T
|
||||
d.tag, err = spec.ReadT(d.rd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// read L
|
||||
d.len, err = spec.ReadL(d.rd)
|
||||
|
||||
return err
|
||||
}
|
185
encoder.go
185
encoder.go
|
@ -2,157 +2,76 @@ package mq_coder
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
|
||||
"git.hpds.cc/Component/mq_coder/spec"
|
||||
"fmt"
|
||||
"git.hpds.cc/Component/mq_coder/encoding"
|
||||
)
|
||||
|
||||
// Encoder is the tool for creating a packet easily
|
||||
type Encoder struct {
|
||||
tag spec.T
|
||||
len *spec.L
|
||||
valReader io.Reader
|
||||
valReaderSize int
|
||||
nodes map[int]Packet
|
||||
state int
|
||||
size int32 // size of value
|
||||
isStreamMode bool
|
||||
valBuf *bytes.Buffer
|
||||
done bool
|
||||
seqId int
|
||||
isNodeMode bool
|
||||
// Encoder will encode object to encoding
|
||||
type encoder struct {
|
||||
seqID byte
|
||||
valBuf []byte
|
||||
isNode bool
|
||||
isArray bool
|
||||
buf *bytes.Buffer
|
||||
complete bool
|
||||
}
|
||||
|
||||
// SetSeqId set sequenceId of a packet, if this packet contains other
|
||||
// packets, isNode should set to true
|
||||
func (b *Encoder) SetSeqId(seqId int, isNode bool) {
|
||||
// init
|
||||
b.valBuf = new(bytes.Buffer)
|
||||
b.nodes = make(map[int]Packet)
|
||||
// set seqId
|
||||
b.seqId = seqId
|
||||
b.isNodeMode = isNode
|
||||
type iEncoder interface {
|
||||
Encode() []byte
|
||||
}
|
||||
|
||||
// SetBytesV set bytes type as V
|
||||
func (b *Encoder) SetBytesV(buf []byte) {
|
||||
b.size += int32(len(buf))
|
||||
b.valBuf.Write(buf)
|
||||
b.isStreamMode = false
|
||||
b.state |= 0x04
|
||||
func (enc *encoder) GetValBuf() []byte {
|
||||
return enc.valBuf
|
||||
}
|
||||
|
||||
// SetReaderV set io.Reader type as V
|
||||
func (b *Encoder) SetReaderV(r io.Reader, size int) {
|
||||
b.isStreamMode = true
|
||||
b.valReader = r
|
||||
b.state |= 0x04
|
||||
b.size += int32(size)
|
||||
b.valReaderSize = size
|
||||
func (enc *encoder) IsEmpty() bool {
|
||||
return len(enc.valBuf) == 0
|
||||
}
|
||||
|
||||
// AddPacket add a Packet child to this packet, this packet must be NodeMode
|
||||
func (b *Encoder) AddPacket(child Packet) error {
|
||||
// only packet is in node mode can add other packets
|
||||
if !b.isNodeMode {
|
||||
return errNotNodeMode
|
||||
func (enc *encoder) AddBytes(buf []byte) {
|
||||
enc.valBuf = append(enc.valBuf, buf...)
|
||||
}
|
||||
|
||||
if b.done {
|
||||
return errInvalidAdding
|
||||
}
|
||||
b.nodes[child.SeqId()] = child
|
||||
buf := child.Bytes()
|
||||
b.SetBytesV(buf)
|
||||
return nil
|
||||
func (enc *encoder) addRawPacket(en iEncoder) {
|
||||
enc.valBuf = append(enc.valBuf, en.Encode()...)
|
||||
}
|
||||
|
||||
// AddStreamPacket will put a StreamPacket in chunked mode to current packet.
|
||||
func (b *Encoder) AddStreamPacket(child Packet) (err error) {
|
||||
// if this packet is in stream mode, can not add any packets
|
||||
if b.done {
|
||||
return errInvalidAdding
|
||||
// setTag write tag as seqID
|
||||
func (enc *encoder) writeTag() {
|
||||
if enc.seqID > 0x3F {
|
||||
panic(fmt.Errorf("sid should be in [0..0x3F]"))
|
||||
}
|
||||
if enc.isNode {
|
||||
enc.seqID = enc.seqID | 0x80
|
||||
}
|
||||
if enc.isArray {
|
||||
enc.seqID = enc.seqID | 0x40
|
||||
}
|
||||
enc.buf.WriteByte(enc.seqID)
|
||||
}
|
||||
|
||||
// only accept packet in stream mode
|
||||
if !child.IsStreamMode() {
|
||||
return errNonStreamPacket
|
||||
}
|
||||
|
||||
// set the valReader of this packet to the child's
|
||||
b.valReader = child.VReader()
|
||||
|
||||
// valReaderSize will be the same as child's
|
||||
b.valReaderSize = child.VSize()
|
||||
// add this child packet
|
||||
b.nodes[child.SeqId()] = child
|
||||
// add the size of child's V to L of this packet
|
||||
b.size += int32(child.Size())
|
||||
// put the bytes of child to valBuf
|
||||
buf := child.Bytes()
|
||||
b.valBuf.Write(buf)
|
||||
// update state
|
||||
b.state |= 0x04
|
||||
b.isStreamMode = true
|
||||
b.done = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Packet return a Packet instance.
|
||||
func (b *Encoder) Packet() (Packet, error) {
|
||||
err := b.generateT()
|
||||
func (enc *encoder) writeLengthBuf() {
|
||||
valLen := len(enc.valBuf)
|
||||
size := encoding.SizeOfPVarInt32(int32(valLen))
|
||||
codec := encoding.VarCodec{Size: size}
|
||||
tmp := make([]byte, size)
|
||||
err := codec.EncodePVarInt32(tmp, int32(valLen))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
panic(err)
|
||||
}
|
||||
enc.buf.Write(tmp)
|
||||
}
|
||||
|
||||
err = b.generateL()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// Encode returns a final Y3 encoded byte slice
|
||||
func (enc *encoder) Encode() []byte {
|
||||
if !enc.complete {
|
||||
// Tag
|
||||
enc.writeTag()
|
||||
// Len
|
||||
enc.writeLengthBuf()
|
||||
// Val
|
||||
enc.buf.Write(enc.valBuf)
|
||||
enc.complete = true
|
||||
}
|
||||
|
||||
if b.state != 0x07 {
|
||||
return nil, errBuildIncomplete
|
||||
}
|
||||
|
||||
if b.isStreamMode {
|
||||
return &StreamPacket{
|
||||
t: b.tag,
|
||||
l: *b.len,
|
||||
vr: b.valReader,
|
||||
vBuf: b.valBuf.Bytes(),
|
||||
chunkMode: true,
|
||||
chunkSize: b.valReaderSize,
|
||||
}, err
|
||||
}
|
||||
|
||||
// not streaming mode
|
||||
return &StreamPacket{
|
||||
t: b.tag,
|
||||
l: *b.len,
|
||||
vBuf: b.valBuf.Bytes(),
|
||||
chunkMode: false,
|
||||
}, err
|
||||
}
|
||||
|
||||
// will generate T of a TLV.
|
||||
func (b *Encoder) generateT() error {
|
||||
t, err := spec.NewT(b.seqId)
|
||||
t.SetNodeMode(b.isNodeMode)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.tag = t
|
||||
b.state |= 0x01
|
||||
return nil
|
||||
}
|
||||
|
||||
// will generate L of a TLV.
|
||||
func (b *Encoder) generateL() error {
|
||||
l, err := spec.NewL(int(b.size))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.len = &l
|
||||
b.state |= 0x02
|
||||
return nil
|
||||
return enc.buf.Bytes()
|
||||
}
|
||||
|
|
|
@ -1,98 +0,0 @@
|
|||
package mq_coder
|
||||
|
||||
import (
|
||||
"git.hpds.cc/Component/mq_coder/encoding"
|
||||
)
|
||||
|
||||
// SetUtf8StringV set utf-8 string type value as V
|
||||
func (b *Encoder) SetUtf8StringV(v string) {
|
||||
buf := []byte(v)
|
||||
b.SetBytesV(buf)
|
||||
}
|
||||
|
||||
// SetInt32V set an int32 type value as V
|
||||
func (b *Encoder) SetInt32V(v int32) error {
|
||||
size := encoding.SizeOfNVarInt32(v)
|
||||
codec := encoding.VarCodec{Size: size}
|
||||
buf := make([]byte, size)
|
||||
err := codec.EncodeNVarInt32(buf, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.SetBytesV(buf)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetUInt32V set an uint32 type value as V
|
||||
func (b *Encoder) SetUInt32V(v uint32) error {
|
||||
size := encoding.SizeOfNVarUInt32(v)
|
||||
codec := encoding.VarCodec{Size: size}
|
||||
buf := make([]byte, size)
|
||||
err := codec.EncodeNVarUInt32(buf, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.SetBytesV(buf)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetInt64V set an int64 type value as V
|
||||
func (b *Encoder) SetInt64V(v int64) error {
|
||||
size := encoding.SizeOfNVarInt64(v)
|
||||
codec := encoding.VarCodec{Size: size}
|
||||
buf := make([]byte, size)
|
||||
err := codec.EncodeNVarInt64(buf, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.SetBytesV(buf)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetUInt64V set an uint64 type value as V
|
||||
func (b *Encoder) SetUInt64V(v uint64) error {
|
||||
size := encoding.SizeOfNVarUInt64(v)
|
||||
codec := encoding.VarCodec{Size: size}
|
||||
buf := make([]byte, size)
|
||||
err := codec.EncodeNVarUInt64(buf, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.SetBytesV(buf)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetFloat32V set a float32 type value as V
|
||||
func (b *Encoder) SetFloat32V(v float32) error {
|
||||
size := encoding.SizeOfVarFloat32(v)
|
||||
codec := encoding.VarCodec{Size: size}
|
||||
buf := make([]byte, size)
|
||||
err := codec.EncodeVarFloat32(buf, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.SetBytesV(buf)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetFloat64V set a float64 type value as V
|
||||
func (b *Encoder) SetFloat64V(v float64) error {
|
||||
size := encoding.SizeOfVarFloat64(v)
|
||||
codec := encoding.VarCodec{Size: size}
|
||||
buf := make([]byte, size)
|
||||
err := codec.EncodeVarFloat64(buf, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.SetBytesV(buf)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetBoolV set bool type value as V
|
||||
func (b *Encoder) SetBoolV(v bool) {
|
||||
var size = encoding.SizeOfPVarUInt32(uint32(1))
|
||||
codec := encoding.VarCodec{Size: size}
|
||||
buf := make([]byte, size)
|
||||
_ = codec.EncodePVarBool(buf, v)
|
||||
b.SetBytesV(buf)
|
||||
}
|
|
@ -60,7 +60,7 @@ func (codec *VarCodec) encodeVarFloat(buffer []byte, bits uint64, width int) err
|
|||
return errors.New("nothing to encode")
|
||||
}
|
||||
|
||||
const unit = 8 // bit width of encoding unit
|
||||
const unit = 8 // the bit width of encoding unit
|
||||
var gap, mask = codec.sizeOfGap(width)
|
||||
|
||||
for (codec.Size & mask) > 0 {
|
||||
|
@ -81,7 +81,7 @@ func (codec *VarCodec) decodeVarFloat(buffer []byte, bits *uint64, width int) er
|
|||
return errors.New("nothing to decode")
|
||||
}
|
||||
|
||||
const unit = 8 // bit width of encoding unit
|
||||
const unit = 8 // the bit width of encoding unit
|
||||
var gap, mask = codec.sizeOfGap(width)
|
||||
|
||||
for (codec.Size & mask) > 0 {
|
||||
|
|
2
go.mod
2
go.mod
|
@ -1,6 +1,6 @@
|
|||
module git.hpds.cc/Component/mq_coder
|
||||
|
||||
go 1.18
|
||||
go 1.19
|
||||
|
||||
require github.com/stretchr/testify v1.8.0
|
||||
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
package mq_coder
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
|
||||
"git.hpds.cc/Component/mq_coder/encoding"
|
||||
"git.hpds.cc/Component/mq_coder/utils"
|
||||
)
|
||||
|
||||
func parsePayload(b []byte) (consumedBytes int, ifNodePacket bool, np *NodePacket, pp *PrimitivePacket, err error) {
|
||||
if len(b) == 0 {
|
||||
return 0, false, nil, nil, errors.New("parsePacket params can not be nil")
|
||||
}
|
||||
|
||||
pos := 0
|
||||
// NodePacket
|
||||
if ok := utils.IsNodePacket(b[pos]); ok {
|
||||
np = &NodePacket{}
|
||||
endPos, err := DecodeToNodePacket(b, np)
|
||||
return endPos, true, np, nil, err
|
||||
}
|
||||
|
||||
pp = &PrimitivePacket{}
|
||||
state, err := DecodeToPrimitivePacket(b, pp)
|
||||
return state.ConsumedBytes, false, nil, pp, err
|
||||
}
|
||||
|
||||
// DecodeToNodePacket parse out whole buffer to a NodePacket
|
||||
func DecodeToNodePacket(buf []byte, pct *NodePacket) (consumedBytes int, err error) {
|
||||
if len(buf) == 0 {
|
||||
return 0, errors.New("empty buf")
|
||||
}
|
||||
|
||||
pct.packet = &packet{
|
||||
valBuf: buf,
|
||||
buf: &bytes.Buffer{},
|
||||
}
|
||||
|
||||
pct.NodePackets = map[byte]NodePacket{}
|
||||
pct.PrimitivePackets = map[byte]PrimitivePacket{}
|
||||
|
||||
pos := 0
|
||||
|
||||
// `Tag`
|
||||
tag := NewTag(buf[pos])
|
||||
pct.packet.tag = tag
|
||||
pct.buf.WriteByte(buf[pos])
|
||||
pos++
|
||||
|
||||
// `Length`: the type is `varInt`
|
||||
tmpBuf := buf[pos:]
|
||||
var valLen int32
|
||||
codec := encoding.VarCodec{}
|
||||
err = codec.DecodePVarInt32(tmpBuf, &valLen)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
pct.packet.length = int(valLen)
|
||||
pct.buf.Write(buf[pos : pos+codec.Size])
|
||||
pos += codec.Size
|
||||
// if `Length` is 0, means empty node packet
|
||||
if valLen == 0 {
|
||||
return pos, nil
|
||||
}
|
||||
|
||||
// `Value`
|
||||
// `raw` is pct.Length() length
|
||||
vl := int(valLen)
|
||||
if vl < 0 {
|
||||
return pos, errors.New("found L of V smaller than 0")
|
||||
}
|
||||
endPos := pos + vl
|
||||
pct.packet.valBuf = buf[pos:endPos]
|
||||
pct.buf.Write(buf[pos:endPos])
|
||||
|
||||
// Parse value to Packet
|
||||
for {
|
||||
if pos >= endPos || pos >= len(buf) {
|
||||
break
|
||||
}
|
||||
_p, isNode, np, pp, err := parsePayload(buf[pos:endPos])
|
||||
pos += _p
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if isNode {
|
||||
pct.NodePackets[np.packet.tag.SeqId()] = *np
|
||||
} else {
|
||||
pct.PrimitivePackets[pp.SeqId()] = *pp
|
||||
}
|
||||
}
|
||||
|
||||
consumedBytes = endPos
|
||||
return consumedBytes, nil
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
package mq_coder
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
)
|
||||
|
||||
// NodePacketEncoder used for encode a node packet
|
||||
type NodePacketEncoder struct {
|
||||
*encoder
|
||||
}
|
||||
|
||||
// NewNodePacketEncoder returns an Encoder for node packet
|
||||
func NewNodePacketEncoder(sid byte) *NodePacketEncoder {
|
||||
nodeEnc := &NodePacketEncoder{
|
||||
encoder: &encoder{
|
||||
isNode: true,
|
||||
buf: new(bytes.Buffer),
|
||||
},
|
||||
}
|
||||
|
||||
nodeEnc.seqID = sid
|
||||
return nodeEnc
|
||||
}
|
||||
|
||||
// AddNodePacket add new node to this node
|
||||
func (enc *NodePacketEncoder) AddNodePacket(np *NodePacketEncoder) {
|
||||
enc.addRawPacket(np)
|
||||
}
|
||||
|
||||
// AddPrimitivePacket add new primitive to this node
|
||||
func (enc *NodePacketEncoder) AddPrimitivePacket(np *PrimitivePacketEncoder) {
|
||||
enc.addRawPacket(np)
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
package mq_coder
|
||||
|
||||
// NodePacket describes complex values
|
||||
type NodePacket struct {
|
||||
*packet
|
||||
// NodePackets store all the node packets
|
||||
NodePackets map[byte]NodePacket
|
||||
// PrimitivePackets store all the primitive packets
|
||||
PrimitivePackets map[byte]PrimitivePacket
|
||||
}
|
160
packet.go
160
packet.go
|
@ -2,155 +2,37 @@ package mq_coder
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
|
||||
"git.hpds.cc/Component/mq_coder/encoding"
|
||||
"git.hpds.cc/Component/mq_coder/spec"
|
||||
)
|
||||
|
||||
// StreamPacket implement the Packet interface.
|
||||
type StreamPacket struct {
|
||||
t spec.T
|
||||
l spec.L
|
||||
vBuf []byte
|
||||
vr io.Reader
|
||||
chunkMode bool
|
||||
chunkSize int
|
||||
// packet is the base type of the NodePacket and PrimitivePacket
|
||||
type packet struct {
|
||||
tag *Tag
|
||||
length int
|
||||
valBuf []byte
|
||||
buf *bytes.Buffer
|
||||
}
|
||||
|
||||
var _ Packet = &StreamPacket{}
|
||||
|
||||
// SeqId returns the sequenceId of this packet
|
||||
func (p *StreamPacket) SeqId() int { return p.t.Sid() }
|
||||
|
||||
// Size returns the size of whole packet.
|
||||
func (p *StreamPacket) Size() int {
|
||||
// T.Size + L.Size + V.Size
|
||||
return p.t.Size() + p.l.Size() + p.l.VSize()
|
||||
// GetRawBytes get all raw bytes of this packet
|
||||
func (bp *packet) GetRawBytes() []byte {
|
||||
return bp.buf.Bytes()
|
||||
}
|
||||
|
||||
// VSize returns the size of V.
|
||||
func (p *StreamPacket) VSize() int { return p.l.VSize() }
|
||||
|
||||
// Bytes return the raw bytes of this packet. V will be absent if
|
||||
// is in chunked mode
|
||||
func (p *StreamPacket) Bytes() []byte {
|
||||
buf := new(bytes.Buffer)
|
||||
// the raw bytes of T and L
|
||||
p.writeTL(buf)
|
||||
// p.valBuf stores the raw bytes of V
|
||||
buf.Write(p.vBuf)
|
||||
|
||||
return buf.Bytes()
|
||||
// Length return the length of Val this packet
|
||||
func (bp *packet) Length() int {
|
||||
return bp.length
|
||||
}
|
||||
|
||||
// VReader return an io.Reader which can be read as the content of V.
|
||||
func (p *StreamPacket) VReader() io.Reader {
|
||||
if !p.chunkMode {
|
||||
return bytes.NewReader(p.vBuf)
|
||||
}
|
||||
return p.vr
|
||||
// SeqId returns Tag of this packet
|
||||
func (bp *packet) SeqId() byte {
|
||||
return bp.tag.SeqId()
|
||||
}
|
||||
|
||||
// Reader return an io.Reader which can be read as the whole bytes of
|
||||
// this packet. This function only available if this V of packet is in
|
||||
// chunked mode.
|
||||
func (p *StreamPacket) Reader() io.Reader {
|
||||
if !p.chunkMode {
|
||||
buf := new(bytes.Buffer)
|
||||
buf.Write(p.t.Bytes())
|
||||
buf.Write(p.l.Bytes())
|
||||
buf.Write(p.vBuf)
|
||||
return buf
|
||||
// IsSlice determine if the current node is a Slice
|
||||
func (bp *packet) IsSlice() bool {
|
||||
return bp.tag.IsSlice()
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
// T and L of this packet
|
||||
p.writeTL(buf)
|
||||
// V of this packet
|
||||
buf.Write(p.vBuf)
|
||||
|
||||
return &chunkVReader{
|
||||
buf: buf,
|
||||
src: p.vr,
|
||||
totalSize: p.Size(),
|
||||
ChunkVSize: p.VSize(),
|
||||
}
|
||||
}
|
||||
|
||||
// IsStreamMode returns a bool value indicates if the V of
|
||||
// this packet is in stream mode
|
||||
func (p *StreamPacket) IsStreamMode() bool {
|
||||
return p.chunkMode
|
||||
}
|
||||
|
||||
// IsNodeMode returns a bool value indicates if this packet
|
||||
// is node mode
|
||||
func (p *StreamPacket) IsNodeMode() bool {
|
||||
return p.t.IsNodeMode()
|
||||
}
|
||||
|
||||
// write the raw bytes of T and L to given buf
|
||||
func (p *StreamPacket) writeTL(buf *bytes.Buffer) {
|
||||
buf.Write(p.t.Bytes())
|
||||
buf.Write(p.l.Bytes())
|
||||
}
|
||||
|
||||
// BytesV return V as bytes
|
||||
func (p *StreamPacket) BytesV() []byte {
|
||||
return p.vBuf
|
||||
}
|
||||
|
||||
// UTF8StringV return V as utf-8 string
|
||||
func (p *StreamPacket) Utf8StringV() string {
|
||||
return string(p.vBuf)
|
||||
}
|
||||
|
||||
// Int32V return V as int32
|
||||
func (p *StreamPacket) Int32V() (val int32, err error) {
|
||||
codec := encoding.VarCodec{Size: len(p.vBuf)}
|
||||
err = codec.DecodeNVarInt32(p.vBuf, &val)
|
||||
return val, err
|
||||
}
|
||||
|
||||
// UInt32V return V as uint32
|
||||
func (p *StreamPacket) UInt32V() (val uint32, err error) {
|
||||
codec := encoding.VarCodec{Size: len(p.vBuf)}
|
||||
err = codec.DecodeNVarUInt32(p.vBuf, &val)
|
||||
return val, err
|
||||
}
|
||||
|
||||
// Int64V return V as int64
|
||||
func (p *StreamPacket) Int64V() (val int64, err error) {
|
||||
codec := encoding.VarCodec{Size: len(p.vBuf)}
|
||||
err = codec.DecodeNVarInt64(p.vBuf, &val)
|
||||
return val, err
|
||||
}
|
||||
|
||||
// UInt64V return V as uint64
|
||||
func (p *StreamPacket) UInt64V() (val uint64, err error) {
|
||||
codec := encoding.VarCodec{Size: len(p.vBuf)}
|
||||
err = codec.DecodeNVarUInt64(p.vBuf, &val)
|
||||
return val, err
|
||||
}
|
||||
|
||||
// Float32V return V as float32
|
||||
func (p *StreamPacket) Float32V() (val float32, err error) {
|
||||
codec := encoding.VarCodec{Size: len(p.vBuf)}
|
||||
err = codec.DecodeVarFloat32(p.vBuf, &val)
|
||||
return val, err
|
||||
}
|
||||
|
||||
// Float64V return V as float64
|
||||
func (p *StreamPacket) Float64V() (val float64, err error) {
|
||||
codec := encoding.VarCodec{Size: len(p.vBuf)}
|
||||
err = codec.DecodeVarFloat64(p.vBuf, &val)
|
||||
return val, err
|
||||
}
|
||||
|
||||
// BoolV return V as bool
|
||||
func (p *StreamPacket) BoolV() (val bool, err error) {
|
||||
codec := encoding.VarCodec{Size: len(p.vBuf)}
|
||||
err = codec.DecodePVarBool(p.vBuf, &val)
|
||||
return val, err
|
||||
// GetValBuf get raw buffer of Val of this packet
|
||||
func (bp *packet) GetValBuf() []byte {
|
||||
return bp.valBuf
|
||||
}
|
||||
|
|
|
@ -0,0 +1,100 @@
|
|||
package mq_coder
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"git.hpds.cc/Component/mq_coder/encoding"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrMalformed = errors.New("coder.ReadPacket: malformed")
|
||||
)
|
||||
|
||||
// ReadPacket will try to read encoded packet from the reader
|
||||
func ReadPacket(reader io.Reader) ([]byte, error) {
|
||||
tag, err := readByte(reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// buf will contain a complete encoded handshakeFrame
|
||||
buf := bytes.Buffer{}
|
||||
|
||||
// the first byte is coder.Tag
|
||||
// write coder.Tag bytes
|
||||
buf.WriteByte(tag)
|
||||
|
||||
// read y3.Length bytes, a varInt format
|
||||
lenBuf := bytes.Buffer{}
|
||||
for {
|
||||
b, e := readByte(reader)
|
||||
if e != nil {
|
||||
return nil, err
|
||||
}
|
||||
lenBuf.WriteByte(b)
|
||||
if b&0x80 != 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// parse to coder.Length
|
||||
var length int32
|
||||
codec := encoding.VarCodec{}
|
||||
err = codec.DecodePVarInt32(lenBuf.Bytes(), &length)
|
||||
if err != nil {
|
||||
return nil, ErrMalformed
|
||||
}
|
||||
|
||||
// validate len decoded from stream
|
||||
if length < 0 {
|
||||
return nil, fmt.Errorf("coder.ReadPacket() get lenBuf=(%# x), decode len=(%v)", lenBuf.Bytes(), length)
|
||||
}
|
||||
|
||||
// write coder.Length bytes
|
||||
buf.Write(lenBuf.Bytes())
|
||||
|
||||
// read next {len} bytes as coder.Value
|
||||
valBuf := bytes.Buffer{}
|
||||
|
||||
// every batch read 512 bytes, if next reads < 512, read
|
||||
var count int
|
||||
for {
|
||||
batchReadSize := 1024 * 1024
|
||||
var tmpBuf []byte
|
||||
if int(length)-count < batchReadSize {
|
||||
tmpBuf = make([]byte, int(length)-count)
|
||||
} else {
|
||||
tmpBuf = make([]byte, batchReadSize)
|
||||
}
|
||||
p, e := reader.Read(tmpBuf)
|
||||
count += p
|
||||
if e != nil {
|
||||
if e == io.EOF {
|
||||
valBuf.Write(tmpBuf[:p])
|
||||
break
|
||||
}
|
||||
return nil, fmt.Errorf("y3 parse valBuf error: %v", err)
|
||||
}
|
||||
valBuf.Write(tmpBuf[:p])
|
||||
if count == int(length) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if count < int(length) {
|
||||
// return nil, fmt.Errorf("[y3] p should == len when getting y3 value buffer, len=%d, p=%d", length, count)
|
||||
return nil, ErrMalformed
|
||||
}
|
||||
// write coder.Value bytes
|
||||
buf.Write(valBuf.Bytes())
|
||||
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
func readByte(reader io.Reader) (byte, error) {
|
||||
var b [1]byte
|
||||
_, err := reader.Read(b[:])
|
||||
return b[0], err
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
package mq_coder
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.hpds.cc/Component/mq_coder/encoding"
|
||||
)
|
||||
|
||||
// DecodeState represents the state of decoding
|
||||
type DecodeState struct {
|
||||
// ConsumedBytes is the bytes consumed by decoder
|
||||
ConsumedBytes int
|
||||
// SizeL is the bytes length of value
|
||||
SizeL int
|
||||
}
|
||||
|
||||
// DecodeToPrimitivePacket parse out whole buffer to a PrimitivePacket
|
||||
//
|
||||
// Examples:
|
||||
// [0x01, 0x01, 0x01] -> Key=0x01, Value=0x01
|
||||
// [0x41, 0x06, 0x03, 0x01, 0x61, 0x04, 0x01, 0x62] -> key=0x03, value=0x61; key=0x04, value=0x62
|
||||
func DecodeToPrimitivePacket(buf []byte, p *PrimitivePacket) (*DecodeState, error) {
|
||||
decoder := &DecodeState{
|
||||
ConsumedBytes: 0,
|
||||
SizeL: 0,
|
||||
}
|
||||
|
||||
if buf == nil || len(buf) < primitivePacketBufferMinimalLength {
|
||||
return decoder, errors.New("invalid packet minimal size")
|
||||
}
|
||||
|
||||
p.packet = &packet{
|
||||
valBuf: []byte{},
|
||||
buf: &bytes.Buffer{},
|
||||
}
|
||||
|
||||
var pos = 0
|
||||
// first byte is `Tag`
|
||||
p.tag = NewTag(buf[pos])
|
||||
p.buf.WriteByte(buf[pos])
|
||||
pos++
|
||||
decoder.ConsumedBytes = pos
|
||||
|
||||
// read `Variant` from buf for `Length of value`
|
||||
tmpBuf := buf[pos:]
|
||||
var bufLen int32
|
||||
codec := encoding.VarCodec{}
|
||||
err := codec.DecodePVarInt32(tmpBuf, &bufLen)
|
||||
if err != nil {
|
||||
return decoder, err
|
||||
}
|
||||
if codec.Size < 1 {
|
||||
return decoder, errors.New("malformed, size of Length can not smaller than 1")
|
||||
}
|
||||
|
||||
// codec.Size describes how many bytes used to represent `Length`
|
||||
p.buf.Write(buf[pos : pos+codec.Size])
|
||||
pos += codec.Size
|
||||
|
||||
decoder.ConsumedBytes = pos
|
||||
decoder.SizeL = codec.Size
|
||||
|
||||
// if length<0, error on decoding
|
||||
if bufLen < 0 {
|
||||
return decoder, errors.New("invalid packet, negative length")
|
||||
}
|
||||
|
||||
// the length of value
|
||||
p.length = int(bufLen)
|
||||
if p.length == 0 {
|
||||
p.valBuf = nil
|
||||
return decoder, nil
|
||||
}
|
||||
|
||||
// the next `p.length` bytes store value
|
||||
endPos := pos + p.length
|
||||
|
||||
if pos > endPos || endPos > len(buf) || pos > len(buf) {
|
||||
return decoder, fmt.Errorf("beyond the boundary, pos=%v, endPos=%v", pos, endPos)
|
||||
}
|
||||
p.valBuf = buf[pos:endPos]
|
||||
p.buf.Write(buf[pos:endPos])
|
||||
|
||||
decoder.ConsumedBytes = endPos
|
||||
return decoder, nil
|
||||
}
|
|
@ -0,0 +1,112 @@
|
|||
package mq_coder
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"git.hpds.cc/Component/mq_coder/encoding"
|
||||
)
|
||||
|
||||
// PrimitivePacketEncoder used for encode a primitive packet
|
||||
type PrimitivePacketEncoder struct {
|
||||
*encoder
|
||||
}
|
||||
|
||||
// NewPrimitivePacketEncoder return an Encoder for primitive packet
|
||||
func NewPrimitivePacketEncoder(sid byte) *PrimitivePacketEncoder {
|
||||
prim := &PrimitivePacketEncoder{
|
||||
encoder: &encoder{
|
||||
isNode: false,
|
||||
buf: new(bytes.Buffer),
|
||||
},
|
||||
}
|
||||
|
||||
prim.seqID = sid
|
||||
return prim
|
||||
}
|
||||
|
||||
// SetInt32Value encode int32 value
|
||||
func (enc *PrimitivePacketEncoder) SetInt32Value(v int32) {
|
||||
size := encoding.SizeOfNVarInt32(v)
|
||||
codec := encoding.VarCodec{Size: size}
|
||||
enc.valBuf = make([]byte, size)
|
||||
err := codec.EncodeNVarInt32(enc.valBuf, v)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// SetUInt32Value encode uint32 value
|
||||
func (enc *PrimitivePacketEncoder) SetUInt32Value(v uint32) {
|
||||
size := encoding.SizeOfNVarUInt32(v)
|
||||
codec := encoding.VarCodec{Size: size}
|
||||
enc.valBuf = make([]byte, size)
|
||||
err := codec.EncodeNVarUInt32(enc.valBuf, v)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// SetInt64Value encode int64 value
|
||||
func (enc *PrimitivePacketEncoder) SetInt64Value(v int64) {
|
||||
size := encoding.SizeOfNVarInt64(v)
|
||||
codec := encoding.VarCodec{Size: size}
|
||||
enc.valBuf = make([]byte, size)
|
||||
err := codec.EncodeNVarInt64(enc.valBuf, v)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// SetUInt64Value encode uint64 value
|
||||
func (enc *PrimitivePacketEncoder) SetUInt64Value(v uint64) {
|
||||
size := encoding.SizeOfNVarUInt64(v)
|
||||
codec := encoding.VarCodec{Size: size}
|
||||
enc.valBuf = make([]byte, size)
|
||||
err := codec.EncodeNVarUInt64(enc.valBuf, v)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// SetFloat32Value encode float32 value
|
||||
func (enc *PrimitivePacketEncoder) SetFloat32Value(v float32) {
|
||||
var size = encoding.SizeOfVarFloat32(v)
|
||||
codec := encoding.VarCodec{Size: size}
|
||||
enc.valBuf = make([]byte, size)
|
||||
err := codec.EncodeVarFloat32(enc.valBuf, v)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// SetFloat64Value encode float64 value
|
||||
func (enc *PrimitivePacketEncoder) SetFloat64Value(v float64) {
|
||||
var size = encoding.SizeOfVarFloat64(v)
|
||||
codec := encoding.VarCodec{Size: size}
|
||||
enc.valBuf = make([]byte, size)
|
||||
err := codec.EncodeVarFloat64(enc.valBuf, v)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// SetBoolValue encode bool value
|
||||
func (enc *PrimitivePacketEncoder) SetBoolValue(v bool) {
|
||||
var size = encoding.SizeOfPVarUInt32(uint32(1))
|
||||
codec := encoding.VarCodec{Size: size}
|
||||
enc.valBuf = make([]byte, size)
|
||||
err := codec.EncodePVarBool(enc.valBuf, v)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// SetStringValue encode string
|
||||
func (enc *PrimitivePacketEncoder) SetStringValue(v string) {
|
||||
enc.valBuf = []byte(v)
|
||||
}
|
||||
|
||||
// SetBytesValue encode []byte
|
||||
func (enc *PrimitivePacketEncoder) SetBytesValue(v []byte) {
|
||||
enc.valBuf = v
|
||||
}
|
|
@ -0,0 +1,100 @@
|
|||
package mq_coder
|
||||
|
||||
import (
|
||||
"git.hpds.cc/Component/mq_coder/encoding"
|
||||
)
|
||||
|
||||
// 最小长度为2个字节
|
||||
const primitivePacketBufferMinimalLength = 2
|
||||
|
||||
// PrimitivePacket describes primitive value type,
|
||||
type PrimitivePacket struct {
|
||||
*packet
|
||||
}
|
||||
|
||||
// ToInt32 parse raw as int32 value
|
||||
func (p *PrimitivePacket) ToInt32() (int32, error) {
|
||||
var val int32
|
||||
codec := encoding.VarCodec{Size: len(p.valBuf)}
|
||||
err := codec.DecodeNVarInt32(p.packet.valBuf, &val)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return val, nil
|
||||
}
|
||||
|
||||
// ToUInt32 parse raw as uint32 value
|
||||
func (p *PrimitivePacket) ToUInt32() (uint32, error) {
|
||||
var val uint32
|
||||
codec := encoding.VarCodec{Size: len(p.valBuf)}
|
||||
err := codec.DecodeNVarUInt32(p.valBuf, &val)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return val, nil
|
||||
}
|
||||
|
||||
// ToInt64 parse raw as int64 value
|
||||
func (p *PrimitivePacket) ToInt64() (int64, error) {
|
||||
var val int64
|
||||
codec := encoding.VarCodec{Size: len(p.valBuf)}
|
||||
err := codec.DecodeNVarInt64(p.valBuf, &val)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return val, nil
|
||||
}
|
||||
|
||||
// ToUInt64 parse raw as uint64 value
|
||||
func (p *PrimitivePacket) ToUInt64() (uint64, error) {
|
||||
var val uint64
|
||||
codec := encoding.VarCodec{Size: len(p.valBuf)}
|
||||
err := codec.DecodeNVarUInt64(p.valBuf, &val)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return val, nil
|
||||
}
|
||||
|
||||
// ToFloat32 parse raw as float32 value
|
||||
func (p *PrimitivePacket) ToFloat32() (float32, error) {
|
||||
var val float32
|
||||
codec := encoding.VarCodec{Size: len(p.valBuf)}
|
||||
err := codec.DecodeVarFloat32(p.valBuf, &val)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return val, nil
|
||||
}
|
||||
|
||||
// ToFloat64 parse raw as float64 value
|
||||
func (p *PrimitivePacket) ToFloat64() (float64, error) {
|
||||
var val float64
|
||||
codec := encoding.VarCodec{Size: len(p.valBuf)}
|
||||
err := codec.DecodeVarFloat64(p.valBuf, &val)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return val, nil
|
||||
}
|
||||
|
||||
// ToBool parse raw as bool value
|
||||
func (p *PrimitivePacket) ToBool() (bool, error) {
|
||||
var val bool
|
||||
codec := encoding.VarCodec{Size: len(p.valBuf)}
|
||||
err := codec.DecodePVarBool(p.valBuf, &val)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return val, nil
|
||||
}
|
||||
|
||||
// ToUTF8String parse raw data as string value
|
||||
func (p *PrimitivePacket) ToUTF8String() (string, error) {
|
||||
return string(p.valBuf), nil
|
||||
}
|
||||
|
||||
// ToBytes returns raw buffer data
|
||||
func (p *PrimitivePacket) ToBytes() []byte {
|
||||
return p.valBuf
|
||||
}
|
26
spec/spec.go
26
spec/spec.go
|
@ -1,26 +0,0 @@
|
|||
package spec
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
)
|
||||
|
||||
const (
|
||||
maxSeqId = 0x3F
|
||||
flagBitNode = 0x80
|
||||
wipeFlagBits = 0x3F
|
||||
msb = 0x80
|
||||
)
|
||||
|
||||
var (
|
||||
errInvalidSeqId = errors.New("coder.Builder: SeqId should >= 0 and =< 0x3F")
|
||||
)
|
||||
|
||||
func readByte(reader io.Reader) (byte, error) {
|
||||
var b [1]byte
|
||||
n, err := reader.Read(b[:])
|
||||
if n == 0 {
|
||||
return 0x00, err
|
||||
}
|
||||
return b[0], err
|
||||
}
|
|
@ -1,27 +0,0 @@
|
|||
@startuml
|
||||
namespace spec {
|
||||
class L << (S,Aquamarine) >> {
|
||||
- buf []byte
|
||||
- size int
|
||||
- len int
|
||||
|
||||
+ Bytes() []byte
|
||||
+ Size() int
|
||||
+ VSize() int
|
||||
|
||||
}
|
||||
class T << (S,Aquamarine) >> {
|
||||
+ Sid() int
|
||||
+ Bytes() []byte
|
||||
+ IsNodeMode() bool
|
||||
+ SetNodeMode(flag bool)
|
||||
+ Size() int
|
||||
|
||||
}
|
||||
class spec.T << (T, #FF7700) >> {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
"__builtin__.byte" #.. "spec.T"
|
||||
@enduml
|
|
@ -1,54 +0,0 @@
|
|||
package spec
|
||||
|
||||
import (
|
||||
"io"
|
||||
)
|
||||
|
||||
// T is the Tag in a TLV structure
|
||||
type T byte
|
||||
|
||||
// NewT returns a T with sequenceID. If this packet contains other
|
||||
// packets, this packet will be a "node packet", the T of this packet
|
||||
// will set MSB to T.
|
||||
func NewT(seqId int) (T, error) {
|
||||
if seqId < 0 || seqId > maxSeqId {
|
||||
return 0, errInvalidSeqId
|
||||
}
|
||||
|
||||
return T(seqId), nil
|
||||
}
|
||||
|
||||
// Sid returns the sequenceId of this packet.
|
||||
func (t T) Sid() int {
|
||||
return int(t & wipeFlagBits)
|
||||
}
|
||||
|
||||
// Bytes returns raw bytes of T.
|
||||
func (t T) Bytes() []byte {
|
||||
return []byte{byte(t)}
|
||||
}
|
||||
|
||||
// IsNodeMode will return true if this packet contains other packets.
|
||||
// Otherwise, return false.
|
||||
func (t T) IsNodeMode() bool {
|
||||
return t&flagBitNode == flagBitNode
|
||||
}
|
||||
|
||||
// SetNodeMode will set T to indicates this packet contains
|
||||
// other packets.
|
||||
func (t *T) SetNodeMode(flag bool) {
|
||||
if flag {
|
||||
*t |= flagBitNode
|
||||
}
|
||||
}
|
||||
|
||||
// Size return the size of T raw bytes.
|
||||
func (t T) Size() int {
|
||||
return 1
|
||||
}
|
||||
|
||||
// ReadT read T from a bufio.Reader
|
||||
func ReadT(rd io.Reader) (T, error) {
|
||||
b, err := readByte(rd)
|
||||
return T(b), err
|
||||
}
|
|
@ -1,89 +0,0 @@
|
|||
package spec
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"io"
|
||||
|
||||
"git.hpds.cc/Component/mq_coder/encoding"
|
||||
)
|
||||
|
||||
// L is the Length in a TLV structure
|
||||
type L struct {
|
||||
buf []byte
|
||||
size int
|
||||
len int
|
||||
}
|
||||
|
||||
// NewL will take an int type len as parameter and return L to
|
||||
// represent the size of V in a TLV. an integer will be encoded as
|
||||
// a PVarInt32 type to represent the value.
|
||||
func NewL(len int) (L, error) {
|
||||
var l = L{}
|
||||
if len < -1 {
|
||||
return l, errors.New("y3.L: len can't less than -1")
|
||||
}
|
||||
|
||||
valLen := int32(len)
|
||||
l.size = encoding.SizeOfPVarInt32(valLen)
|
||||
codec := encoding.VarCodec{Size: l.size}
|
||||
tmp := make([]byte, l.size)
|
||||
err := codec.EncodePVarInt32(tmp, valLen)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
l.buf = make([]byte, l.size)
|
||||
copy(l.buf, tmp)
|
||||
l.len = len
|
||||
return l, nil
|
||||
}
|
||||
|
||||
// Bytes will return the raw bytes of L.
|
||||
func (l L) Bytes() []byte {
|
||||
return l.buf
|
||||
}
|
||||
|
||||
// Size returns how many bytes used to represent this L.
|
||||
func (l L) Size() int {
|
||||
return l.size
|
||||
}
|
||||
|
||||
// VSize returns the size of V.
|
||||
func (l L) VSize() int {
|
||||
return int(l.len)
|
||||
}
|
||||
|
||||
// ReadL read L from bufio.Reader
|
||||
func ReadL(r io.Reader) (*L, error) {
|
||||
lenBuf := bytes.Buffer{}
|
||||
for {
|
||||
b, err := readByte(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lenBuf.WriteByte(b)
|
||||
if b&msb != msb {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
buf := lenBuf.Bytes()
|
||||
|
||||
// decode to L
|
||||
length, err := decodeL(buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &L{
|
||||
buf: buf,
|
||||
len: int(length),
|
||||
size: len(buf),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func decodeL(buf []byte) (length int32, err error) {
|
||||
codec := encoding.VarCodec{}
|
||||
err = codec.DecodePVarInt32(buf, &length)
|
||||
return length, err
|
||||
}
|
|
@ -0,0 +1,115 @@
|
|||
package mq_coder
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"git.hpds.cc/Component/mq_coder/encoding"
|
||||
)
|
||||
|
||||
// StreamReader read an coder packet from io.Reader, and return
|
||||
// the ValReader after decode out Tag and Len
|
||||
type StreamReader struct {
|
||||
src io.Reader
|
||||
// Tag of a coder packet
|
||||
Tag byte
|
||||
// Len of a coder packet
|
||||
Len int
|
||||
// Val of a coder packet
|
||||
Val io.Reader
|
||||
}
|
||||
|
||||
// NewStreamParser create a new coder StreamReader
|
||||
func NewStreamParser(reader io.Reader) *StreamReader {
|
||||
return &StreamReader{
|
||||
src: reader,
|
||||
}
|
||||
}
|
||||
|
||||
func (sr *StreamReader) GetValBuffer() ([]byte, error) {
|
||||
buf, err := io.ReadAll(sr.Val)
|
||||
return buf, err
|
||||
}
|
||||
|
||||
// Do must run in a goroutine
|
||||
func (sr *StreamReader) Do() error {
|
||||
if sr.src == nil {
|
||||
return errors.New("coder: nil source reader")
|
||||
}
|
||||
|
||||
tag, err := readByte(sr.src)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// the first byte is coder.Tag
|
||||
sr.Tag = tag
|
||||
|
||||
// read coder.Length bytes, a varInt format
|
||||
lenBuf := bytes.Buffer{}
|
||||
for {
|
||||
b, e := readByte(sr.src)
|
||||
if e != nil {
|
||||
return err
|
||||
}
|
||||
lenBuf.WriteByte(b)
|
||||
if b&0x80 != 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// parse to coder.Length
|
||||
var length int32
|
||||
codec := encoding.VarCodec{}
|
||||
err = codec.DecodePVarInt32(lenBuf.Bytes(), &length)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// validate len decoded from stream
|
||||
if length < 0 {
|
||||
return fmt.Errorf("coder: streamParse() get lenBuf=(%# x), decode len=(%v)", lenBuf.Bytes(), length)
|
||||
}
|
||||
|
||||
sr.Len = int(length)
|
||||
|
||||
// read next {len} bytes as coder.Value
|
||||
sr.Val = &valR{
|
||||
length: int(length),
|
||||
src: sr.src,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type valR struct {
|
||||
length int
|
||||
off int
|
||||
src io.Reader
|
||||
}
|
||||
|
||||
func (r *valR) Read(p []byte) (n int, err error) {
|
||||
if r.src == nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
if r.off >= r.length {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
bound := len(p)
|
||||
if len(p) > r.length-r.off {
|
||||
bound = r.length - r.off
|
||||
}
|
||||
// update read
|
||||
r.off, err = r.src.Read(p[0:bound])
|
||||
return r.off, err
|
||||
}
|
||||
|
||||
func StreamReadPacket(reader io.Reader) (*StreamReader, error) {
|
||||
sp := NewStreamParser(reader)
|
||||
err := sp.Do()
|
||||
return sp, err
|
||||
}
|
|
@ -0,0 +1,138 @@
|
|||
package mq_coder
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
|
||||
"git.hpds.cc/Component/mq_coder/encoding"
|
||||
)
|
||||
|
||||
type StreamEncoder struct {
|
||||
tag byte
|
||||
buf *bytes.Buffer
|
||||
pBuf *bytes.Buffer
|
||||
len int
|
||||
sLen int
|
||||
reader *yR
|
||||
}
|
||||
|
||||
func NewStreamEncoder(tag byte) *StreamEncoder {
|
||||
var se = &StreamEncoder{
|
||||
tag: tag,
|
||||
buf: new(bytes.Buffer),
|
||||
pBuf: new(bytes.Buffer),
|
||||
}
|
||||
|
||||
return se
|
||||
}
|
||||
|
||||
func (se *StreamEncoder) AddPacket(packet *PrimitivePacketEncoder) {
|
||||
node := packet.Encode()
|
||||
se.AddPacketBuffer(node)
|
||||
}
|
||||
|
||||
func (se *StreamEncoder) AddPacketBuffer(buf []byte) {
|
||||
se.pBuf.Write(buf)
|
||||
se.growLen(len(se.pBuf.Bytes()))
|
||||
}
|
||||
|
||||
func (se *StreamEncoder) AddStreamPacket(tag byte, length int, reader io.Reader) {
|
||||
se.sLen = length
|
||||
// s-Tag
|
||||
se.pBuf.WriteByte(tag)
|
||||
se.growLen(1)
|
||||
// calculate s-Len
|
||||
size := encoding.SizeOfPVarInt32(int32(length))
|
||||
codec := encoding.VarCodec{Size: size}
|
||||
tmp := make([]byte, size)
|
||||
err := codec.EncodePVarInt32(tmp, int32(length))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
se.pBuf.Write(tmp)
|
||||
se.growLen(size)
|
||||
|
||||
// total buf
|
||||
se.buf.WriteByte(se.tag)
|
||||
se.growLen(length)
|
||||
// calculate total Len buf
|
||||
size = encoding.SizeOfPVarInt32(int32(se.len))
|
||||
codec = encoding.VarCodec{Size: size}
|
||||
tmp = make([]byte, size)
|
||||
err = codec.EncodePVarInt32(tmp, int32(se.len))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
se.buf.Write(tmp) //lenBuf
|
||||
se.buf.Write(se.pBuf.Bytes())
|
||||
se.growLen(size) // total length
|
||||
se.growLen(1) // parent tag
|
||||
|
||||
se.reader = &yR{
|
||||
buf: se.buf,
|
||||
src: reader,
|
||||
length: se.len,
|
||||
sLen: se.sLen,
|
||||
}
|
||||
}
|
||||
|
||||
func (se *StreamEncoder) GetReader() io.Reader {
|
||||
if se.reader != nil {
|
||||
return se.reader
|
||||
}
|
||||
return new(bytes.Buffer)
|
||||
}
|
||||
|
||||
// Pipe can pipe data to os.StdOut
|
||||
func (se *StreamEncoder) Pipe(writer io.Writer) {
|
||||
|
||||
}
|
||||
|
||||
func (se *StreamEncoder) GetLen() int {
|
||||
if se.reader != nil {
|
||||
return se.len
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (se *StreamEncoder) growLen(step int) {
|
||||
se.len += step
|
||||
}
|
||||
|
||||
type yR struct {
|
||||
src io.Reader
|
||||
buf *bytes.Buffer
|
||||
length int
|
||||
off int
|
||||
sLen int
|
||||
}
|
||||
|
||||
func (r *yR) Read(p []byte) (n int, err error) {
|
||||
if r.src == nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
if r.off >= r.length {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
if r.off < r.length-r.sLen {
|
||||
n, err = r.buf.Read(p)
|
||||
r.off += n
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return n, nil
|
||||
} else {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
return n, nil
|
||||
} else {
|
||||
n, err = r.src.Read(p)
|
||||
r.off += n
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package mq_coder
|
||||
|
||||
import (
|
||||
"git.hpds.cc/Component/mq_coder/utils"
|
||||
)
|
||||
|
||||
// Tag represents the Tag of TLV,
|
||||
// MSB used to represent the packet type, 0x80 means a node packet, otherwise is a primitive packet.
|
||||
// Low 7 bits represent Sequence ID, like `key` in JSON format
|
||||
type Tag struct {
|
||||
raw byte
|
||||
}
|
||||
|
||||
// IsNode returns true is MSB is 1.
|
||||
func (t *Tag) IsNode() bool {
|
||||
return t.raw&utils.MSB == utils.MSB
|
||||
}
|
||||
|
||||
// IsSlice determine if the current node is a Slice
|
||||
func (t *Tag) IsSlice() bool {
|
||||
return t.raw&utils.SliceFlag == utils.SliceFlag
|
||||
}
|
||||
|
||||
// SeqId get the sequence ID, as key in JSON format
|
||||
func (t *Tag) SeqId() byte {
|
||||
//return t.raw & packet utils.DropMSB
|
||||
return t.raw & utils.DropMSBArrayFlag
|
||||
}
|
||||
|
||||
// NewTag create a NodePacket Tag field
|
||||
func NewTag(b byte) *Tag {
|
||||
return &Tag{raw: b}
|
||||
}
|
||||
|
||||
// Raw return the original byte
|
||||
func (t *Tag) Raw() byte {
|
||||
return t.raw
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package utils
|
||||
|
||||
// MSB is `1000 0000` describes this is a node packet, otherwise, is a primitive packet
|
||||
const MSB byte = 0x80
|
||||
|
||||
// DropMSB is `0111 1111`, used to remove MSB flag bit
|
||||
const DropMSB byte = 0x3F
|
||||
|
||||
// SliceFlag is `0100 0000`, describes this packet is a Slice type
|
||||
const SliceFlag byte = 0x40
|
||||
|
||||
// DropMSBArrayFlag is `0011 1111`, used to remove MSB and Slice flag bit
|
||||
const DropMSBArrayFlag byte = 0x3F
|
||||
|
||||
// IsNodePacket returns true if the tag represents a node package
|
||||
func IsNodePacket(tag byte) bool {
|
||||
return tag&MSB == MSB
|
||||
}
|
Loading…
Reference in New Issue