mq_coder/parser.go

101 lines
2.0 KiB
Go
Raw Permalink Normal View History

2022-10-10 14:47:49 +08:00
package mq_coder
import (
"bytes"
"errors"
"fmt"
"io"
"git.hpds.cc/Component/mq_coder/encoding"
)
// ErrMalformed is the sentinel error ReadPacket returns when the length field
// of a packet cannot be decoded or the stream holds fewer value bytes than the
// length field announced.
var ErrMalformed = errors.New("coder.ReadPacket: malformed")
// ReadPacket reads one encoded packet from reader and returns its raw bytes.
//
// A packet consists of a one-byte tag, a variable-length length field (every
// byte except the last one has its high bit set) and then exactly as many
// value bytes as the length field announces. The returned slice contains all
// three parts in stream order.
//
// Errors:
//   - a failure while reading the tag or a length byte is returned unchanged
//     (for example io.EOF when the stream has ended);
//   - ErrMalformed when the length field cannot be decoded, or when the
//     stream ends before the whole value has been read;
//   - a descriptive error when the decoded length is negative, or when the
//     reader fails with a non-EOF error while the value is being read.
func ReadPacket(reader io.Reader) ([]byte, error) {
	// The first byte is the tag.
	tag, err := readByte(reader)
	if err != nil {
		return nil, err
	}

	// Collect the raw length field: keep reading while the continuation bit
	// (0x80) is set; the first byte without it terminates the field.
	var lenBuf []byte
	for {
		b, readErr := readByte(reader)
		if readErr != nil {
			// Report the error of this read. Returning the outer err here
			// would hand back (nil, nil) and hide the failure.
			return nil, readErr
		}
		lenBuf = append(lenBuf, b)
		if b&0x80 == 0 {
			break
		}
	}

	// Decode the length field into the number of value bytes.
	var length int32
	codec := encoding.VarCodec{}
	if err := codec.DecodePVarInt32(lenBuf, &length); err != nil {
		return nil, ErrMalformed
	}
	// A negative length cannot describe a payload.
	if length < 0 {
		return nil, fmt.Errorf("coder.ReadPacket() get lenBuf=(%# x), decode len=(%v)", lenBuf, length)
	}

	// buf accumulates the complete packet: tag, length field, value.
	var buf bytes.Buffer
	buf.WriteByte(tag)
	buf.Write(lenBuf)

	// Copy exactly length value bytes. The buffer grows as data arrives, so
	// memory use follows the bytes actually received rather than the length
	// announced by the stream.
	if _, err := io.CopyN(&buf, reader, int64(length)); err != nil {
		if errors.Is(err, io.EOF) {
			// The stream ended before the announced value was complete.
			return nil, ErrMalformed
		}
		return nil, fmt.Errorf("y3 parse valBuf error: %w", err)
	}
	return buf.Bytes(), nil
}
// readByte reads exactly one byte from reader.
//
// It uses io.ReadFull so that a Read call returning (0, nil) is retried
// instead of being mistaken for a zero byte, and so that a byte delivered
// together with io.EOF is still returned as a successful read. When no byte
// can be read, the reader's error (io.EOF at end of stream) is returned and
// the byte result is 0.
func readByte(reader io.Reader) (byte, error) {
	var b [1]byte
	if _, err := io.ReadFull(reader, b[:]); err != nil {
		return 0, err
	}
	return b[0], nil
}