mq_coder/stream_decoder.go

116 lines
2.0 KiB
Go
Raw Permalink Normal View History

2022-10-10 14:47:49 +08:00
package mq_coder
import (
"bytes"
"errors"
"fmt"
"io"
"git.hpds.cc/Component/mq_coder/encoding"
)
// StreamReader read an coder packet from io.Reader, and return
// the ValReader after decode out Tag and Len
type StreamReader struct {
src io.Reader
// Tag of a coder packet
Tag byte
// Len of a coder packet
Len int
// Val of a coder packet
Val io.Reader
}
// NewStreamParser create a new coder StreamReader
func NewStreamParser(reader io.Reader) *StreamReader {
return &StreamReader{
src: reader,
}
}
func (sr *StreamReader) GetValBuffer() ([]byte, error) {
buf, err := io.ReadAll(sr.Val)
return buf, err
}
// Do must run in a goroutine
func (sr *StreamReader) Do() error {
if sr.src == nil {
return errors.New("coder: nil source reader")
}
tag, err := readByte(sr.src)
if err != nil {
return err
}
// the first byte is coder.Tag
sr.Tag = tag
// read coder.Length bytes, a varInt format
lenBuf := bytes.Buffer{}
for {
b, e := readByte(sr.src)
if e != nil {
return err
}
lenBuf.WriteByte(b)
if b&0x80 != 0x80 {
break
}
}
// parse to coder.Length
var length int32
codec := encoding.VarCodec{}
err = codec.DecodePVarInt32(lenBuf.Bytes(), &length)
if err != nil {
return err
}
// validate len decoded from stream
if length < 0 {
return fmt.Errorf("coder: streamParse() get lenBuf=(%# x), decode len=(%v)", lenBuf.Bytes(), length)
}
sr.Len = int(length)
// read next {len} bytes as coder.Value
sr.Val = &valR{
length: int(length),
src: sr.src,
}
return nil
}
type valR struct {
length int
off int
src io.Reader
}
func (r *valR) Read(p []byte) (n int, err error) {
if r.src == nil {
return 0, nil
}
if r.off >= r.length {
return 0, io.EOF
}
bound := len(p)
if len(p) > r.length-r.off {
bound = r.length - r.off
}
// update read
r.off, err = r.src.Read(p[0:bound])
return r.off, err
}
func StreamReadPacket(reader io.Reader) (*StreamReader, error) {
sp := NewStreamParser(reader)
err := sp.Do()
return sp, err
}