bug fix
This commit is contained in:
		
							parent
							
								
									b7e43ac8fe
								
							
						
					
					
						commit
						174ae7ae33
					
				| 
						 | 
				
			
			@ -1,72 +0,0 @@
 | 
			
		|||
package mq_coder
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"io"
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type chunkVReader struct {
 | 
			
		||||
	src        io.Reader     // the reader parts of V
 | 
			
		||||
	buf        *bytes.Buffer // the bytes part of V
 | 
			
		||||
	totalSize  int           // size of whole buffer of this packet
 | 
			
		||||
	off        int           // last read op
 | 
			
		||||
	ChunkVSize int           // the size of chunked V
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Read implement io.Reader interface
 | 
			
		||||
func (r *chunkVReader) Read(p []byte) (n int, err error) {
 | 
			
		||||
	if r.src == nil {
 | 
			
		||||
		return 0, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if r.off >= r.totalSize {
 | 
			
		||||
		return 0, io.EOF
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if r.off < r.totalSize-r.ChunkVSize {
 | 
			
		||||
		n, err := r.buf.Read(p)
 | 
			
		||||
		r.off += n
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			if err == io.EOF {
 | 
			
		||||
				return n, nil
 | 
			
		||||
			} else {
 | 
			
		||||
				return 0, err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		return n, nil
 | 
			
		||||
	}
 | 
			
		||||
	n, err = r.src.Read(p)
 | 
			
		||||
	r.off += n
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return n, err
 | 
			
		||||
	}
 | 
			
		||||
	return n, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WriteTo implement io.WriteTo interface
 | 
			
		||||
func (r *chunkVReader) WriteTo(w io.Writer) (n int64, err error) {
 | 
			
		||||
	if r.src == nil {
 | 
			
		||||
		return 0, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// first, write existed buffer
 | 
			
		||||
	m, err := w.Write(r.buf.Bytes())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	n += int64(m)
 | 
			
		||||
 | 
			
		||||
	// last, write from reader
 | 
			
		||||
	buf, err := ioutil.ReadAll(r.src)
 | 
			
		||||
	if err != nil && err != io.EOF {
 | 
			
		||||
		return 0, errWriteFromReader
 | 
			
		||||
	}
 | 
			
		||||
	m, err = w.Write(buf)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	n += int64(m)
 | 
			
		||||
	return n, nil
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										54
									
								
								coder.go
								
								
								
								
							
							
						
						
									
										54
									
								
								coder.go
								
								
								
								
							| 
						 | 
				
			
			@ -1,54 +0,0 @@
 | 
			
		|||
package mq_coder
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"io"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	errBuildIncomplete = errors.New("coder.Encoder: invalid structure of packet")
 | 
			
		||||
	errInvalidAdding   = errors.New("coder.Encoder: can not add this Packet after StreamPacket has been add")
 | 
			
		||||
	errNonStreamPacket = errors.New("coder.Packet: this packet is not in node mode")
 | 
			
		||||
	errWriteFromReader = errors.New("coder.streamV: write from reader error")
 | 
			
		||||
	errNotNodeMode     = errors.New("coder.Encoder: packet should be in node mode can be add other packets as child")
 | 
			
		||||
	errNilReader       = errors.New("coder.Decoder: nil source reader")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Packet 编解码器包
 | 
			
		||||
type Packet interface {
 | 
			
		||||
	// SeqId 返回此数据包的序列Id.
 | 
			
		||||
	SeqId() int
 | 
			
		||||
	// Size 返回整个数据包的大小
 | 
			
		||||
	Size() int
 | 
			
		||||
	// VSize 返回V的大小.
 | 
			
		||||
	VSize() int
 | 
			
		||||
	// Bytes 此数据包的全部字节.
 | 
			
		||||
	Bytes() []byte
 | 
			
		||||
	// Reader 返回 io.Reader 字节.
 | 
			
		||||
	Reader() io.Reader
 | 
			
		||||
	// VReader 返回 io.Reader 的 流字节.
 | 
			
		||||
	VReader() io.Reader
 | 
			
		||||
	// IsStreamMode 是否流模式
 | 
			
		||||
	IsStreamMode() bool
 | 
			
		||||
	// IsNodeMode 是否节点模式
 | 
			
		||||
	IsNodeMode() bool
 | 
			
		||||
 | 
			
		||||
	// BytesV 返回流字节
 | 
			
		||||
	BytesV() []byte
 | 
			
		||||
	// Utf8StringV 返回流的utf8字符串值
 | 
			
		||||
	Utf8StringV() string
 | 
			
		||||
	// Int32V 返回流的int32值
 | 
			
		||||
	Int32V() (val int32, err error)
 | 
			
		||||
	// UInt32V 返回流的uint32值
 | 
			
		||||
	UInt32V() (val uint32, err error)
 | 
			
		||||
	// Int64V 返回流的int64值
 | 
			
		||||
	Int64V() (val int64, err error)
 | 
			
		||||
	// UInt64V 返回流的uint64值
 | 
			
		||||
	UInt64V() (val uint64, err error)
 | 
			
		||||
	// Float32V 返回流的float32值
 | 
			
		||||
	Float32V() (val float32, err error)
 | 
			
		||||
	// Float64V 返回流的float64值
 | 
			
		||||
	Float64V() (val float64, err error)
 | 
			
		||||
	// BoolV 返回流的bool值
 | 
			
		||||
	BoolV() (val bool, err error)
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										93
									
								
								decoder.go
								
								
								
								
							
							
						
						
									
										93
									
								
								decoder.go
								
								
								
								
							| 
						 | 
				
			
			@ -1,93 +0,0 @@
 | 
			
		|||
package mq_coder
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"io"
 | 
			
		||||
 | 
			
		||||
	"git.hpds.cc/Component/mq_coder/spec"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Decoder is the tool for decoding packet from stream
 | 
			
		||||
type Decoder struct {
 | 
			
		||||
	tag spec.T
 | 
			
		||||
	len *spec.L
 | 
			
		||||
	rd  io.Reader
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewDecoder returns a Decoder from an io.Reader
 | 
			
		||||
func NewDecoder(reader io.Reader) *Decoder {
 | 
			
		||||
	return &Decoder{
 | 
			
		||||
		rd: reader,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SeqId return the SequenceID of the decoding packet
 | 
			
		||||
func (d *Decoder) SeqId() int {
 | 
			
		||||
	return d.tag.Sid()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UnderlyingReader returns the reader this decoder using
 | 
			
		||||
func (d *Decoder) UnderlyingReader() io.Reader {
 | 
			
		||||
	return d.rd
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ReadHeader will block until io.EOF or receive T and L of a packet.
 | 
			
		||||
func (d *Decoder) ReadHeader() error {
 | 
			
		||||
	// only read T and L
 | 
			
		||||
	return d.readTL()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetChunkedPacket will block until io.EOF or receive V of a packet in chunked mode.
 | 
			
		||||
func (d *Decoder) GetChunkedPacket() Packet {
 | 
			
		||||
	return &StreamPacket{
 | 
			
		||||
		t:         d.tag,
 | 
			
		||||
		l:         *d.len,
 | 
			
		||||
		vr:        d.rd,
 | 
			
		||||
		chunkMode: true,
 | 
			
		||||
		chunkSize: d.len.VSize(),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetFullFilledPacket read full Packet from given io.Reader
 | 
			
		||||
func (d *Decoder) GetFullFilledPacket() (packet Packet, err error) {
 | 
			
		||||
	// read V
 | 
			
		||||
	buf := new(bytes.Buffer)
 | 
			
		||||
	total := 0
 | 
			
		||||
	for {
 | 
			
		||||
		valBuf := make([]byte, d.len.VSize())
 | 
			
		||||
		n, err := d.rd.Read(valBuf)
 | 
			
		||||
		if n > 0 {
 | 
			
		||||
			total += n
 | 
			
		||||
			buf.Write(valBuf[:n])
 | 
			
		||||
		}
 | 
			
		||||
		if total >= d.len.VSize() || err != nil {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	packet = &StreamPacket{
 | 
			
		||||
		t:         d.tag,
 | 
			
		||||
		l:         *d.len,
 | 
			
		||||
		vBuf:      buf.Bytes(),
 | 
			
		||||
		chunkMode: false,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return packet, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *Decoder) readTL() (err error) {
 | 
			
		||||
	if d.rd == nil {
 | 
			
		||||
		return errNilReader
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// read T
 | 
			
		||||
	d.tag, err = spec.ReadT(d.rd)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// read L
 | 
			
		||||
	d.len, err = spec.ReadL(d.rd)
 | 
			
		||||
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										187
									
								
								encoder.go
								
								
								
								
							
							
						
						
									
										187
									
								
								encoder.go
								
								
								
								
							| 
						 | 
				
			
			@ -2,157 +2,76 @@ package mq_coder
 | 
			
		|||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"io"
 | 
			
		||||
 | 
			
		||||
	"git.hpds.cc/Component/mq_coder/spec"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"git.hpds.cc/Component/mq_coder/encoding"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Encoder is the tool for creating a packet easily
 | 
			
		||||
type Encoder struct {
 | 
			
		||||
	tag           spec.T
 | 
			
		||||
	len           *spec.L
 | 
			
		||||
	valReader     io.Reader
 | 
			
		||||
	valReaderSize int
 | 
			
		||||
	nodes         map[int]Packet
 | 
			
		||||
	state         int
 | 
			
		||||
	size          int32 // size of value
 | 
			
		||||
	isStreamMode  bool
 | 
			
		||||
	valBuf        *bytes.Buffer
 | 
			
		||||
	done          bool
 | 
			
		||||
	seqId         int
 | 
			
		||||
	isNodeMode    bool
 | 
			
		||||
// Encoder will encode object to encoding
 | 
			
		||||
type encoder struct {
 | 
			
		||||
	seqID    byte
 | 
			
		||||
	valbuf   []byte
 | 
			
		||||
	isNode   bool
 | 
			
		||||
	isArray  bool
 | 
			
		||||
	buf      *bytes.Buffer
 | 
			
		||||
	complete bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetSeqId set sequenceId of a  packet, if this packet contains other
 | 
			
		||||
// packets, isNode should set to true
 | 
			
		||||
func (b *Encoder) SetSeqId(seqId int, isNode bool) {
 | 
			
		||||
	// init
 | 
			
		||||
	b.valBuf = new(bytes.Buffer)
 | 
			
		||||
	b.nodes = make(map[int]Packet)
 | 
			
		||||
	// set seqId
 | 
			
		||||
	b.seqId = seqId
 | 
			
		||||
	b.isNodeMode = isNode
 | 
			
		||||
type iEncoder interface {
 | 
			
		||||
	Encode() []byte
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetBytesV set bytes type as V
 | 
			
		||||
func (b *Encoder) SetBytesV(buf []byte) {
 | 
			
		||||
	b.size += int32(len(buf))
 | 
			
		||||
	b.valBuf.Write(buf)
 | 
			
		||||
	b.isStreamMode = false
 | 
			
		||||
	b.state |= 0x04
 | 
			
		||||
func (enc *encoder) GetValBuf() []byte {
 | 
			
		||||
	return enc.valbuf
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetReaderV set io.Reader type as V
 | 
			
		||||
func (b *Encoder) SetReaderV(r io.Reader, size int) {
 | 
			
		||||
	b.isStreamMode = true
 | 
			
		||||
	b.valReader = r
 | 
			
		||||
	b.state |= 0x04
 | 
			
		||||
	b.size += int32(size)
 | 
			
		||||
	b.valReaderSize = size
 | 
			
		||||
func (enc *encoder) IsEmpty() bool {
 | 
			
		||||
	return len(enc.valbuf) == 0
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddPacket add a Packet child to this packet, this packet must be NodeMode
 | 
			
		||||
func (b *Encoder) AddPacket(child Packet) error {
 | 
			
		||||
	// only packet is in node mode can add other packets
 | 
			
		||||
	if !b.isNodeMode {
 | 
			
		||||
		return errNotNodeMode
 | 
			
		||||
func (enc *encoder) AddBytes(buf []byte) {
 | 
			
		||||
	enc.valbuf = append(enc.valbuf, buf...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (enc *encoder) addRawPacket(en iEncoder) {
 | 
			
		||||
	enc.valbuf = append(enc.valbuf, en.Encode()...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// setTag write tag as seqID
 | 
			
		||||
func (enc *encoder) writeTag() {
 | 
			
		||||
	if enc.seqID > 0x3F {
 | 
			
		||||
		panic(fmt.Errorf("sid should be in [0..0x3F]"))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if b.done {
 | 
			
		||||
		return errInvalidAdding
 | 
			
		||||
	if enc.isNode {
 | 
			
		||||
		enc.seqID = enc.seqID | 0x80
 | 
			
		||||
	}
 | 
			
		||||
	b.nodes[child.SeqId()] = child
 | 
			
		||||
	buf := child.Bytes()
 | 
			
		||||
	b.SetBytesV(buf)
 | 
			
		||||
	return nil
 | 
			
		||||
	if enc.isArray {
 | 
			
		||||
		enc.seqID = enc.seqID | 0x40
 | 
			
		||||
	}
 | 
			
		||||
	enc.buf.WriteByte(enc.seqID)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddStreamPacket will put a StreamPacket in chunked mode to current packet.
 | 
			
		||||
func (b *Encoder) AddStreamPacket(child Packet) (err error) {
 | 
			
		||||
	// if this packet is in stream mode, can not add any packets
 | 
			
		||||
	if b.done {
 | 
			
		||||
		return errInvalidAdding
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// only accept packet in stream mode
 | 
			
		||||
	if !child.IsStreamMode() {
 | 
			
		||||
		return errNonStreamPacket
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// set the valReader of this packet to the child's
 | 
			
		||||
	b.valReader = child.VReader()
 | 
			
		||||
 | 
			
		||||
	// valReaderSize will be the same as child's
 | 
			
		||||
	b.valReaderSize = child.VSize()
 | 
			
		||||
	// add this child packet
 | 
			
		||||
	b.nodes[child.SeqId()] = child
 | 
			
		||||
	// add the size of child's V to L of this packet
 | 
			
		||||
	b.size += int32(child.Size())
 | 
			
		||||
	// put the bytes of child to valBuf
 | 
			
		||||
	buf := child.Bytes()
 | 
			
		||||
	b.valBuf.Write(buf)
 | 
			
		||||
	// update state
 | 
			
		||||
	b.state |= 0x04
 | 
			
		||||
	b.isStreamMode = true
 | 
			
		||||
	b.done = true
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Packet return a Packet instance.
 | 
			
		||||
func (b *Encoder) Packet() (Packet, error) {
 | 
			
		||||
	err := b.generateT()
 | 
			
		||||
func (enc *encoder) writeLengthBuf() {
 | 
			
		||||
	vallen := len(enc.valbuf)
 | 
			
		||||
	size := encoding.SizeOfPVarInt32(int32(vallen))
 | 
			
		||||
	codec := encoding.VarCodec{Size: size}
 | 
			
		||||
	tmp := make([]byte, size)
 | 
			
		||||
	err := codec.EncodePVarInt32(tmp, int32(vallen))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = b.generateL()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if b.state != 0x07 {
 | 
			
		||||
		return nil, errBuildIncomplete
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if b.isStreamMode {
 | 
			
		||||
		return &StreamPacket{
 | 
			
		||||
			t:         b.tag,
 | 
			
		||||
			l:         *b.len,
 | 
			
		||||
			vr:        b.valReader,
 | 
			
		||||
			vBuf:      b.valBuf.Bytes(),
 | 
			
		||||
			chunkMode: true,
 | 
			
		||||
			chunkSize: b.valReaderSize,
 | 
			
		||||
		}, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// not streaming mode
 | 
			
		||||
	return &StreamPacket{
 | 
			
		||||
		t:         b.tag,
 | 
			
		||||
		l:         *b.len,
 | 
			
		||||
		vBuf:      b.valBuf.Bytes(),
 | 
			
		||||
		chunkMode: false,
 | 
			
		||||
	}, err
 | 
			
		||||
	enc.buf.Write(tmp)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// will generate T of a TLV.
 | 
			
		||||
func (b *Encoder) generateT() error {
 | 
			
		||||
	t, err := spec.NewT(b.seqId)
 | 
			
		||||
	t.SetNodeMode(b.isNodeMode)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
// Encode returns a final Y3 encoded byte slice
 | 
			
		||||
func (enc *encoder) Encode() []byte {
 | 
			
		||||
	if !enc.complete {
 | 
			
		||||
		// Tag
 | 
			
		||||
		enc.writeTag()
 | 
			
		||||
		// Len
 | 
			
		||||
		enc.writeLengthBuf()
 | 
			
		||||
		// Val
 | 
			
		||||
		enc.buf.Write(enc.valbuf)
 | 
			
		||||
		enc.complete = true
 | 
			
		||||
	}
 | 
			
		||||
	b.tag = t
 | 
			
		||||
	b.state |= 0x01
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// will generate L of a TLV.
 | 
			
		||||
func (b *Encoder) generateL() error {
 | 
			
		||||
	l, err := spec.NewL(int(b.size))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	b.len = &l
 | 
			
		||||
	b.state |= 0x02
 | 
			
		||||
	return nil
 | 
			
		||||
	return enc.buf.Bytes()
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,98 +0,0 @@
 | 
			
		|||
package mq_coder
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"git.hpds.cc/Component/mq_coder/encoding"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// SetUtf8StringV set utf-8 string type value as V
 | 
			
		||||
func (b *Encoder) SetUtf8StringV(v string) {
 | 
			
		||||
	buf := []byte(v)
 | 
			
		||||
	b.SetBytesV(buf)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetInt32V set an int32 type value as V
 | 
			
		||||
func (b *Encoder) SetInt32V(v int32) error {
 | 
			
		||||
	size := encoding.SizeOfNVarInt32(v)
 | 
			
		||||
	codec := encoding.VarCodec{Size: size}
 | 
			
		||||
	buf := make([]byte, size)
 | 
			
		||||
	err := codec.EncodeNVarInt32(buf, v)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	b.SetBytesV(buf)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetUInt32V set an uint32 type value as V
 | 
			
		||||
func (b *Encoder) SetUInt32V(v uint32) error {
 | 
			
		||||
	size := encoding.SizeOfNVarUInt32(v)
 | 
			
		||||
	codec := encoding.VarCodec{Size: size}
 | 
			
		||||
	buf := make([]byte, size)
 | 
			
		||||
	err := codec.EncodeNVarUInt32(buf, v)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	b.SetBytesV(buf)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetInt64V set an int64 type value as V
 | 
			
		||||
func (b *Encoder) SetInt64V(v int64) error {
 | 
			
		||||
	size := encoding.SizeOfNVarInt64(v)
 | 
			
		||||
	codec := encoding.VarCodec{Size: size}
 | 
			
		||||
	buf := make([]byte, size)
 | 
			
		||||
	err := codec.EncodeNVarInt64(buf, v)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	b.SetBytesV(buf)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetUInt64V set an uint64 type value as V
 | 
			
		||||
func (b *Encoder) SetUInt64V(v uint64) error {
 | 
			
		||||
	size := encoding.SizeOfNVarUInt64(v)
 | 
			
		||||
	codec := encoding.VarCodec{Size: size}
 | 
			
		||||
	buf := make([]byte, size)
 | 
			
		||||
	err := codec.EncodeNVarUInt64(buf, v)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	b.SetBytesV(buf)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetFloat32V set a float32 type value as V
 | 
			
		||||
func (b *Encoder) SetFloat32V(v float32) error {
 | 
			
		||||
	size := encoding.SizeOfVarFloat32(v)
 | 
			
		||||
	codec := encoding.VarCodec{Size: size}
 | 
			
		||||
	buf := make([]byte, size)
 | 
			
		||||
	err := codec.EncodeVarFloat32(buf, v)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	b.SetBytesV(buf)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetFloat64V set a float64 type value as V
 | 
			
		||||
func (b *Encoder) SetFloat64V(v float64) error {
 | 
			
		||||
	size := encoding.SizeOfVarFloat64(v)
 | 
			
		||||
	codec := encoding.VarCodec{Size: size}
 | 
			
		||||
	buf := make([]byte, size)
 | 
			
		||||
	err := codec.EncodeVarFloat64(buf, v)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	b.SetBytesV(buf)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetBoolV set bool type value as V
 | 
			
		||||
func (b *Encoder) SetBoolV(v bool) {
 | 
			
		||||
	var size = encoding.SizeOfPVarUInt32(uint32(1))
 | 
			
		||||
	codec := encoding.VarCodec{Size: size}
 | 
			
		||||
	buf := make([]byte, size)
 | 
			
		||||
	_ = codec.EncodePVarBool(buf, v)
 | 
			
		||||
	b.SetBytesV(buf)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,96 @@
 | 
			
		|||
package mq_coder
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"errors"
 | 
			
		||||
 | 
			
		||||
	"git.hpds.cc/Component/mq_coder/encoding"
 | 
			
		||||
	"git.hpds.cc/Component/mq_coder/utils"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func parsePayload(b []byte) (consumedBytes int, ifNodePacket bool, np *NodePacket, pp *PrimitivePacket, err error) {
 | 
			
		||||
	if len(b) == 0 {
 | 
			
		||||
		return 0, false, nil, nil, errors.New("parsePacket params can not be nil")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	pos := 0
 | 
			
		||||
	// NodePacket
 | 
			
		||||
	if ok := utils.IsNodePacket(b[pos]); ok {
 | 
			
		||||
		np = &NodePacket{}
 | 
			
		||||
		endPos, err := DecodeToNodePacket(b, np)
 | 
			
		||||
		return endPos, true, np, nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	pp = &PrimitivePacket{}
 | 
			
		||||
	state, err := DecodeToPrimitivePacket(b, pp)
 | 
			
		||||
	return state.ConsumedBytes, false, nil, pp, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DecodeToNodePacket parse out whole buffer to a NodePacket
 | 
			
		||||
func DecodeToNodePacket(buf []byte, pct *NodePacket) (consumedBytes int, err error) {
 | 
			
		||||
	if len(buf) == 0 {
 | 
			
		||||
		return 0, errors.New("empty buf")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	pct.packet = &packet{
 | 
			
		||||
		valBuf: buf,
 | 
			
		||||
		buf:    &bytes.Buffer{},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	pct.NodePackets = map[byte]NodePacket{}
 | 
			
		||||
	pct.PrimitivePackets = map[byte]PrimitivePacket{}
 | 
			
		||||
 | 
			
		||||
	pos := 0
 | 
			
		||||
 | 
			
		||||
	// `Tag`
 | 
			
		||||
	tag := NewTag(buf[pos])
 | 
			
		||||
	pct.packet.tag = tag
 | 
			
		||||
	pct.buf.WriteByte(buf[pos])
 | 
			
		||||
	pos++
 | 
			
		||||
 | 
			
		||||
	// `Length`: the type is `varint`
 | 
			
		||||
	tmpBuf := buf[pos:]
 | 
			
		||||
	var valLen int32
 | 
			
		||||
	codec := encoding.VarCodec{}
 | 
			
		||||
	err = codec.DecodePVarInt32(tmpBuf, &valLen)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	pct.packet.length = int(valLen)
 | 
			
		||||
	pct.buf.Write(buf[pos : pos+codec.Size])
 | 
			
		||||
	pos += codec.Size
 | 
			
		||||
	// if `Length` is 0, means empty node packet
 | 
			
		||||
	if valLen == 0 {
 | 
			
		||||
		return pos, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// `Value`
 | 
			
		||||
	// `raw` is pct.Length() length
 | 
			
		||||
	vl := int(valLen)
 | 
			
		||||
	if vl < 0 {
 | 
			
		||||
		return pos, errors.New("found L of V smaller than 0")
 | 
			
		||||
	}
 | 
			
		||||
	endPos := pos + vl
 | 
			
		||||
	pct.packet.valBuf = buf[pos:endPos]
 | 
			
		||||
	pct.buf.Write(buf[pos:endPos])
 | 
			
		||||
 | 
			
		||||
	// Parse value to Packet
 | 
			
		||||
	for {
 | 
			
		||||
		if pos >= endPos || pos >= len(buf) {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		_p, isNode, np, pp, err := parsePayload(buf[pos:endPos])
 | 
			
		||||
		pos += _p
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return 0, err
 | 
			
		||||
		}
 | 
			
		||||
		if isNode {
 | 
			
		||||
			pct.NodePackets[np.packet.tag.SeqId()] = *np
 | 
			
		||||
		} else {
 | 
			
		||||
			pct.PrimitivePackets[byte(pp.SeqId())] = *pp
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	consumedBytes = endPos
 | 
			
		||||
	return consumedBytes, nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,33 @@
 | 
			
		|||
package mq_coder
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// NodePacketEncoder used for encode a node packet
 | 
			
		||||
type NodePacketEncoder struct {
 | 
			
		||||
	*encoder
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewNodePacketEncoder returns an Encoder for node packet
 | 
			
		||||
func NewNodePacketEncoder(sid byte) *NodePacketEncoder {
 | 
			
		||||
	nodeEnc := &NodePacketEncoder{
 | 
			
		||||
		encoder: &encoder{
 | 
			
		||||
			isNode: true,
 | 
			
		||||
			buf:    new(bytes.Buffer),
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	nodeEnc.seqID = sid
 | 
			
		||||
	return nodeEnc
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddNodePacket add new node to this node
 | 
			
		||||
func (enc *NodePacketEncoder) AddNodePacket(np *NodePacketEncoder) {
 | 
			
		||||
	enc.addRawPacket(np)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddPrimitivePacket add new primitive to this node
 | 
			
		||||
func (enc *NodePacketEncoder) AddPrimitivePacket(np *PrimitivePacketEncoder) {
 | 
			
		||||
	enc.addRawPacket(np)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,10 @@
 | 
			
		|||
package mq_coder
 | 
			
		||||
 | 
			
		||||
// NodePacket describes complex values
 | 
			
		||||
type NodePacket struct {
 | 
			
		||||
	*packet
 | 
			
		||||
	// NodePackets store all the node packets
 | 
			
		||||
	NodePackets map[byte]NodePacket
 | 
			
		||||
	// PrimitivePackets store all the primitive packets
 | 
			
		||||
	PrimitivePackets map[byte]PrimitivePacket
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										160
									
								
								packet.go
								
								
								
								
							
							
						
						
									
										160
									
								
								packet.go
								
								
								
								
							| 
						 | 
				
			
			@ -2,155 +2,37 @@ package mq_coder
 | 
			
		|||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"io"
 | 
			
		||||
 | 
			
		||||
	"git.hpds.cc/Component/mq_coder/encoding"
 | 
			
		||||
	"git.hpds.cc/Component/mq_coder/spec"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// StreamPacket implement the Packet interface.
 | 
			
		||||
type StreamPacket struct {
 | 
			
		||||
	t         spec.T
 | 
			
		||||
	l         spec.L
 | 
			
		||||
	vBuf      []byte
 | 
			
		||||
	vr        io.Reader
 | 
			
		||||
	chunkMode bool
 | 
			
		||||
	chunkSize int
 | 
			
		||||
// packet is the base type of the NodePacket and PrimitivePacket
 | 
			
		||||
type packet struct {
 | 
			
		||||
	tag    *Tag
 | 
			
		||||
	length int
 | 
			
		||||
	valBuf []byte
 | 
			
		||||
	buf    *bytes.Buffer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ Packet = &StreamPacket{}
 | 
			
		||||
 | 
			
		||||
// SeqId returns the sequenceId of this packet
 | 
			
		||||
func (p *StreamPacket) SeqId() int { return p.t.Sid() }
 | 
			
		||||
 | 
			
		||||
// Size returns the size of whole packet.
 | 
			
		||||
func (p *StreamPacket) Size() int {
 | 
			
		||||
	// T.Size + L.Size + V.Size
 | 
			
		||||
	return p.t.Size() + p.l.Size() + p.l.VSize()
 | 
			
		||||
// GetRawBytes get all raw bytes of this packet
 | 
			
		||||
func (bp *packet) GetRawBytes() []byte {
 | 
			
		||||
	return bp.buf.Bytes()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// VSize returns the size of V.
 | 
			
		||||
func (p *StreamPacket) VSize() int { return p.l.VSize() }
 | 
			
		||||
 | 
			
		||||
// Bytes return the raw bytes of this packet. V will be absent if
 | 
			
		||||
// is in chunked mode
 | 
			
		||||
func (p *StreamPacket) Bytes() []byte {
 | 
			
		||||
	buf := new(bytes.Buffer)
 | 
			
		||||
	// the raw bytes of T and L
 | 
			
		||||
	p.writeTL(buf)
 | 
			
		||||
	// p.valBuf stores the raw bytes of V
 | 
			
		||||
	buf.Write(p.vBuf)
 | 
			
		||||
 | 
			
		||||
	return buf.Bytes()
 | 
			
		||||
// Length return the length of Val this packet
 | 
			
		||||
func (bp *packet) Length() int {
 | 
			
		||||
	return bp.length
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// VReader return an io.Reader which can be read as the content of V.
 | 
			
		||||
func (p *StreamPacket) VReader() io.Reader {
 | 
			
		||||
	if !p.chunkMode {
 | 
			
		||||
		return bytes.NewReader(p.vBuf)
 | 
			
		||||
	}
 | 
			
		||||
	return p.vr
 | 
			
		||||
// SeqId returns Tag of this packet
 | 
			
		||||
func (bp *packet) SeqId() byte {
 | 
			
		||||
	return bp.tag.SeqId()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Reader return an io.Reader which can be read as the whole bytes of
 | 
			
		||||
// this packet. This function only available if this V of packet is in
 | 
			
		||||
// chunked mode.
 | 
			
		||||
func (p *StreamPacket) Reader() io.Reader {
 | 
			
		||||
	if !p.chunkMode {
 | 
			
		||||
		buf := new(bytes.Buffer)
 | 
			
		||||
		buf.Write(p.t.Bytes())
 | 
			
		||||
		buf.Write(p.l.Bytes())
 | 
			
		||||
		buf.Write(p.vBuf)
 | 
			
		||||
		return buf
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	buf := new(bytes.Buffer)
 | 
			
		||||
	// T and L of this packet
 | 
			
		||||
	p.writeTL(buf)
 | 
			
		||||
	// V of this packet
 | 
			
		||||
	buf.Write(p.vBuf)
 | 
			
		||||
 | 
			
		||||
	return &chunkVReader{
 | 
			
		||||
		buf:        buf,
 | 
			
		||||
		src:        p.vr,
 | 
			
		||||
		totalSize:  p.Size(),
 | 
			
		||||
		ChunkVSize: p.VSize(),
 | 
			
		||||
	}
 | 
			
		||||
// IsSlice determine if the current node is a Slice
 | 
			
		||||
func (bp *packet) IsSlice() bool {
 | 
			
		||||
	return bp.tag.IsSlice()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// IsStreamMode returns a bool value indicates if the V of
 | 
			
		||||
// this packet is in stream mode
 | 
			
		||||
func (p *StreamPacket) IsStreamMode() bool {
 | 
			
		||||
	return p.chunkMode
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// IsNodeMode returns a bool value indicates if this packet
 | 
			
		||||
// is node mode
 | 
			
		||||
func (p *StreamPacket) IsNodeMode() bool {
 | 
			
		||||
	return p.t.IsNodeMode()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// write the raw bytes of T and L to given buf
 | 
			
		||||
func (p *StreamPacket) writeTL(buf *bytes.Buffer) {
 | 
			
		||||
	buf.Write(p.t.Bytes())
 | 
			
		||||
	buf.Write(p.l.Bytes())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BytesV return V as bytes
 | 
			
		||||
func (p *StreamPacket) BytesV() []byte {
 | 
			
		||||
	return p.vBuf
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UTF8StringV return V as utf-8 string
 | 
			
		||||
func (p *StreamPacket) Utf8StringV() string {
 | 
			
		||||
	return string(p.vBuf)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Int32V return V as int32
 | 
			
		||||
func (p *StreamPacket) Int32V() (val int32, err error) {
 | 
			
		||||
	codec := encoding.VarCodec{Size: len(p.vBuf)}
 | 
			
		||||
	err = codec.DecodeNVarInt32(p.vBuf, &val)
 | 
			
		||||
	return val, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UInt32V return V as uint32
 | 
			
		||||
func (p *StreamPacket) UInt32V() (val uint32, err error) {
 | 
			
		||||
	codec := encoding.VarCodec{Size: len(p.vBuf)}
 | 
			
		||||
	err = codec.DecodeNVarUInt32(p.vBuf, &val)
 | 
			
		||||
	return val, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Int64V return V as int64
 | 
			
		||||
func (p *StreamPacket) Int64V() (val int64, err error) {
 | 
			
		||||
	codec := encoding.VarCodec{Size: len(p.vBuf)}
 | 
			
		||||
	err = codec.DecodeNVarInt64(p.vBuf, &val)
 | 
			
		||||
	return val, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UInt64V return V as uint64
 | 
			
		||||
func (p *StreamPacket) UInt64V() (val uint64, err error) {
 | 
			
		||||
	codec := encoding.VarCodec{Size: len(p.vBuf)}
 | 
			
		||||
	err = codec.DecodeNVarUInt64(p.vBuf, &val)
 | 
			
		||||
	return val, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Float32V return V as float32
 | 
			
		||||
func (p *StreamPacket) Float32V() (val float32, err error) {
 | 
			
		||||
	codec := encoding.VarCodec{Size: len(p.vBuf)}
 | 
			
		||||
	err = codec.DecodeVarFloat32(p.vBuf, &val)
 | 
			
		||||
	return val, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Float64V return V as float64
 | 
			
		||||
func (p *StreamPacket) Float64V() (val float64, err error) {
 | 
			
		||||
	codec := encoding.VarCodec{Size: len(p.vBuf)}
 | 
			
		||||
	err = codec.DecodeVarFloat64(p.vBuf, &val)
 | 
			
		||||
	return val, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BoolV return V as bool
 | 
			
		||||
func (p *StreamPacket) BoolV() (val bool, err error) {
 | 
			
		||||
	codec := encoding.VarCodec{Size: len(p.vBuf)}
 | 
			
		||||
	err = codec.DecodePVarBool(p.vBuf, &val)
 | 
			
		||||
	return val, err
 | 
			
		||||
// GetValBuf get raw buffer of Val of this packet
 | 
			
		||||
func (bp *packet) GetValBuf() []byte {
 | 
			
		||||
	return bp.valBuf
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,100 @@
 | 
			
		|||
package mq_coder
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
 | 
			
		||||
	"git.hpds.cc/Component/mq_coder/encoding"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	ErrMalformed = errors.New("coder.ReadPacket: malformed")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// ReadPacket will try to read encoded packet from the reader
 | 
			
		||||
func ReadPacket(reader io.Reader) ([]byte, error) {
 | 
			
		||||
	tag, err := readByte(reader)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	// buf will contain a complete encoded handshakeFrame
 | 
			
		||||
	buf := bytes.Buffer{}
 | 
			
		||||
 | 
			
		||||
	// the first byte is coder.Tag
 | 
			
		||||
	// write coder.Tag bytes
 | 
			
		||||
	buf.WriteByte(tag)
 | 
			
		||||
 | 
			
		||||
	// read y3.Length bytes, a varInt format
 | 
			
		||||
	lenBuf := bytes.Buffer{}
 | 
			
		||||
	for {
 | 
			
		||||
		b, e := readByte(reader)
 | 
			
		||||
		if e != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		lenBuf.WriteByte(b)
 | 
			
		||||
		if b&0x80 != 0x80 {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// parse to coder.Length
 | 
			
		||||
	var length int32
 | 
			
		||||
	codec := encoding.VarCodec{}
 | 
			
		||||
	err = codec.DecodePVarInt32(lenBuf.Bytes(), &length)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, ErrMalformed
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// validate len decoded from stream
 | 
			
		||||
	if length < 0 {
 | 
			
		||||
		return nil, fmt.Errorf("coder.ReadPacket() get lenBuf=(%# x), decode len=(%v)", lenBuf.Bytes(), length)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// write coder.Length bytes
 | 
			
		||||
	buf.Write(lenBuf.Bytes())
 | 
			
		||||
 | 
			
		||||
	// read next {len} bytes as coder.Value
 | 
			
		||||
	valBuf := bytes.Buffer{}
 | 
			
		||||
 | 
			
		||||
	// every batch read 512 bytes, if next reads < 512, read
 | 
			
		||||
	var count int
 | 
			
		||||
	for {
 | 
			
		||||
		batchReadSize := 1024 * 1024
 | 
			
		||||
		var tmpBuf []byte
 | 
			
		||||
		if int(length)-count < batchReadSize {
 | 
			
		||||
			tmpBuf = make([]byte, int(length)-count)
 | 
			
		||||
		} else {
 | 
			
		||||
			tmpBuf = make([]byte, batchReadSize)
 | 
			
		||||
		}
 | 
			
		||||
		p, e := reader.Read(tmpBuf)
 | 
			
		||||
		count += p
 | 
			
		||||
		if e != nil {
 | 
			
		||||
			if e == io.EOF {
 | 
			
		||||
				valBuf.Write(tmpBuf[:p])
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
			return nil, fmt.Errorf("y3 parse valBuf error: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		valBuf.Write(tmpBuf[:p])
 | 
			
		||||
		if count == int(length) {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if count < int(length) {
 | 
			
		||||
		// return nil, fmt.Errorf("[y3] p should == len when getting y3 value buffer, len=%d, p=%d", length, count)
 | 
			
		||||
		return nil, ErrMalformed
 | 
			
		||||
	}
 | 
			
		||||
	// write coder.Value bytes
 | 
			
		||||
	buf.Write(valBuf.Bytes())
 | 
			
		||||
 | 
			
		||||
	return buf.Bytes(), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func readByte(reader io.Reader) (byte, error) {
 | 
			
		||||
	var b [1]byte
 | 
			
		||||
	_, err := reader.Read(b[:])
 | 
			
		||||
	return b[0], err
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,88 @@
 | 
			
		|||
package mq_coder
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
 | 
			
		||||
	"git.hpds.cc/Component/mq_coder/encoding"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// DecodeState represents the state of decoding
 | 
			
		||||
type DecodeState struct {
 | 
			
		||||
	// ConsumedBytes is the bytes consumed by decoder
 | 
			
		||||
	ConsumedBytes int
 | 
			
		||||
	// SizeL is the bytes length of value
 | 
			
		||||
	SizeL int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DecodeToPrimitivePacket parse out whole buffer to a PrimitivePacket
 | 
			
		||||
//
 | 
			
		||||
// Examples:
 | 
			
		||||
// [0x01, 0x01, 0x01] -> Key=0x01, Value=0x01
 | 
			
		||||
// [0x41, 0x06, 0x03, 0x01, 0x61, 0x04, 0x01, 0x62] -> key=0x03, value=0x61; key=0x04, value=0x62
 | 
			
		||||
func DecodeToPrimitivePacket(buf []byte, p *PrimitivePacket) (*DecodeState, error) {
 | 
			
		||||
	decoder := &DecodeState{
 | 
			
		||||
		ConsumedBytes: 0,
 | 
			
		||||
		SizeL:         0,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if buf == nil || len(buf) < primitivePacketBufferMinimalLength {
 | 
			
		||||
		return decoder, errors.New("invalid packet minimal size")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	p.packet = &packet{
 | 
			
		||||
		valBuf: []byte{},
 | 
			
		||||
		buf:    &bytes.Buffer{},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var pos = 0
 | 
			
		||||
	// first byte is `Tag`
 | 
			
		||||
	p.tag = NewTag(buf[pos])
 | 
			
		||||
	p.buf.WriteByte(buf[pos])
 | 
			
		||||
	pos++
 | 
			
		||||
	decoder.ConsumedBytes = pos
 | 
			
		||||
 | 
			
		||||
	// read `Varint` from buf for `Length of value`
 | 
			
		||||
	tmpBuf := buf[pos:]
 | 
			
		||||
	var bufLen int32
 | 
			
		||||
	codec := encoding.VarCodec{}
 | 
			
		||||
	err := codec.DecodePVarInt32(tmpBuf, &bufLen)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return decoder, err
 | 
			
		||||
	}
 | 
			
		||||
	if codec.Size < 1 {
 | 
			
		||||
		return decoder, errors.New("malformed, size of Length can not smaller than 1")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// codec.Size describes how many bytes used to represent `Length`
 | 
			
		||||
	p.buf.Write(buf[pos : pos+codec.Size])
 | 
			
		||||
	pos += codec.Size
 | 
			
		||||
 | 
			
		||||
	decoder.ConsumedBytes = pos
 | 
			
		||||
	decoder.SizeL = codec.Size
 | 
			
		||||
 | 
			
		||||
	// if length<0, error on decoding
 | 
			
		||||
	if bufLen < 0 {
 | 
			
		||||
		return decoder, errors.New("invalid packet, negative length")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// the length of value
 | 
			
		||||
	p.length = int(bufLen)
 | 
			
		||||
	if p.length == 0 {
 | 
			
		||||
		p.valBuf = nil
 | 
			
		||||
		return decoder, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// the next `p.length` bytes store value
 | 
			
		||||
	endPos := pos + p.length
 | 
			
		||||
 | 
			
		||||
	if pos > endPos || endPos > len(buf) || pos > len(buf) {
 | 
			
		||||
		return decoder, fmt.Errorf("beyond the boundary, pos=%v, endPos=%v", pos, endPos)
 | 
			
		||||
	}
 | 
			
		||||
	p.valBuf = buf[pos:endPos]
 | 
			
		||||
	p.buf.Write(buf[pos:endPos])
 | 
			
		||||
 | 
			
		||||
	decoder.ConsumedBytes = endPos
 | 
			
		||||
	return decoder, nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,112 @@
 | 
			
		|||
package mq_coder
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
 | 
			
		||||
	"git.hpds.cc/Component/mq_coder/encoding"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// PrimitivePacketEncoder used for encode a primitive packet
 | 
			
		||||
type PrimitivePacketEncoder struct {
 | 
			
		||||
	*encoder
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewPrimitivePacketEncoder return an Encoder for primitive packet
 | 
			
		||||
func NewPrimitivePacketEncoder(sid byte) *PrimitivePacketEncoder {
 | 
			
		||||
	prim := &PrimitivePacketEncoder{
 | 
			
		||||
		encoder: &encoder{
 | 
			
		||||
			isNode: false,
 | 
			
		||||
			buf:    new(bytes.Buffer),
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	prim.seqID = sid
 | 
			
		||||
	return prim
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetInt32Value encode int32 value
 | 
			
		||||
func (enc *PrimitivePacketEncoder) SetInt32Value(v int32) {
 | 
			
		||||
	size := encoding.SizeOfNVarInt32(v)
 | 
			
		||||
	codec := encoding.VarCodec{Size: size}
 | 
			
		||||
	enc.valbuf = make([]byte, size)
 | 
			
		||||
	err := codec.EncodeNVarInt32(enc.valbuf, v)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetUInt32Value encode uint32 value
 | 
			
		||||
func (enc *PrimitivePacketEncoder) SetUInt32Value(v uint32) {
 | 
			
		||||
	size := encoding.SizeOfNVarUInt32(v)
 | 
			
		||||
	codec := encoding.VarCodec{Size: size}
 | 
			
		||||
	enc.valbuf = make([]byte, size)
 | 
			
		||||
	err := codec.EncodeNVarUInt32(enc.valbuf, v)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetInt64Value encode int64 value
 | 
			
		||||
func (enc *PrimitivePacketEncoder) SetInt64Value(v int64) {
 | 
			
		||||
	size := encoding.SizeOfNVarInt64(v)
 | 
			
		||||
	codec := encoding.VarCodec{Size: size}
 | 
			
		||||
	enc.valbuf = make([]byte, size)
 | 
			
		||||
	err := codec.EncodeNVarInt64(enc.valbuf, v)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetUInt64Value encode uint64 value
 | 
			
		||||
func (enc *PrimitivePacketEncoder) SetUInt64Value(v uint64) {
 | 
			
		||||
	size := encoding.SizeOfNVarUInt64(v)
 | 
			
		||||
	codec := encoding.VarCodec{Size: size}
 | 
			
		||||
	enc.valbuf = make([]byte, size)
 | 
			
		||||
	err := codec.EncodeNVarUInt64(enc.valbuf, v)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetFloat32Value encode float32 value
 | 
			
		||||
func (enc *PrimitivePacketEncoder) SetFloat32Value(v float32) {
 | 
			
		||||
	var size = encoding.SizeOfVarFloat32(v)
 | 
			
		||||
	codec := encoding.VarCodec{Size: size}
 | 
			
		||||
	enc.valbuf = make([]byte, size)
 | 
			
		||||
	err := codec.EncodeVarFloat32(enc.valbuf, v)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetFloat64Value encode float64 value
 | 
			
		||||
func (enc *PrimitivePacketEncoder) SetFloat64Value(v float64) {
 | 
			
		||||
	var size = encoding.SizeOfVarFloat64(v)
 | 
			
		||||
	codec := encoding.VarCodec{Size: size}
 | 
			
		||||
	enc.valbuf = make([]byte, size)
 | 
			
		||||
	err := codec.EncodeVarFloat64(enc.valbuf, v)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetBoolValue encode bool value
 | 
			
		||||
func (enc *PrimitivePacketEncoder) SetBoolValue(v bool) {
 | 
			
		||||
	var size = encoding.SizeOfPVarUInt32(uint32(1))
 | 
			
		||||
	codec := encoding.VarCodec{Size: size}
 | 
			
		||||
	enc.valbuf = make([]byte, size)
 | 
			
		||||
	err := codec.EncodePVarBool(enc.valbuf, v)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetStringValue encode string
 | 
			
		||||
func (enc *PrimitivePacketEncoder) SetStringValue(v string) {
 | 
			
		||||
	enc.valbuf = []byte(v)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetBytesValue encode []byte
 | 
			
		||||
func (enc *PrimitivePacketEncoder) SetBytesValue(v []byte) {
 | 
			
		||||
	enc.valbuf = v
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,100 @@
 | 
			
		|||
package mq_coder
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"git.hpds.cc/Component/mq_coder/encoding"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// 最小长度为2个字节
 | 
			
		||||
const primitivePacketBufferMinimalLength = 2
 | 
			
		||||
 | 
			
		||||
// PrimitivePacket describes primitive value type,
 | 
			
		||||
type PrimitivePacket struct {
 | 
			
		||||
	*packet
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ToInt32 parse raw as int32 value
 | 
			
		||||
func (p *PrimitivePacket) ToInt32() (int32, error) {
 | 
			
		||||
	var val int32
 | 
			
		||||
	codec := encoding.VarCodec{Size: len(p.valBuf)}
 | 
			
		||||
	err := codec.DecodeNVarInt32(p.packet.valBuf, &val)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	return val, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ToUInt32 parse raw as uint32 value
 | 
			
		||||
func (p *PrimitivePacket) ToUInt32() (uint32, error) {
 | 
			
		||||
	var val uint32
 | 
			
		||||
	codec := encoding.VarCodec{Size: len(p.valBuf)}
 | 
			
		||||
	err := codec.DecodeNVarUInt32(p.valBuf, &val)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	return val, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ToInt64 parse raw as int64 value
 | 
			
		||||
func (p *PrimitivePacket) ToInt64() (int64, error) {
 | 
			
		||||
	var val int64
 | 
			
		||||
	codec := encoding.VarCodec{Size: len(p.valBuf)}
 | 
			
		||||
	err := codec.DecodeNVarInt64(p.valBuf, &val)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	return val, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ToUInt64 parse raw as uint64 value
 | 
			
		||||
func (p *PrimitivePacket) ToUInt64() (uint64, error) {
 | 
			
		||||
	var val uint64
 | 
			
		||||
	codec := encoding.VarCodec{Size: len(p.valBuf)}
 | 
			
		||||
	err := codec.DecodeNVarUInt64(p.valBuf, &val)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	return val, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ToFloat32 parse raw as float32 value
 | 
			
		||||
func (p *PrimitivePacket) ToFloat32() (float32, error) {
 | 
			
		||||
	var val float32
 | 
			
		||||
	codec := encoding.VarCodec{Size: len(p.valBuf)}
 | 
			
		||||
	err := codec.DecodeVarFloat32(p.valBuf, &val)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	return val, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ToFloat64 parse raw as float64 value
 | 
			
		||||
func (p *PrimitivePacket) ToFloat64() (float64, error) {
 | 
			
		||||
	var val float64
 | 
			
		||||
	codec := encoding.VarCodec{Size: len(p.valBuf)}
 | 
			
		||||
	err := codec.DecodeVarFloat64(p.valBuf, &val)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	return val, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ToBool parse raw as bool value
 | 
			
		||||
func (p *PrimitivePacket) ToBool() (bool, error) {
 | 
			
		||||
	var val bool
 | 
			
		||||
	codec := encoding.VarCodec{Size: len(p.valBuf)}
 | 
			
		||||
	err := codec.DecodePVarBool(p.valBuf, &val)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return false, err
 | 
			
		||||
	}
 | 
			
		||||
	return val, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ToUTF8String parse raw data as string value
 | 
			
		||||
func (p *PrimitivePacket) ToUTF8String() (string, error) {
 | 
			
		||||
	return string(p.valBuf), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ToBytes returns raw buffer data
 | 
			
		||||
func (p *PrimitivePacket) ToBytes() []byte {
 | 
			
		||||
	return p.valBuf
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										26
									
								
								spec/spec.go
								
								
								
								
							
							
						
						
									
										26
									
								
								spec/spec.go
								
								
								
								
							| 
						 | 
				
			
			@ -1,26 +0,0 @@
 | 
			
		|||
package spec
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"io"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	maxSeqId     = 0x3F
 | 
			
		||||
	flagBitNode  = 0x80
 | 
			
		||||
	wipeFlagBits = 0x3F
 | 
			
		||||
	msb          = 0x80
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	errInvalidSeqId = errors.New("coder.Builder: SeqId should >= 0 and =< 0x3F")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func readByte(reader io.Reader) (byte, error) {
 | 
			
		||||
	var b [1]byte
 | 
			
		||||
	n, err := reader.Read(b[:])
 | 
			
		||||
	if n == 0 {
 | 
			
		||||
		return 0x00, err
 | 
			
		||||
	}
 | 
			
		||||
	return b[0], err
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -1,27 +0,0 @@
 | 
			
		|||
@startuml
 | 
			
		||||
namespace spec {
 | 
			
		||||
    class L << (S,Aquamarine) >> {
 | 
			
		||||
        - buf []byte
 | 
			
		||||
        - size int
 | 
			
		||||
        - len int
 | 
			
		||||
 | 
			
		||||
        + Bytes() []byte
 | 
			
		||||
        + Size() int
 | 
			
		||||
        + VSize() int
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
    class T << (S,Aquamarine) >> {
 | 
			
		||||
        + Sid() int
 | 
			
		||||
        + Bytes() []byte
 | 
			
		||||
        + IsNodeMode() bool
 | 
			
		||||
        + SetNodeMode(flag bool)
 | 
			
		||||
        + Size() int
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
    class spec.T << (T, #FF7700) >>  {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
"__builtin__.byte" #.. "spec.T"
 | 
			
		||||
@enduml
 | 
			
		||||
| 
						 | 
				
			
			@ -1,54 +0,0 @@
 | 
			
		|||
package spec
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"io"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// T is the Tag in a TLV structure
 | 
			
		||||
type T byte
 | 
			
		||||
 | 
			
		||||
// NewT returns a T with sequenceID. If this packet contains other
 | 
			
		||||
// packets, this packet will be a "node packet", the T of this packet
 | 
			
		||||
// will set MSB to T.
 | 
			
		||||
func NewT(seqId int) (T, error) {
 | 
			
		||||
	if seqId < 0 || seqId > maxSeqId {
 | 
			
		||||
		return 0, errInvalidSeqId
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return T(seqId), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Sid returns the sequenceId of this packet.
 | 
			
		||||
func (t T) Sid() int {
 | 
			
		||||
	return int(t & wipeFlagBits)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Bytes returns raw bytes of T.
 | 
			
		||||
func (t T) Bytes() []byte {
 | 
			
		||||
	return []byte{byte(t)}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// IsNodeMode will return true if this packet contains other packets.
 | 
			
		||||
// Otherwise, return false.
 | 
			
		||||
func (t T) IsNodeMode() bool {
 | 
			
		||||
	return t&flagBitNode == flagBitNode
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetNodeMode will set T to indicates this packet contains
 | 
			
		||||
// other packets.
 | 
			
		||||
func (t *T) SetNodeMode(flag bool) {
 | 
			
		||||
	if flag {
 | 
			
		||||
		*t |= flagBitNode
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Size return the size of T raw bytes.
 | 
			
		||||
func (t T) Size() int {
 | 
			
		||||
	return 1
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ReadT read T from a bufio.Reader
 | 
			
		||||
func ReadT(rd io.Reader) (T, error) {
 | 
			
		||||
	b, err := readByte(rd)
 | 
			
		||||
	return T(b), err
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -1,89 +0,0 @@
 | 
			
		|||
package spec
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"io"
 | 
			
		||||
 | 
			
		||||
	"git.hpds.cc/Component/mq_coder/encoding"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// L is the Length in a TLV structure
 | 
			
		||||
type L struct {
 | 
			
		||||
	buf  []byte
 | 
			
		||||
	size int
 | 
			
		||||
	len  int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewL will take an int type len as parameter and return L to
 | 
			
		||||
// represent the size of V in a TLV. an integer will be encoded as
 | 
			
		||||
// a PVarInt32 type to represent the value.
 | 
			
		||||
func NewL(len int) (L, error) {
 | 
			
		||||
	var l = L{}
 | 
			
		||||
	if len < -1 {
 | 
			
		||||
		return l, errors.New("y3.L: len can't less than -1")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	valLen := int32(len)
 | 
			
		||||
	l.size = encoding.SizeOfPVarInt32(valLen)
 | 
			
		||||
	codec := encoding.VarCodec{Size: l.size}
 | 
			
		||||
	tmp := make([]byte, l.size)
 | 
			
		||||
	err := codec.EncodePVarInt32(tmp, valLen)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	l.buf = make([]byte, l.size)
 | 
			
		||||
	copy(l.buf, tmp)
 | 
			
		||||
	l.len = len
 | 
			
		||||
	return l, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Bytes will return the raw bytes of L.
 | 
			
		||||
func (l L) Bytes() []byte {
 | 
			
		||||
	return l.buf
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Size returns how many bytes used to represent this L.
 | 
			
		||||
func (l L) Size() int {
 | 
			
		||||
	return l.size
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// VSize returns the size of V.
 | 
			
		||||
func (l L) VSize() int {
 | 
			
		||||
	return int(l.len)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ReadL read L from bufio.Reader
 | 
			
		||||
func ReadL(r io.Reader) (*L, error) {
 | 
			
		||||
	lenBuf := bytes.Buffer{}
 | 
			
		||||
	for {
 | 
			
		||||
		b, err := readByte(r)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		lenBuf.WriteByte(b)
 | 
			
		||||
		if b&msb != msb {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	buf := lenBuf.Bytes()
 | 
			
		||||
 | 
			
		||||
	// decode to L
 | 
			
		||||
	length, err := decodeL(buf)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &L{
 | 
			
		||||
		buf:  buf,
 | 
			
		||||
		len:  int(length),
 | 
			
		||||
		size: len(buf),
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func decodeL(buf []byte) (length int32, err error) {
 | 
			
		||||
	codec := encoding.VarCodec{}
 | 
			
		||||
	err = codec.DecodePVarInt32(buf, &length)
 | 
			
		||||
	return length, err
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,115 @@
 | 
			
		|||
package mq_coder
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
 | 
			
		||||
	"git.hpds.cc/Component/mq_coder/encoding"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// StreamReader read an coder packet from io.Reader, and return
 | 
			
		||||
// the ValReader after decode out Tag and Len
 | 
			
		||||
type StreamReader struct {
 | 
			
		||||
	src io.Reader
 | 
			
		||||
	// Tag of a coder packet
 | 
			
		||||
	Tag byte
 | 
			
		||||
	// Len of a coder packet
 | 
			
		||||
	Len int
 | 
			
		||||
	// Val of a coder packet
 | 
			
		||||
	Val io.Reader
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewStreamParser create a new coder StreamReader
 | 
			
		||||
func NewStreamParser(reader io.Reader) *StreamReader {
 | 
			
		||||
	return &StreamReader{
 | 
			
		||||
		src: reader,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (sr *StreamReader) GetValBuffer() ([]byte, error) {
 | 
			
		||||
	buf, err := io.ReadAll(sr.Val)
 | 
			
		||||
	return buf, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Do must run in a goroutine
 | 
			
		||||
func (sr *StreamReader) Do() error {
 | 
			
		||||
	if sr.src == nil {
 | 
			
		||||
		return errors.New("coder: nil source reader")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tag, err := readByte(sr.src)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// the first byte is coder.Tag
 | 
			
		||||
	sr.Tag = tag
 | 
			
		||||
 | 
			
		||||
	// read coder.Length bytes, a varInt format
 | 
			
		||||
	lenBuf := bytes.Buffer{}
 | 
			
		||||
	for {
 | 
			
		||||
		b, e := readByte(sr.src)
 | 
			
		||||
		if e != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		lenBuf.WriteByte(b)
 | 
			
		||||
		if b&0x80 != 0x80 {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// parse to coder.Length
 | 
			
		||||
	var length int32
 | 
			
		||||
	codec := encoding.VarCodec{}
 | 
			
		||||
	err = codec.DecodePVarInt32(lenBuf.Bytes(), &length)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// validate len decoded from stream
 | 
			
		||||
	if length < 0 {
 | 
			
		||||
		return fmt.Errorf("coder: streamParse() get lenBuf=(%# x), decode len=(%v)", lenBuf.Bytes(), length)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	sr.Len = int(length)
 | 
			
		||||
 | 
			
		||||
	// read next {len} bytes as coder.Value
 | 
			
		||||
	sr.Val = &valR{
 | 
			
		||||
		length: int(length),
 | 
			
		||||
		src:    sr.src,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type valR struct {
 | 
			
		||||
	length int
 | 
			
		||||
	off    int
 | 
			
		||||
	src    io.Reader
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *valR) Read(p []byte) (n int, err error) {
 | 
			
		||||
	if r.src == nil {
 | 
			
		||||
		return 0, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if r.off >= r.length {
 | 
			
		||||
		return 0, io.EOF
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	bound := len(p)
 | 
			
		||||
	if len(p) > r.length-r.off {
 | 
			
		||||
		bound = r.length - r.off
 | 
			
		||||
	}
 | 
			
		||||
	// update read
 | 
			
		||||
	r.off, err = r.src.Read(p[0:bound])
 | 
			
		||||
	return r.off, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func StreamReadPacket(reader io.Reader) (*StreamReader, error) {
 | 
			
		||||
	sp := NewStreamParser(reader)
 | 
			
		||||
	err := sp.Do()
 | 
			
		||||
	return sp, err
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,138 @@
 | 
			
		|||
package mq_coder
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"io"
 | 
			
		||||
 | 
			
		||||
	"git.hpds.cc/Component/mq_coder/encoding"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type StreamEncoder struct {
 | 
			
		||||
	tag    byte
 | 
			
		||||
	buf    *bytes.Buffer
 | 
			
		||||
	pBuf   *bytes.Buffer
 | 
			
		||||
	len    int
 | 
			
		||||
	sLen   int
 | 
			
		||||
	reader *yR
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewStreamEncoder(tag byte) *StreamEncoder {
 | 
			
		||||
	var se = &StreamEncoder{
 | 
			
		||||
		tag:  tag,
 | 
			
		||||
		buf:  new(bytes.Buffer),
 | 
			
		||||
		pBuf: new(bytes.Buffer),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return se
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (se *StreamEncoder) AddPacket(packet *PrimitivePacketEncoder) {
 | 
			
		||||
	node := packet.Encode()
 | 
			
		||||
	se.AddPacketBuffer(node)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (se *StreamEncoder) AddPacketBuffer(buf []byte) {
 | 
			
		||||
	se.pBuf.Write(buf)
 | 
			
		||||
	se.growLen(len(se.pBuf.Bytes()))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (se *StreamEncoder) AddStreamPacket(tag byte, length int, reader io.Reader) {
 | 
			
		||||
	se.sLen = length
 | 
			
		||||
	// s-Tag
 | 
			
		||||
	se.pBuf.WriteByte(tag)
 | 
			
		||||
	se.growLen(1)
 | 
			
		||||
	// calculate s-Len
 | 
			
		||||
	size := encoding.SizeOfPVarInt32(int32(length))
 | 
			
		||||
	codec := encoding.VarCodec{Size: size}
 | 
			
		||||
	tmp := make([]byte, size)
 | 
			
		||||
	err := codec.EncodePVarInt32(tmp, int32(length))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	se.pBuf.Write(tmp)
 | 
			
		||||
	se.growLen(size)
 | 
			
		||||
 | 
			
		||||
	// total buf
 | 
			
		||||
	se.buf.WriteByte(se.tag)
 | 
			
		||||
	se.growLen(length)
 | 
			
		||||
	// calculate total Len buf
 | 
			
		||||
	size = encoding.SizeOfPVarInt32(int32(se.len))
 | 
			
		||||
	codec = encoding.VarCodec{Size: size}
 | 
			
		||||
	tmp = make([]byte, size)
 | 
			
		||||
	err = codec.EncodePVarInt32(tmp, int32(se.len))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	se.buf.Write(tmp) //lenBuf
 | 
			
		||||
	se.buf.Write(se.pBuf.Bytes())
 | 
			
		||||
	se.growLen(size) // total length
 | 
			
		||||
	se.growLen(1)    // parent tag
 | 
			
		||||
 | 
			
		||||
	se.reader = &yR{
 | 
			
		||||
		buf:    se.buf,
 | 
			
		||||
		src:    reader,
 | 
			
		||||
		length: se.len,
 | 
			
		||||
		sLen:   se.sLen,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (se *StreamEncoder) GetReader() io.Reader {
 | 
			
		||||
	if se.reader != nil {
 | 
			
		||||
		return se.reader
 | 
			
		||||
	}
 | 
			
		||||
	return new(bytes.Buffer)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Pipe can pipe data to os.StdOut
 | 
			
		||||
func (se *StreamEncoder) Pipe(writer io.Writer) {
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (se *StreamEncoder) GetLen() int {
 | 
			
		||||
	if se.reader != nil {
 | 
			
		||||
		return se.len
 | 
			
		||||
	}
 | 
			
		||||
	return 0
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (se *StreamEncoder) growLen(step int) {
 | 
			
		||||
	se.len += step
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type yR struct {
 | 
			
		||||
	src    io.Reader
 | 
			
		||||
	buf    *bytes.Buffer
 | 
			
		||||
	length int
 | 
			
		||||
	off    int
 | 
			
		||||
	sLen   int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *yR) Read(p []byte) (n int, err error) {
 | 
			
		||||
	if r.src == nil {
 | 
			
		||||
		return 0, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if r.off >= r.length {
 | 
			
		||||
		return 0, io.EOF
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if r.off < r.length-r.sLen {
 | 
			
		||||
		n, err = r.buf.Read(p)
 | 
			
		||||
		r.off += n
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			if err == io.EOF {
 | 
			
		||||
				return n, nil
 | 
			
		||||
			} else {
 | 
			
		||||
				return 0, err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		return n, nil
 | 
			
		||||
	} else {
 | 
			
		||||
		n, err = r.src.Read(p)
 | 
			
		||||
		r.off += n
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return n, err
 | 
			
		||||
		}
 | 
			
		||||
		return n, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,38 @@
 | 
			
		|||
package mq_coder
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"git.hpds.cc/Component/mq_coder/utils"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Tag represents the Tag of TLV,
 | 
			
		||||
// MSB used to represent the packet type, 0x80 means a node packet, otherwise is a primitive packet.
 | 
			
		||||
// Low 7 bits represent Sequence ID, like `key` in JSON format
 | 
			
		||||
type Tag struct {
 | 
			
		||||
	raw byte
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// IsNode returns true is MSB is 1.
 | 
			
		||||
func (t *Tag) IsNode() bool {
 | 
			
		||||
	return t.raw&utils.MSB == utils.MSB
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// IsSlice determine if the current node is a Slice
 | 
			
		||||
func (t *Tag) IsSlice() bool {
 | 
			
		||||
	return t.raw&utils.SliceFlag == utils.SliceFlag
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SeqId get the sequence ID, as key in JSON format
 | 
			
		||||
func (t *Tag) SeqId() byte {
 | 
			
		||||
	//return t.raw & packet utils.DropMSB
 | 
			
		||||
	return t.raw & utils.DropMSBArrayFlag
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewTag create a NodePacket Tag field
 | 
			
		||||
func NewTag(b byte) *Tag {
 | 
			
		||||
	return &Tag{raw: b}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Raw return the original byte
 | 
			
		||||
func (t *Tag) Raw() byte {
 | 
			
		||||
	return t.raw
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,18 @@
 | 
			
		|||
package utils
 | 
			
		||||
 | 
			
		||||
// MSB is `1000 0000` describes this is a node packet, otherwise, is a primitive packet
 | 
			
		||||
const MSB byte = 0x80
 | 
			
		||||
 | 
			
		||||
// DropMSB is `0111 1111`, used to remove MSB flag bit
 | 
			
		||||
const DropMSB byte = 0x3F
 | 
			
		||||
 | 
			
		||||
// SliceFlag is `0100 0000`, describes this packet is a Slice type
 | 
			
		||||
const SliceFlag byte = 0x40
 | 
			
		||||
 | 
			
		||||
// DropMSBArrayFlag is `0011 1111`, used to remove MSB and Slice flag bit
 | 
			
		||||
const DropMSBArrayFlag byte = 0x3F
 | 
			
		||||
 | 
			
		||||
// IsNodePacket returns true if the tag represents a node package
 | 
			
		||||
func IsNodePacket(tag byte) bool {
 | 
			
		||||
	return tag&MSB == MSB
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
		Reference in New Issue