package mq_coder import ( "bytes" "errors" "fmt" "io" "git.hpds.cc/Component/mq_coder/encoding" ) var ( ErrMalformed = errors.New("coder.ReadPacket: malformed") ) // ReadPacket will try to read encoded packet from the reader func ReadPacket(reader io.Reader) ([]byte, error) { tag, err := readByte(reader) if err != nil { return nil, err } // buf will contain a complete encoded handshakeFrame buf := bytes.Buffer{} // the first byte is coder.Tag // write coder.Tag bytes buf.WriteByte(tag) // read y3.Length bytes, a varInt format lenBuf := bytes.Buffer{} for { b, e := readByte(reader) if e != nil { return nil, err } lenBuf.WriteByte(b) if b&0x80 != 0x80 { break } } // parse to coder.Length var length int32 codec := encoding.VarCodec{} err = codec.DecodePVarInt32(lenBuf.Bytes(), &length) if err != nil { return nil, ErrMalformed } // validate len decoded from stream if length < 0 { return nil, fmt.Errorf("coder.ReadPacket() get lenBuf=(%# x), decode len=(%v)", lenBuf.Bytes(), length) } // write coder.Length bytes buf.Write(lenBuf.Bytes()) // read next {len} bytes as coder.Value valBuf := bytes.Buffer{} // every batch read 512 bytes, if next reads < 512, read var count int for { batchReadSize := 1024 * 1024 var tmpBuf []byte if int(length)-count < batchReadSize { tmpBuf = make([]byte, int(length)-count) } else { tmpBuf = make([]byte, batchReadSize) } p, e := reader.Read(tmpBuf) count += p if e != nil { if e == io.EOF { valBuf.Write(tmpBuf[:p]) break } return nil, fmt.Errorf("y3 parse valBuf error: %v", err) } valBuf.Write(tmpBuf[:p]) if count == int(length) { break } } if count < int(length) { // return nil, fmt.Errorf("[y3] p should == len when getting y3 value buffer, len=%d, p=%d", length, count) return nil, ErrMalformed } // write coder.Value bytes buf.Write(valBuf.Bytes()) return buf.Bytes(), nil } func readByte(reader io.Reader) (byte, error) { var b [1]byte _, err := reader.Read(b[:]) return b[0], err }