package mq_coder

import (
	"bytes"
	"errors"
	"fmt"
	"io"

	"git.hpds.cc/Component/mq_coder/encoding"
)

// StreamReader reads a coder packet from an io.Reader. After Do has decoded
// the Tag and Len, Val exposes the packet's value as an io.Reader.
type StreamReader struct {
	src io.Reader
	// Tag of a coder packet.
	Tag byte
	// Len of a coder packet.
	Len int
	// Val of a coder packet. It is nil until Do succeeds.
	Val io.Reader
}

// NewStreamParser creates a new coder StreamReader that reads from reader.
func NewStreamParser(reader io.Reader) *StreamReader {
	return &StreamReader{
		src: reader,
	}
}

// GetValBuffer reads everything remaining in Val and returns it.
// It returns an error if Val has not been set, i.e. Do has not been run
// successfully on this StreamReader.
func (sr *StreamReader) GetValBuffer() ([]byte, error) {
	if sr.Val == nil {
		// io.ReadAll would panic on a nil reader.
		return nil, errors.New("coder: nil value reader")
	}
	return io.ReadAll(sr.Val)
}

// Do reads the Tag byte and the variable-length Len field from the source
// reader, then sets Val to a reader limited to the next Len bytes of the
// source.
//
// Do must run in a goroutine (note kept from the original author; presumably
// because reads on the source may block — confirm against callers).
func (sr *StreamReader) Do() error {
	if sr.src == nil {
		return errors.New("coder: nil source reader")
	}

	// The first byte is coder.Tag.
	tag, err := readByte(sr.src)
	if err != nil {
		return err
	}
	sr.Tag = tag

	// Read the coder.Length bytes, a varInt format: keep reading while the
	// high bit (0x80) of the byte is set.
	var lenBuf bytes.Buffer
	for {
		b, e := readByte(sr.src)
		if e != nil {
			// Propagate the error of this read, not the stale outer err.
			return e
		}
		lenBuf.WriteByte(b)
		if b&0x80 != 0x80 {
			break
		}
	}

	// Parse the collected bytes into coder.Length.
	var length int32
	codec := encoding.VarCodec{}
	if err = codec.DecodePVarInt32(lenBuf.Bytes(), &length); err != nil {
		return err
	}

	// Validate the length decoded from the stream.
	if length < 0 {
		return fmt.Errorf("coder: streamParse() get lenBuf=(%# x), decode len=(%v)", lenBuf.Bytes(), length)
	}
	sr.Len = int(length)

	// The next {len} bytes of the source are coder.Value.
	sr.Val = &valR{
		length: int(length),
		src:    sr.src,
	}
	return nil
}

// valR is an io.Reader that yields at most length bytes from src.
type valR struct {
	length int       // total number of bytes that may be read from src
	off    int       // number of bytes read so far
	src    io.Reader // underlying source
}

// Read reads up to len(p) bytes from the source without going past the
// configured length. It returns io.EOF once length bytes have been consumed,
// and an error if the source reader is nil.
func (r *valR) Read(p []byte) (n int, err error) {
	if r.src == nil {
		// Returning (0, nil) here would make io.ReadAll loop forever.
		return 0, errors.New("coder: nil source reader")
	}
	if r.off >= r.length {
		return 0, io.EOF
	}

	// Never hand the source a slice longer than what is left of the value.
	bound := len(p)
	if remaining := r.length - r.off; bound > remaining {
		bound = remaining
	}

	n, err = r.src.Read(p[:bound])
	// Advance the cumulative offset by the bytes actually read.
	r.off += n
	return n, err
}

// StreamReadPacket creates a StreamReader on reader and runs Do on it.
// The StreamReader is returned even when Do fails; check the error first.
func StreamReadPacket(reader io.Reader) (*StreamReader, error) {
	sp := NewStreamParser(reader)
	err := sp.Do()
	return sp, err
}