diff --git a/chunkedVReader.go b/chunkedVReader.go deleted file mode 100644 index 33be7c6..0000000 --- a/chunkedVReader.go +++ /dev/null @@ -1,72 +0,0 @@ -package mq_coder - -import ( - "bytes" - "io" - "io/ioutil" -) - -type chunkVReader struct { - src io.Reader // the reader parts of V - buf *bytes.Buffer // the bytes part of V - totalSize int // size of whole buffer of this packet - off int // last read op - ChunkVSize int // the size of chunked V -} - -// Read implement io.Reader interface -func (r *chunkVReader) Read(p []byte) (n int, err error) { - if r.src == nil { - return 0, nil - } - - if r.off >= r.totalSize { - return 0, io.EOF - } - - if r.off < r.totalSize-r.ChunkVSize { - n, err := r.buf.Read(p) - r.off += n - if err != nil { - if err == io.EOF { - return n, nil - } else { - return 0, err - } - } - return n, nil - } - n, err = r.src.Read(p) - r.off += n - if err != nil { - return n, err - } - return n, nil -} - -// WriteTo implement io.WriteTo interface -func (r *chunkVReader) WriteTo(w io.Writer) (n int64, err error) { - if r.src == nil { - return 0, nil - } - - // first, write existed buffer - m, err := w.Write(r.buf.Bytes()) - if err != nil { - return 0, err - } - n += int64(m) - - // last, write from reader - buf, err := ioutil.ReadAll(r.src) - if err != nil && err != io.EOF { - return 0, errWriteFromReader - } - m, err = w.Write(buf) - if err != nil { - return 0, err - } - - n += int64(m) - return n, nil -} diff --git a/coder.go b/coder.go deleted file mode 100644 index 9f86d98..0000000 --- a/coder.go +++ /dev/null @@ -1,54 +0,0 @@ -package mq_coder - -import ( - "errors" - "io" -) - -var ( - errBuildIncomplete = errors.New("coder.Encoder: invalid structure of packet") - errInvalidAdding = errors.New("coder.Encoder: can not add this Packet after StreamPacket has been add") - errNonStreamPacket = errors.New("coder.Packet: this packet is not in node mode") - errWriteFromReader = errors.New("coder.streamV: write from reader error") - errNotNodeMode = errors.New("coder.Encoder: packet should be in node mode can be add other packets as child") - errNilReader = errors.New("coder.Decoder: nil source reader") -) - -// Packet 编解码器包 -type Packet interface { - // SeqId 返回此数据包的序列Id. - SeqId() int - // Size 返回整个数据包的大小 - Size() int - // VSize 返回V的大小. - VSize() int - // Bytes 此数据包的全部字节. - Bytes() []byte - // Reader 返回 io.Reader 字节. - Reader() io.Reader - // VReader 返回 io.Reader 的 流字节. - VReader() io.Reader - // IsStreamMode 是否流模式 - IsStreamMode() bool - // IsNodeMode 是否节点模式 - IsNodeMode() bool - - // BytesV 返回流字节 - BytesV() []byte - // Utf8StringV 返回流的utf8字符串值 - Utf8StringV() string - // Int32V 返回流的int32值 - Int32V() (val int32, err error) - // UInt32V 返回流的uint32值 - UInt32V() (val uint32, err error) - // Int64V 返回流的int64值 - Int64V() (val int64, err error) - // UInt64V 返回流的uint64值 - UInt64V() (val uint64, err error) - // Float32V 返回流的float32值 - Float32V() (val float32, err error) - // Float64V 返回流的float64值 - Float64V() (val float64, err error) - // BoolV 返回流的bool值 - BoolV() (val bool, err error) -} diff --git a/decoder.go b/decoder.go deleted file mode 100644 index 3fcd5cb..0000000 --- a/decoder.go +++ /dev/null @@ -1,93 +0,0 @@ -package mq_coder - -import ( - "bytes" - "io" - - "git.hpds.cc/Component/mq_coder/spec" -) - -// Decoder is the tool for decoding packet from stream -type Decoder struct { - tag spec.T - len *spec.L - rd io.Reader -} - -// NewDecoder returns a Decoder from an io.Reader -func NewDecoder(reader io.Reader) *Decoder { - return &Decoder{ - rd: reader, - } -} - -// SeqId return the SequenceID of the decoding packet -func (d *Decoder) SeqId() int { - return d.tag.Sid() -} - -// UnderlyingReader returns the reader this decoder using -func (d *Decoder) UnderlyingReader() io.Reader { - return d.rd -} - -// ReadHeader will block until io.EOF or receive T and L of a packet. -func (d *Decoder) ReadHeader() error { - // only read T and L - return d.readTL() -} - -// GetChunkedPacket will block until io.EOF or receive V of a packet in chunked mode. -func (d *Decoder) GetChunkedPacket() Packet { - return &StreamPacket{ - t: d.tag, - l: *d.len, - vr: d.rd, - chunkMode: true, - chunkSize: d.len.VSize(), - } -} - -// GetFullFilledPacket read full Packet from given io.Reader -func (d *Decoder) GetFullFilledPacket() (packet Packet, err error) { - // read V - buf := new(bytes.Buffer) - total := 0 - for { - valBuf := make([]byte, d.len.VSize()) - n, err := d.rd.Read(valBuf) - if n > 0 { - total += n - buf.Write(valBuf[:n]) - } - if total >= d.len.VSize() || err != nil { - break - } - } - - packet = &StreamPacket{ - t: d.tag, - l: *d.len, - vBuf: buf.Bytes(), - chunkMode: false, - } - - return packet, nil -} - -func (d *Decoder) readTL() (err error) { - if d.rd == nil { - return errNilReader - } - - // read T - d.tag, err = spec.ReadT(d.rd) - if err != nil { - return err - } - - // read L - d.len, err = spec.ReadL(d.rd) - - return err -} diff --git a/encoder.go b/encoder.go index dc627d8..43245eb 100644 --- a/encoder.go +++ b/encoder.go @@ -2,157 +2,76 @@ package mq_coder import ( "bytes" - "io" - - "git.hpds.cc/Component/mq_coder/spec" + "fmt" + "git.hpds.cc/Component/mq_coder/encoding" ) -// Encoder is the tool for creating a packet easily -type Encoder struct { - tag spec.T - len *spec.L - valReader io.Reader - valReaderSize int - nodes map[int]Packet - state int - size int32 // size of value - isStreamMode bool - valBuf *bytes.Buffer - done bool - seqId int - isNodeMode bool +// Encoder will encode object to encoding +type encoder struct { + seqID byte + valbuf []byte + isNode bool + isArray bool + buf *bytes.Buffer + complete bool } -// SetSeqId set sequenceId of a packet, if this packet contains other -// packets, isNode should set to true -func (b *Encoder) SetSeqId(seqId int, isNode bool) { - // init - b.valBuf = new(bytes.Buffer) - b.nodes = make(map[int]Packet) - // set seqId - b.seqId = seqId - b.isNodeMode = isNode +type iEncoder interface { + Encode() []byte } -// SetBytesV set bytes type as V -func (b *Encoder) SetBytesV(buf []byte) { - b.size += int32(len(buf)) - b.valBuf.Write(buf) - b.isStreamMode = false - b.state |= 0x04 +func (enc *encoder) GetValBuf() []byte { + return enc.valbuf } -// SetReaderV set io.Reader type as V -func (b *Encoder) SetReaderV(r io.Reader, size int) { - b.isStreamMode = true - b.valReader = r - b.state |= 0x04 - b.size += int32(size) - b.valReaderSize = size +func (enc *encoder) IsEmpty() bool { + return len(enc.valbuf) == 0 } -// AddPacket add a Packet child to this packet, this packet must be NodeMode -func (b *Encoder) AddPacket(child Packet) error { - // only packet is in node mode can add other packets - if !b.isNodeMode { - return errNotNodeMode +func (enc *encoder) AddBytes(buf []byte) { + enc.valbuf = append(enc.valbuf, buf...) +} + +func (enc *encoder) addRawPacket(en iEncoder) { + enc.valbuf = append(enc.valbuf, en.Encode()...) +} + +// setTag write tag as seqID +func (enc *encoder) writeTag() { + if enc.seqID > 0x3F { + panic(fmt.Errorf("sid should be in [0..0x3F]")) } - - if b.done { - return errInvalidAdding + if enc.isNode { + enc.seqID = enc.seqID | 0x80 } - b.nodes[child.SeqId()] = child - buf := child.Bytes() - b.SetBytesV(buf) - return nil + if enc.isArray { + enc.seqID = enc.seqID | 0x40 + } + enc.buf.WriteByte(enc.seqID) } -// AddStreamPacket will put a StreamPacket in chunked mode to current packet. -func (b *Encoder) AddStreamPacket(child Packet) (err error) { - // if this packet is in stream mode, can not add any packets - if b.done { - return errInvalidAdding - } - - // only accept packet in stream mode - if !child.IsStreamMode() { - return errNonStreamPacket - } - - // set the valReader of this packet to the child's - b.valReader = child.VReader() - - // valReaderSize will be the same as child's - b.valReaderSize = child.VSize() - // add this child packet - b.nodes[child.SeqId()] = child - // add the size of child's V to L of this packet - b.size += int32(child.Size()) - // put the bytes of child to valBuf - buf := child.Bytes() - b.valBuf.Write(buf) - // update state - b.state |= 0x04 - b.isStreamMode = true - b.done = true - return nil -} - -// Packet return a Packet instance. -func (b *Encoder) Packet() (Packet, error) { - err := b.generateT() +func (enc *encoder) writeLengthBuf() { + vallen := len(enc.valbuf) + size := encoding.SizeOfPVarInt32(int32(vallen)) + codec := encoding.VarCodec{Size: size} + tmp := make([]byte, size) + err := codec.EncodePVarInt32(tmp, int32(vallen)) if err != nil { - return nil, err + panic(err) } - - err = b.generateL() - if err != nil { - return nil, err - } - - if b.state != 0x07 { - return nil, errBuildIncomplete - } - - if b.isStreamMode { - return &StreamPacket{ - t: b.tag, - l: *b.len, - vr: b.valReader, - vBuf: b.valBuf.Bytes(), - chunkMode: true, - chunkSize: b.valReaderSize, - }, err - } - - // not streaming mode - return &StreamPacket{ - t: b.tag, - l: *b.len, - vBuf: b.valBuf.Bytes(), - chunkMode: false, - }, err + enc.buf.Write(tmp) } -// will generate T of a TLV. -func (b *Encoder) generateT() error { - t, err := spec.NewT(b.seqId) - t.SetNodeMode(b.isNodeMode) - if err != nil { - return err +// Encode returns a final Y3 encoded byte slice +func (enc *encoder) Encode() []byte { + if !enc.complete { + // Tag + enc.writeTag() + // Len + enc.writeLengthBuf() + // Val + enc.buf.Write(enc.valbuf) + enc.complete = true } - b.tag = t - b.state |= 0x01 - return nil -} - -// will generate L of a TLV. -func (b *Encoder) generateL() error { - l, err := spec.NewL(int(b.size)) - if err != nil { - return err - } - b.len = &l - b.state |= 0x02 - return nil + return enc.buf.Bytes() } diff --git a/encoder_sugar.go b/encoder_sugar.go deleted file mode 100644 index 1bc92f9..0000000 --- a/encoder_sugar.go +++ /dev/null @@ -1,98 +0,0 @@ -package mq_coder - -import ( - "git.hpds.cc/Component/mq_coder/encoding" -) - -// SetUtf8StringV set utf-8 string type value as V -func (b *Encoder) SetUtf8StringV(v string) { - buf := []byte(v) - b.SetBytesV(buf) -} - -// SetInt32V set an int32 type value as V -func (b *Encoder) SetInt32V(v int32) error { - size := encoding.SizeOfNVarInt32(v) - codec := encoding.VarCodec{Size: size} - buf := make([]byte, size) - err := codec.EncodeNVarInt32(buf, v) - if err != nil { - return err - } - b.SetBytesV(buf) - return nil -} - -// SetUInt32V set an uint32 type value as V -func (b *Encoder) SetUInt32V(v uint32) error { - size := encoding.SizeOfNVarUInt32(v) - codec := encoding.VarCodec{Size: size} - buf := make([]byte, size) - err := codec.EncodeNVarUInt32(buf, v) - if err != nil { - return err - } - b.SetBytesV(buf) - return nil -} - -// SetInt64V set an int64 type value as V -func (b *Encoder) SetInt64V(v int64) error { - size := encoding.SizeOfNVarInt64(v) - codec := encoding.VarCodec{Size: size} - buf := make([]byte, size) - err := codec.EncodeNVarInt64(buf, v) - if err != nil { - return err - } - b.SetBytesV(buf) - return nil -} - -// SetUInt64V set an uint64 type value as V -func (b *Encoder) SetUInt64V(v uint64) error { - size := encoding.SizeOfNVarUInt64(v) - codec := encoding.VarCodec{Size: size} - buf := make([]byte, size) - err := codec.EncodeNVarUInt64(buf, v) - if err != nil { - return err - } - b.SetBytesV(buf) - return nil -} - -// SetFloat32V set a float32 type value as V -func (b *Encoder) SetFloat32V(v float32) error { - size := encoding.SizeOfVarFloat32(v) - codec := encoding.VarCodec{Size: size} - buf := make([]byte, size) - err := codec.EncodeVarFloat32(buf, v) - if err != nil { - return err - } - b.SetBytesV(buf) - return nil -} - -// SetFloat64V set a float64 type value as V -func (b *Encoder) SetFloat64V(v float64) error { - size := encoding.SizeOfVarFloat64(v) - codec := encoding.VarCodec{Size: size} - buf := make([]byte, size) - err := codec.EncodeVarFloat64(buf, v) - if err != nil { - return err - } - b.SetBytesV(buf) - return nil -} - -// SetBoolV set bool type value as V -func (b *Encoder) SetBoolV(v bool) { - var size = encoding.SizeOfPVarUInt32(uint32(1)) - codec := encoding.VarCodec{Size: size} - buf := make([]byte, size) - _ = codec.EncodePVarBool(buf, v) - b.SetBytesV(buf) -} diff --git a/node_decoder.go b/node_decoder.go new file mode 100644 index 0000000..1bf89bf --- /dev/null +++ b/node_decoder.go @@ -0,0 +1,96 @@ +package mq_coder + +import ( + "bytes" + "errors" + + "git.hpds.cc/Component/mq_coder/encoding" + "git.hpds.cc/Component/mq_coder/utils" +) + +func parsePayload(b []byte) (consumedBytes int, ifNodePacket bool, np *NodePacket, pp *PrimitivePacket, err error) { + if len(b) == 0 { + return 0, false, nil, nil, errors.New("parsePacket params can not be nil") + } + + pos := 0 + // NodePacket + if ok := utils.IsNodePacket(b[pos]); ok { + np = &NodePacket{} + endPos, err := DecodeToNodePacket(b, np) + return endPos, true, np, nil, err + } + + pp = &PrimitivePacket{} + state, err := DecodeToPrimitivePacket(b, pp) + return state.ConsumedBytes, false, nil, pp, err +} + +// DecodeToNodePacket parse out whole buffer to a NodePacket +func DecodeToNodePacket(buf []byte, pct *NodePacket) (consumedBytes int, err error) { + if len(buf) == 0 { + return 0, errors.New("empty buf") + } + + pct.packet = &packet{ + valBuf: buf, + buf: &bytes.Buffer{}, + } + + pct.NodePackets = map[byte]NodePacket{} + pct.PrimitivePackets = map[byte]PrimitivePacket{} + + pos := 0 + + // `Tag` + tag := NewTag(buf[pos]) + pct.packet.tag = tag + pct.buf.WriteByte(buf[pos]) + pos++ + + // `Length`: the type is `varint` + tmpBuf := buf[pos:] + var valLen int32 + codec := encoding.VarCodec{} + err = codec.DecodePVarInt32(tmpBuf, &valLen) + if err != nil { + return 0, err + } + pct.packet.length = int(valLen) + pct.buf.Write(buf[pos : pos+codec.Size]) + pos += codec.Size + // if `Length` is 0, means empty node packet + if valLen == 0 { + return pos, nil + } + + // `Value` + // `raw` is pct.Length() length + vl := int(valLen) + if vl < 0 { + return pos, errors.New("found L of V smaller than 0") + } + endPos := pos + vl + pct.packet.valBuf = buf[pos:endPos] + pct.buf.Write(buf[pos:endPos]) + + // Parse value to Packet + for { + if pos >= endPos || pos >= len(buf) { + break + } + _p, isNode, np, pp, err := parsePayload(buf[pos:endPos]) + pos += _p + if err != nil { + return 0, err + } + if isNode { + pct.NodePackets[np.packet.tag.SeqId()] = *np + } else { + pct.PrimitivePackets[byte(pp.SeqId())] = *pp + } + } + + consumedBytes = endPos + return consumedBytes, nil +} diff --git a/node_encoder.go b/node_encoder.go new file mode 100644 index 0000000..381b25f --- /dev/null +++ b/node_encoder.go @@ -0,0 +1,33 @@ +package mq_coder + +import ( + "bytes" +) + +// NodePacketEncoder used for encode a node packet +type NodePacketEncoder struct { + *encoder +} + +// NewNodePacketEncoder returns an Encoder for node packet +func NewNodePacketEncoder(sid byte) *NodePacketEncoder { + nodeEnc := &NodePacketEncoder{ + encoder: &encoder{ + isNode: true, + buf: new(bytes.Buffer), + }, + } + + nodeEnc.seqID = sid + return nodeEnc +} + +// AddNodePacket add new node to this node +func (enc *NodePacketEncoder) AddNodePacket(np *NodePacketEncoder) { + enc.addRawPacket(np) +} + +// AddPrimitivePacket add new primitive to this node +func (enc *NodePacketEncoder) AddPrimitivePacket(np *PrimitivePacketEncoder) { + enc.addRawPacket(np) +} diff --git a/node_packet.go b/node_packet.go new file mode 100644 index 0000000..e37a125 --- /dev/null +++ b/node_packet.go @@ -0,0 +1,10 @@ +package mq_coder + +// NodePacket describes complex values +type NodePacket struct { + *packet + // NodePackets store all the node packets + NodePackets map[byte]NodePacket + // PrimitivePackets store all the primitive packets + PrimitivePackets map[byte]PrimitivePacket +} diff --git a/packet.go b/packet.go index b4328ca..853461d 100644 --- a/packet.go +++ b/packet.go @@ -2,155 +2,37 @@ package mq_coder import ( "bytes" - "io" - - "git.hpds.cc/Component/mq_coder/encoding" - "git.hpds.cc/Component/mq_coder/spec" ) -// StreamPacket implement the Packet interface. -type StreamPacket struct { - t spec.T - l spec.L - vBuf []byte - vr io.Reader - chunkMode bool - chunkSize int +// packet is the base type of the NodePacket and PrimitivePacket +type packet struct { + tag *Tag + length int + valBuf []byte + buf *bytes.Buffer } -var _ Packet = &StreamPacket{} - -// SeqId returns the sequenceId of this packet -func (p *StreamPacket) SeqId() int { return p.t.Sid() } - -// Size returns the size of whole packet. -func (p *StreamPacket) Size() int { - // T.Size + L.Size + V.Size - return p.t.Size() + p.l.Size() + p.l.VSize() +// GetRawBytes get all raw bytes of this packet +func (bp *packet) GetRawBytes() []byte { + return bp.buf.Bytes() } -// VSize returns the size of V. -func (p *StreamPacket) VSize() int { return p.l.VSize() } - -// Bytes return the raw bytes of this packet. V will be absent if -// is in chunked mode -func (p *StreamPacket) Bytes() []byte { - buf := new(bytes.Buffer) - // the raw bytes of T and L - p.writeTL(buf) - // p.valBuf stores the raw bytes of V - buf.Write(p.vBuf) - - return buf.Bytes() +// Length return the length of Val this packet +func (bp *packet) Length() int { + return bp.length } -// VReader return an io.Reader which can be read as the content of V. -func (p *StreamPacket) VReader() io.Reader { - if !p.chunkMode { - return bytes.NewReader(p.vBuf) - } - return p.vr +// SeqId returns Tag of this packet +func (bp *packet) SeqId() byte { + return bp.tag.SeqId() } -// Reader return an io.Reader which can be read as the whole bytes of -// this packet. This function only available if this V of packet is in -// chunked mode. -func (p *StreamPacket) Reader() io.Reader { - if !p.chunkMode { - buf := new(bytes.Buffer) - buf.Write(p.t.Bytes()) - buf.Write(p.l.Bytes()) - buf.Write(p.vBuf) - return buf - } - - buf := new(bytes.Buffer) - // T and L of this packet - p.writeTL(buf) - // V of this packet - buf.Write(p.vBuf) - - return &chunkVReader{ - buf: buf, - src: p.vr, - totalSize: p.Size(), - ChunkVSize: p.VSize(), - } +// IsSlice determine if the current node is a Slice +func (bp *packet) IsSlice() bool { + return bp.tag.IsSlice() } -// IsStreamMode returns a bool value indicates if the V of -// this packet is in stream mode -func (p *StreamPacket) IsStreamMode() bool { - return p.chunkMode -} - -// IsNodeMode returns a bool value indicates if this packet -// is node mode -func (p *StreamPacket) IsNodeMode() bool { - return p.t.IsNodeMode() -} - -// write the raw bytes of T and L to given buf -func (p *StreamPacket) writeTL(buf *bytes.Buffer) { - buf.Write(p.t.Bytes()) - buf.Write(p.l.Bytes()) -} - -// BytesV return V as bytes -func (p *StreamPacket) BytesV() []byte { - return p.vBuf -} - -// UTF8StringV return V as utf-8 string -func (p *StreamPacket) Utf8StringV() string { - return string(p.vBuf) -} - -// Int32V return V as int32 -func (p *StreamPacket) Int32V() (val int32, err error) { - codec := encoding.VarCodec{Size: len(p.vBuf)} - err = codec.DecodeNVarInt32(p.vBuf, &val) - return val, err -} - -// UInt32V return V as uint32 -func (p *StreamPacket) UInt32V() (val uint32, err error) { - codec := encoding.VarCodec{Size: len(p.vBuf)} - err = codec.DecodeNVarUInt32(p.vBuf, &val) - return val, err -} - -// Int64V return V as int64 -func (p *StreamPacket) Int64V() (val int64, err error) { - codec := encoding.VarCodec{Size: len(p.vBuf)} - err = codec.DecodeNVarInt64(p.vBuf, &val) - return val, err -} - -// UInt64V return V as uint64 -func (p *StreamPacket) UInt64V() (val uint64, err error) { - codec := encoding.VarCodec{Size: len(p.vBuf)} - err = codec.DecodeNVarUInt64(p.vBuf, &val) - return val, err -} - -// Float32V return V as float32 -func (p *StreamPacket) Float32V() (val float32, err error) { - codec := encoding.VarCodec{Size: len(p.vBuf)} - err = codec.DecodeVarFloat32(p.vBuf, &val) - return val, err -} - -// Float64V return V as float64 -func (p *StreamPacket) Float64V() (val float64, err error) { - codec := encoding.VarCodec{Size: len(p.vBuf)} - err = codec.DecodeVarFloat64(p.vBuf, &val) - return val, err -} - -// BoolV return V as bool -func (p *StreamPacket) BoolV() (val bool, err error) { - codec := encoding.VarCodec{Size: len(p.vBuf)} - err = codec.DecodePVarBool(p.vBuf, &val) - return val, err +// GetValBuf get raw buffer of Val of this packet +func (bp *packet) GetValBuf() []byte { + return bp.valBuf } diff --git a/parser.go b/parser.go new file mode 100644 index 0000000..a44d113 --- /dev/null +++ b/parser.go @@ -0,0 +1,100 @@ +package mq_coder + +import ( + "bytes" + "errors" + "fmt" + "io" + + "git.hpds.cc/Component/mq_coder/encoding" +) + +var ( + ErrMalformed = errors.New("coder.ReadPacket: malformed") +) + +// ReadPacket will try to read encoded packet from the reader +func ReadPacket(reader io.Reader) ([]byte, error) { + tag, err := readByte(reader) + if err != nil { + return nil, err + } + // buf will contain a complete encoded handshakeFrame + buf := bytes.Buffer{} + + // the first byte is coder.Tag + // write coder.Tag bytes + buf.WriteByte(tag) + + // read y3.Length bytes, a varInt format + lenBuf := bytes.Buffer{} + for { + b, e := readByte(reader) + if e != nil { + return nil, err + } + lenBuf.WriteByte(b) + if b&0x80 != 0x80 { + break + } + } + + // parse to coder.Length + var length int32 + codec := encoding.VarCodec{} + err = codec.DecodePVarInt32(lenBuf.Bytes(), &length) + if err != nil { + return nil, ErrMalformed + } + + // validate len decoded from stream + if length < 0 { + return nil, fmt.Errorf("coder.ReadPacket() get lenBuf=(%# x), decode len=(%v)", lenBuf.Bytes(), length) + } + + // write coder.Length bytes + buf.Write(lenBuf.Bytes()) + + // read next {len} bytes as coder.Value + valBuf := bytes.Buffer{} + + // every batch read 512 bytes, if next reads < 512, read + var count int + for { + batchReadSize := 1024 * 1024 + var tmpBuf []byte + if int(length)-count < batchReadSize { + tmpBuf = make([]byte, int(length)-count) + } else { + tmpBuf = make([]byte, batchReadSize) + } + p, e := reader.Read(tmpBuf) + count += p + if e != nil { + if e == io.EOF { + valBuf.Write(tmpBuf[:p]) + break + } + return nil, fmt.Errorf("y3 parse valBuf error: %v", err) + } + valBuf.Write(tmpBuf[:p]) + if count == int(length) { + break + } + } + + if count < int(length) { + // return nil, fmt.Errorf("[y3] p should == len when getting y3 value buffer, len=%d, p=%d", length, count) + return nil, ErrMalformed + } + // write coder.Value bytes + buf.Write(valBuf.Bytes()) + + return buf.Bytes(), nil +} + +func readByte(reader io.Reader) (byte, error) { + var b [1]byte + _, err := reader.Read(b[:]) + return b[0], err +} diff --git a/primitive_decoder.go b/primitive_decoder.go new file mode 100644 index 0000000..36ebf07 --- /dev/null +++ b/primitive_decoder.go @@ -0,0 +1,88 @@ +package mq_coder + +import ( + "bytes" + "errors" + "fmt" + + "git.hpds.cc/Component/mq_coder/encoding" +) + +// DecodeState represents the state of decoding +type DecodeState struct { + // ConsumedBytes is the bytes consumed by decoder + ConsumedBytes int + // SizeL is the bytes length of value + SizeL int +} + +// DecodeToPrimitivePacket parse out whole buffer to a PrimitivePacket +// +// Examples: +// [0x01, 0x01, 0x01] -> Key=0x01, Value=0x01 +// [0x41, 0x06, 0x03, 0x01, 0x61, 0x04, 0x01, 0x62] -> key=0x03, value=0x61; key=0x04, value=0x62 +func DecodeToPrimitivePacket(buf []byte, p *PrimitivePacket) (*DecodeState, error) { + decoder := &DecodeState{ + ConsumedBytes: 0, + SizeL: 0, + } + + if buf == nil || len(buf) < primitivePacketBufferMinimalLength { + return decoder, errors.New("invalid packet minimal size") + } + + p.packet = &packet{ + valBuf: []byte{}, + buf: &bytes.Buffer{}, + } + + var pos = 0 + // first byte is `Tag` + p.tag = NewTag(buf[pos]) + p.buf.WriteByte(buf[pos]) + pos++ + decoder.ConsumedBytes = pos + + // read `Varint` from buf for `Length of value` + tmpBuf := buf[pos:] + var bufLen int32 + codec := encoding.VarCodec{} + err := codec.DecodePVarInt32(tmpBuf, &bufLen) + if err != nil { + return decoder, err + } + if codec.Size < 1 { + return decoder, errors.New("malformed, size of Length can not smaller than 1") + } + + // codec.Size describes how many bytes used to represent `Length` + p.buf.Write(buf[pos : pos+codec.Size]) + pos += codec.Size + + decoder.ConsumedBytes = pos + decoder.SizeL = codec.Size + + // if length<0, error on decoding + if bufLen < 0 { + return decoder, errors.New("invalid packet, negative length") + } + + // the length of value + p.length = int(bufLen) + if p.length == 0 { + p.valBuf = nil + return decoder, nil + } + + // the next `p.length` bytes store value + endPos := pos + p.length + + if pos > endPos || endPos > len(buf) || pos > len(buf) { + return decoder, fmt.Errorf("beyond the boundary, pos=%v, endPos=%v", pos, endPos) + } + p.valBuf = buf[pos:endPos] + p.buf.Write(buf[pos:endPos]) + + decoder.ConsumedBytes = endPos + return decoder, nil +} diff --git a/primitive_encoder.go b/primitive_encoder.go new file mode 100644 index 0000000..415cdab --- /dev/null +++ b/primitive_encoder.go @@ -0,0 +1,112 @@ +package mq_coder + +import ( + "bytes" + + "git.hpds.cc/Component/mq_coder/encoding" +) + +// PrimitivePacketEncoder used for encode a primitive packet +type PrimitivePacketEncoder struct { + *encoder +} + +// NewPrimitivePacketEncoder return an Encoder for primitive packet +func NewPrimitivePacketEncoder(sid byte) *PrimitivePacketEncoder { + prim := &PrimitivePacketEncoder{ + encoder: &encoder{ + isNode: false, + buf: new(bytes.Buffer), + }, + } + + prim.seqID = sid + return prim +} + +// SetInt32Value encode int32 value +func (enc *PrimitivePacketEncoder) SetInt32Value(v int32) { + size := encoding.SizeOfNVarInt32(v) + codec := encoding.VarCodec{Size: size} + enc.valbuf = make([]byte, size) + err := codec.EncodeNVarInt32(enc.valbuf, v) + if err != nil { + panic(err) + } +} + +// SetUInt32Value encode uint32 value +func (enc *PrimitivePacketEncoder) SetUInt32Value(v uint32) { + size := encoding.SizeOfNVarUInt32(v) + codec := encoding.VarCodec{Size: size} + enc.valbuf = make([]byte, size) + err := codec.EncodeNVarUInt32(enc.valbuf, v) + if err != nil { + panic(err) + } +} + +// SetInt64Value encode int64 value +func (enc *PrimitivePacketEncoder) SetInt64Value(v int64) { + size := encoding.SizeOfNVarInt64(v) + codec := encoding.VarCodec{Size: size} + enc.valbuf = make([]byte, size) + err := codec.EncodeNVarInt64(enc.valbuf, v) + if err != nil { + panic(err) + } +} + +// SetUInt64Value encode uint64 value +func (enc *PrimitivePacketEncoder) SetUInt64Value(v uint64) { + size := encoding.SizeOfNVarUInt64(v) + codec := encoding.VarCodec{Size: size} + enc.valbuf = make([]byte, size) + err := codec.EncodeNVarUInt64(enc.valbuf, v) + if err != nil { + panic(err) + } +} + +// SetFloat32Value encode float32 value +func (enc *PrimitivePacketEncoder) SetFloat32Value(v float32) { + var size = encoding.SizeOfVarFloat32(v) + codec := encoding.VarCodec{Size: size} + enc.valbuf = make([]byte, size) + err := codec.EncodeVarFloat32(enc.valbuf, v) + if err != nil { + panic(err) + } +} + +// SetFloat64Value encode float64 value +func (enc *PrimitivePacketEncoder) SetFloat64Value(v float64) { + var size = encoding.SizeOfVarFloat64(v) + codec := encoding.VarCodec{Size: size} + enc.valbuf = make([]byte, size) + err := codec.EncodeVarFloat64(enc.valbuf, v) + if err != nil { + panic(err) + } +} + +// SetBoolValue encode bool value +func (enc *PrimitivePacketEncoder) SetBoolValue(v bool) { + var size = encoding.SizeOfPVarUInt32(uint32(1)) + codec := encoding.VarCodec{Size: size} + enc.valbuf = make([]byte, size) + err := codec.EncodePVarBool(enc.valbuf, v) + if err != nil { + panic(err) + } +} + +// SetStringValue encode string +func (enc *PrimitivePacketEncoder) SetStringValue(v string) { + enc.valbuf = []byte(v) +} + +// SetBytesValue encode []byte +func (enc *PrimitivePacketEncoder) SetBytesValue(v []byte) { + enc.valbuf = v +} diff --git a/primitive_packet.go b/primitive_packet.go new file mode 100644 index 0000000..f78433d --- /dev/null +++ b/primitive_packet.go @@ -0,0 +1,100 @@ +package mq_coder + +import ( + "git.hpds.cc/Component/mq_coder/encoding" +) + +// 最小长度为2个字节 +const primitivePacketBufferMinimalLength = 2 + +// PrimitivePacket describes primitive value type, +type PrimitivePacket struct { + *packet +} + +// ToInt32 parse raw as int32 value +func (p *PrimitivePacket) ToInt32() (int32, error) { + var val int32 + codec := encoding.VarCodec{Size: len(p.valBuf)} + err := codec.DecodeNVarInt32(p.packet.valBuf, &val) + if err != nil { + return 0, err + } + return val, nil +} + +// ToUInt32 parse raw as uint32 value +func (p *PrimitivePacket) ToUInt32() (uint32, error) { + var val uint32 + codec := encoding.VarCodec{Size: len(p.valBuf)} + err := codec.DecodeNVarUInt32(p.valBuf, &val) + if err != nil { + return 0, err + } + return val, nil +} + +// ToInt64 parse raw as int64 value +func (p *PrimitivePacket) ToInt64() (int64, error) { + var val int64 + codec := encoding.VarCodec{Size: len(p.valBuf)} + err := codec.DecodeNVarInt64(p.valBuf, &val) + if err != nil { + return 0, err + } + return val, nil +} + +// ToUInt64 parse raw as uint64 value +func (p *PrimitivePacket) ToUInt64() (uint64, error) { + var val uint64 + codec := encoding.VarCodec{Size: len(p.valBuf)} + err := codec.DecodeNVarUInt64(p.valBuf, &val) + if err != nil { + return 0, err + } + return val, nil +} + +// ToFloat32 parse raw as float32 value +func (p *PrimitivePacket) ToFloat32() (float32, error) { + var val float32 + codec := encoding.VarCodec{Size: len(p.valBuf)} + err := codec.DecodeVarFloat32(p.valBuf, &val) + if err != nil { + return 0, err + } + return val, nil +} + +// ToFloat64 parse raw as float64 value +func (p *PrimitivePacket) ToFloat64() (float64, error) { + var val float64 + codec := encoding.VarCodec{Size: len(p.valBuf)} + err := codec.DecodeVarFloat64(p.valBuf, &val) + if err != nil { + return 0, err + } + return val, nil +} + +// ToBool parse raw as bool value +func (p *PrimitivePacket) ToBool() (bool, error) { + var val bool + codec := encoding.VarCodec{Size: len(p.valBuf)} + err := codec.DecodePVarBool(p.valBuf, &val) + if err != nil { + return false, err + } + return val, nil +} + +// ToUTF8String parse raw data as string value +func (p *PrimitivePacket) ToUTF8String() (string, error) { + return string(p.valBuf), nil +} + +// ToBytes returns raw buffer data +func (p *PrimitivePacket) ToBytes() []byte { + return p.valBuf +} diff --git a/spec/spec.go b/spec/spec.go deleted file mode 100644 index 350e525..0000000 --- a/spec/spec.go +++ /dev/null @@ -1,26 +0,0 @@ -package spec - -import ( - "errors" - "io" -) - -const ( - maxSeqId = 0x3F - flagBitNode = 0x80 - wipeFlagBits = 0x3F - msb = 0x80 -) - -var ( - errInvalidSeqId = errors.New("coder.Builder: SeqId should >= 0 and =< 0x3F") -) - -func readByte(reader io.Reader) (byte, error) { - var b [1]byte - n, err := reader.Read(b[:]) - if n == 0 { - return 0x00, err - } - return b[0], err -} diff --git a/spec/spec.puml b/spec/spec.puml deleted file mode 100644 index ab6a65c..0000000 --- a/spec/spec.puml +++ /dev/null @@ -1,27 +0,0 @@ -@startuml -namespace spec { - class L << (S,Aquamarine) >> { - - buf []byte - - size int - - len int - - + Bytes() []byte - + Size() int - + VSize() int - - } - class T << (S,Aquamarine) >> { - + Sid() int - + Bytes() []byte - + IsNodeMode() bool - + SetNodeMode(flag bool) - + Size() int - - } - class spec.T << (T, #FF7700) >> { - } -} - - -"__builtin__.byte" #.. "spec.T" -@enduml \ No newline at end of file diff --git a/spec/tlv.t.go b/spec/tlv.t.go deleted file mode 100644 index 985e41f..0000000 --- a/spec/tlv.t.go +++ /dev/null @@ -1,54 +0,0 @@ -package spec - -import ( - "io" -) - -// T is the Tag in a TLV structure -type T byte - -// NewT returns a T with sequenceID. If this packet contains other -// packets, this packet will be a "node packet", the T of this packet -// will set MSB to T. -func NewT(seqId int) (T, error) { - if seqId < 0 || seqId > maxSeqId { - return 0, errInvalidSeqId - } - - return T(seqId), nil -} - -// Sid returns the sequenceId of this packet. -func (t T) Sid() int { - return int(t & wipeFlagBits) -} - -// Bytes returns raw bytes of T. -func (t T) Bytes() []byte { - return []byte{byte(t)} -} - -// IsNodeMode will return true if this packet contains other packets. -// Otherwise, return false. -func (t T) IsNodeMode() bool { - return t&flagBitNode == flagBitNode -} - -// SetNodeMode will set T to indicates this packet contains -// other packets. -func (t *T) SetNodeMode(flag bool) { - if flag { - *t |= flagBitNode - } -} - -// Size return the size of T raw bytes. -func (t T) Size() int { - return 1 -} - -// ReadT read T from a bufio.Reader -func ReadT(rd io.Reader) (T, error) { - b, err := readByte(rd) - return T(b), err -} diff --git a/spec/tvl.l.go b/spec/tvl.l.go deleted file mode 100644 index 9397608..0000000 --- a/spec/tvl.l.go +++ /dev/null @@ -1,89 +0,0 @@ -package spec - -import ( - "bytes" - "errors" - "io" - - "git.hpds.cc/Component/mq_coder/encoding" -) - -// L is the Length in a TLV structure -type L struct { - buf []byte - size int - len int -} - -// NewL will take an int type len as parameter and return L to -// represent the size of V in a TLV. an integer will be encoded as -// a PVarInt32 type to represent the value. -func NewL(len int) (L, error) { - var l = L{} - if len < -1 { - return l, errors.New("y3.L: len can't less than -1") - } - - valLen := int32(len) - l.size = encoding.SizeOfPVarInt32(valLen) - codec := encoding.VarCodec{Size: l.size} - tmp := make([]byte, l.size) - err := codec.EncodePVarInt32(tmp, valLen) - if err != nil { - panic(err) - } - l.buf = make([]byte, l.size) - copy(l.buf, tmp) - l.len = len - return l, nil -} - -// Bytes will return the raw bytes of L. -func (l L) Bytes() []byte { - return l.buf -} - -// Size returns how many bytes used to represent this L. -func (l L) Size() int { - return l.size -} - -// VSize returns the size of V. -func (l L) VSize() int { - return int(l.len) -} - -// ReadL read L from bufio.Reader -func ReadL(r io.Reader) (*L, error) { - lenBuf := bytes.Buffer{} - for { - b, err := readByte(r) - if err != nil { - return nil, err - } - lenBuf.WriteByte(b) - if b&msb != msb { - break - } - } - - buf := lenBuf.Bytes() - - // decode to L - length, err := decodeL(buf) - if err != nil { - return nil, err - } - - return &L{ - buf: buf, - len: int(length), - size: len(buf), - }, nil -} - -func decodeL(buf []byte) (length int32, err error) { - codec := encoding.VarCodec{} - err = codec.DecodePVarInt32(buf, &length) - return length, err -} diff --git a/stream_decoder.go b/stream_decoder.go new file mode 100644 index 0000000..427bc31 --- /dev/null +++ b/stream_decoder.go @@ -0,0 +1,115 @@ +package mq_coder + +import ( + "bytes" + "errors" + "fmt" + "io" + + "git.hpds.cc/Component/mq_coder/encoding" +) + +// StreamReader read an coder packet from io.Reader, and return +// the ValReader after decode out Tag and Len +type StreamReader struct { + src io.Reader + // Tag of a coder packet + Tag byte + // Len of a coder packet + Len int + // Val of a coder packet + Val io.Reader +} + +// NewStreamParser create a new coder StreamReader +func NewStreamParser(reader io.Reader) *StreamReader { + return &StreamReader{ + src: reader, + } +} + +func (sr *StreamReader) GetValBuffer() ([]byte, error) { + buf, err := io.ReadAll(sr.Val) + return buf, err +} + +// Do must run in a goroutine +func (sr *StreamReader) Do() error { + if sr.src == nil { + return errors.New("coder: nil source reader") + } + + tag, err := readByte(sr.src) + if err != nil { + return err + } + + // the first byte is coder.Tag + sr.Tag = tag + + // read coder.Length bytes, a varInt format + lenBuf := bytes.Buffer{} + for { + b, e := readByte(sr.src) + if e != nil { + return err + } + lenBuf.WriteByte(b) + if b&0x80 != 0x80 { + break + } + } + + // parse to coder.Length + var length int32 + codec := encoding.VarCodec{} + err = codec.DecodePVarInt32(lenBuf.Bytes(), &length) + if err != nil { + return err + } + + // validate len decoded from stream + if length < 0 { + return fmt.Errorf("coder: streamParse() get lenBuf=(%# x), decode len=(%v)", lenBuf.Bytes(), length) + } + + sr.Len = int(length) + + // read next {len} bytes as coder.Value + sr.Val = &valR{ + length: int(length), + src: sr.src, + } + + return nil +} + +type valR struct { + length int + off int + src io.Reader +} + +func (r *valR) Read(p []byte) (n int, err error) { + if r.src == nil { + return 0, nil + } + + if r.off >= r.length { + return 0, io.EOF + } + + bound := len(p) + if len(p) > r.length-r.off { + bound = r.length - r.off + } + // update read + r.off, err = r.src.Read(p[0:bound]) + return r.off, err +} + +func StreamReadPacket(reader io.Reader) (*StreamReader, error) { + sp := NewStreamParser(reader) + err := sp.Do() + return sp, err +} diff --git a/stream_encoder.go b/stream_encoder.go new file mode 100644 index 0000000..138cd90 --- /dev/null +++ b/stream_encoder.go @@ -0,0 +1,138 @@ +package mq_coder + +import ( + "bytes" + "io" + + "git.hpds.cc/Component/mq_coder/encoding" +) + +type StreamEncoder struct { + tag byte + buf *bytes.Buffer + pBuf *bytes.Buffer + len int + sLen int + reader *yR +} + +func NewStreamEncoder(tag byte) *StreamEncoder { + var se = &StreamEncoder{ + tag: tag, + buf: new(bytes.Buffer), + pBuf: new(bytes.Buffer), + } + + return se +} + +func (se *StreamEncoder) AddPacket(packet *PrimitivePacketEncoder) { + node := packet.Encode() + se.AddPacketBuffer(node) +} + +func (se *StreamEncoder) AddPacketBuffer(buf []byte) { + se.pBuf.Write(buf) + se.growLen(len(se.pBuf.Bytes())) +} + +func (se *StreamEncoder) AddStreamPacket(tag byte, length int, reader io.Reader) { + se.sLen = length + // s-Tag + se.pBuf.WriteByte(tag) + se.growLen(1) + // calculate s-Len + size := encoding.SizeOfPVarInt32(int32(length)) + codec := encoding.VarCodec{Size: size} + tmp := make([]byte, size) + err := codec.EncodePVarInt32(tmp, int32(length)) + if err != nil { + panic(err) + } + se.pBuf.Write(tmp) + se.growLen(size) + + // total buf + se.buf.WriteByte(se.tag) + se.growLen(length) + // calculate total Len buf + size = encoding.SizeOfPVarInt32(int32(se.len)) + codec = encoding.VarCodec{Size: size} + tmp = make([]byte, size) + err = codec.EncodePVarInt32(tmp, int32(se.len)) + if err != nil { + panic(err) + } + se.buf.Write(tmp) //lenBuf + se.buf.Write(se.pBuf.Bytes()) + se.growLen(size) // total length + se.growLen(1) // parent tag + + se.reader = &yR{ + buf: se.buf, + src: reader, + length: se.len, + sLen: se.sLen, + } +} + +func (se *StreamEncoder) GetReader() io.Reader { + if se.reader != nil { + return se.reader + } + return new(bytes.Buffer) +} + +// Pipe can pipe data to os.StdOut +func (se *StreamEncoder) Pipe(writer io.Writer) { + +} + +func (se *StreamEncoder) GetLen() int { + if se.reader != nil { + return se.len + } + return 0 +} + +func (se *StreamEncoder) growLen(step int) { + se.len += step +} + +type yR struct { + src io.Reader + buf *bytes.Buffer + length int + off int + sLen int +} + +func (r *yR) Read(p []byte) (n int, err error) { + if r.src == nil { + return 0, nil + } + + if r.off >= r.length { + return 0, io.EOF + } + + if r.off < r.length-r.sLen { + n, err = r.buf.Read(p) + r.off += n + if err != nil { + if err == io.EOF { + return n, nil + } else { + return 0, err + } + } + return n, nil + } else { + n, err = r.src.Read(p) + r.off += n + if err != nil { + return n, err + } + return n, nil + } +} diff --git a/tag.go b/tag.go new file mode 100644 index 0000000..45dee58 --- /dev/null +++ b/tag.go @@ -0,0 +1,38 @@ +package mq_coder + +import ( + "git.hpds.cc/Component/mq_coder/utils" +) + +// Tag represents the Tag of TLV, +// MSB used to represent the packet type, 0x80 means a node packet, otherwise is a primitive packet. +// Low 7 bits represent Sequence ID, like `key` in JSON format +type Tag struct { + raw byte +} + +// IsNode returns true is MSB is 1. +func (t *Tag) IsNode() bool { + return t.raw&utils.MSB == utils.MSB +} + +// IsSlice determine if the current node is a Slice +func (t *Tag) IsSlice() bool { + return t.raw&utils.SliceFlag == utils.SliceFlag +} + +// SeqId get the sequence ID, as key in JSON format +func (t *Tag) SeqId() byte { + //return t.raw & packet utils.DropMSB + return t.raw & utils.DropMSBArrayFlag +} + +// NewTag create a NodePacket Tag field +func NewTag(b byte) *Tag { + return &Tag{raw: b} +} + +// Raw return the original byte +func (t *Tag) Raw() byte { + return t.raw +} diff --git a/utils/common.go b/utils/common.go new file mode 100644 index 0000000..149c839 --- /dev/null +++ b/utils/common.go @@ -0,0 +1,18 @@ +package utils + +// MSB is `1000 0000` describes this is a node packet, otherwise, is a primitive packet +const MSB byte = 0x80 + +// DropMSB is `0111 1111`, used to remove MSB flag bit +const DropMSB byte = 0x3F + +// SliceFlag is `0100 0000`, describes this packet is a Slice type +const SliceFlag byte = 0x40 + +// DropMSBArrayFlag is `0011 1111`, used to remove MSB and Slice flag bit +const DropMSBArrayFlag byte = 0x3F + +// IsNodePacket returns true if the tag represents a node package +func IsNodePacket(tag byte) bool { + return tag&MSB == MSB +}