mq_coder/decoder.go

94 lines
1.7 KiB
Go
Raw Permalink Normal View History

2022-10-07 15:30:45 +08:00
package mq_coder
import (
"bytes"
"io"
"git.hpds.cc/Component/mq_coder/spec"
)
// Decoder decodes packets from a stream. It keeps the source reader together
// with the T and L of the packet that is currently being decoded.
type Decoder struct {
// tag is the T of the current packet; it is assigned by readTL.
tag spec.T
// len is the L of the current packet; it is assigned by readTL and is nil
// until a header has been read.
len *spec.L
// rd is the stream packets are read from, as passed to NewDecoder.
rd io.Reader
}
// NewDecoder builds a Decoder whose packets are read from reader.
// No header is read at construction time.
func NewDecoder(reader io.Reader) *Decoder {
	dec := new(Decoder)
	dec.rd = reader
	return dec
}
// SeqID reports the sequence ID taken from the tag of the packet being decoded.
func (d *Decoder) SeqID() int {
	sid := d.tag.Sid()
	return sid
}
// UnderlyingReader exposes the io.Reader this decoder reads from.
func (d *Decoder) UnderlyingReader() io.Reader {
	src := d.rd
	return src
}
// ReadHeader reads only the header of the next packet, i.e. its T and L,
// by delegating to readTL. It returns whatever error readTL reports.
func (d *Decoder) ReadHeader() error {
	if err := d.readTL(); err != nil {
		return err
	}
	return nil
}
// GetChunkedPacket wraps the current header and the underlying reader in a
// chunk-mode StreamPacket. No value bytes are read here; the packet carries
// the reader so V can be consumed from it later.
//
// The stored L is dereferenced, so a header must have been read beforehand.
func (d *Decoder) GetChunkedPacket() Packet {
	pkt := new(StreamPacket)
	pkt.t = d.tag
	pkt.l = *d.len
	pkt.vr = d.rd
	pkt.chunkMode = true
	pkt.chunkSize = d.len.VSize()
	return pkt
}
// GetFullFilledPacket reads the complete V (value) of the current packet from
// the underlying reader and returns it as a non-chunked StreamPacket.
//
// The number of bytes to read is taken from the stored L, so a header must
// have been read beforehand. At most that many bytes are consumed from the
// reader, which leaves it positioned at whatever follows this packet's value.
//
// If the reader fails or ends before the whole value has arrived, a nil packet
// and the error are returned. A stream that ends part-way through the value is
// reported as io.ErrUnexpectedEOF; one that ends before any value byte is
// reported as io.EOF.
func (d *Decoder) GetFullFilledPacket() (packet Packet, err error) {
	buf := new(bytes.Buffer)
	if size := d.len.VSize(); size > 0 {
		// CopyN bounds the read to exactly size bytes, so a short first read
		// can never cause bytes of the following packet to be swallowed.
		if _, err = io.CopyN(buf, d.rd, int64(size)); err != nil {
			if err == io.EOF && buf.Len() > 0 {
				// Some, but not all, of the value arrived.
				err = io.ErrUnexpectedEOF
			}
			return nil, err
		}
	}

	packet = &StreamPacket{
		t:         d.tag,
		l:         *d.len,
		vbuf:      buf.Bytes(),
		chunkMode: false,
	}
	return packet, nil
}
// readTL reads the T and then the L of the next packet from the underlying
// reader and stores both on the decoder. It returns errNilReader when the
// decoder has no reader, otherwise the first error from spec.ReadT or
// spec.ReadL.
func (d *Decoder) readTL() (err error) {
	if d.rd == nil {
		err = errNilReader
		return err
	}

	// T comes first on the wire; stop if it cannot be read.
	if d.tag, err = spec.ReadT(d.rd); err != nil {
		return err
	}

	// L follows T; its error, if any, is the result.
	d.len, err = spec.ReadL(d.rd)
	return err
}