101 lines
2.0 KiB
Go
101 lines
2.0 KiB
Go
|
package mq_coder
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
|
||
|
"git.hpds.cc/Component/mq_coder/encoding"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
ErrMalformed = errors.New("coder.ReadPacket: malformed")
|
||
|
)
|
||
|
|
||
|
// ReadPacket will try to read encoded packet from the reader
|
||
|
func ReadPacket(reader io.Reader) ([]byte, error) {
|
||
|
tag, err := readByte(reader)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
// buf will contain a complete encoded handshakeFrame
|
||
|
buf := bytes.Buffer{}
|
||
|
|
||
|
// the first byte is coder.Tag
|
||
|
// write coder.Tag bytes
|
||
|
buf.WriteByte(tag)
|
||
|
|
||
|
// read y3.Length bytes, a varInt format
|
||
|
lenBuf := bytes.Buffer{}
|
||
|
for {
|
||
|
b, e := readByte(reader)
|
||
|
if e != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
lenBuf.WriteByte(b)
|
||
|
if b&0x80 != 0x80 {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// parse to coder.Length
|
||
|
var length int32
|
||
|
codec := encoding.VarCodec{}
|
||
|
err = codec.DecodePVarInt32(lenBuf.Bytes(), &length)
|
||
|
if err != nil {
|
||
|
return nil, ErrMalformed
|
||
|
}
|
||
|
|
||
|
// validate len decoded from stream
|
||
|
if length < 0 {
|
||
|
return nil, fmt.Errorf("coder.ReadPacket() get lenBuf=(%# x), decode len=(%v)", lenBuf.Bytes(), length)
|
||
|
}
|
||
|
|
||
|
// write coder.Length bytes
|
||
|
buf.Write(lenBuf.Bytes())
|
||
|
|
||
|
// read next {len} bytes as coder.Value
|
||
|
valBuf := bytes.Buffer{}
|
||
|
|
||
|
// every batch read 512 bytes, if next reads < 512, read
|
||
|
var count int
|
||
|
for {
|
||
|
batchReadSize := 1024 * 1024
|
||
|
var tmpBuf []byte
|
||
|
if int(length)-count < batchReadSize {
|
||
|
tmpBuf = make([]byte, int(length)-count)
|
||
|
} else {
|
||
|
tmpBuf = make([]byte, batchReadSize)
|
||
|
}
|
||
|
p, e := reader.Read(tmpBuf)
|
||
|
count += p
|
||
|
if e != nil {
|
||
|
if e == io.EOF {
|
||
|
valBuf.Write(tmpBuf[:p])
|
||
|
break
|
||
|
}
|
||
|
return nil, fmt.Errorf("y3 parse valBuf error: %v", err)
|
||
|
}
|
||
|
valBuf.Write(tmpBuf[:p])
|
||
|
if count == int(length) {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if count < int(length) {
|
||
|
// return nil, fmt.Errorf("[y3] p should == len when getting y3 value buffer, len=%d, p=%d", length, count)
|
||
|
return nil, ErrMalformed
|
||
|
}
|
||
|
// write coder.Value bytes
|
||
|
buf.Write(valBuf.Bytes())
|
||
|
|
||
|
return buf.Bytes(), nil
|
||
|
}
|
||
|
|
||
|
func readByte(reader io.Reader) (byte, error) {
|
||
|
var b [1]byte
|
||
|
_, err := reader.Read(b[:])
|
||
|
return b[0], err
|
||
|
}
|