157 lines
3.7 KiB
Go
157 lines
3.7 KiB
Go
|
package mq_coder
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"io"
|
||
|
|
||
|
"git.hpds.cc/Component/mq_coder/encoding"
|
||
|
"git.hpds.cc/Component/mq_coder/spec"
|
||
|
)
|
||
|
|
||
|
// StreamPacket implement the Packet interface.
|
||
|
type StreamPacket struct {
|
||
|
t spec.T
|
||
|
l spec.L
|
||
|
vbuf []byte
|
||
|
vr io.Reader
|
||
|
chunkMode bool
|
||
|
chunkSize int
|
||
|
}
|
||
|
|
||
|
var _ Packet = &StreamPacket{}
|
||
|
|
||
|
// SeqId returns the sequenceId of this packet
|
||
|
func (p *StreamPacket) SeqId() int { return p.t.Sid() }
|
||
|
|
||
|
// Size returns the size of whole packet.
|
||
|
func (p *StreamPacket) Size() int {
|
||
|
// T.Size + L.Size + V.Size
|
||
|
return p.t.Size() + p.l.Size() + p.l.VSize()
|
||
|
}
|
||
|
|
||
|
// VSize returns the size of V.
|
||
|
func (p *StreamPacket) VSize() int { return p.l.VSize() }
|
||
|
|
||
|
// Bytes return the raw bytes of this packet. V will be absent if
|
||
|
// is in chunked mode
|
||
|
func (p *StreamPacket) Bytes() []byte {
|
||
|
buf := new(bytes.Buffer)
|
||
|
// the raw bytes of T and L
|
||
|
p.writeTL(buf)
|
||
|
// p.valbuf stores the raw bytes of V
|
||
|
buf.Write(p.vbuf)
|
||
|
|
||
|
return buf.Bytes()
|
||
|
}
|
||
|
|
||
|
// VReader return an io.Reader which can be read as the content of V.
|
||
|
func (p *StreamPacket) VReader() io.Reader {
|
||
|
if !p.chunkMode {
|
||
|
return bytes.NewReader(p.vbuf)
|
||
|
}
|
||
|
return p.vr
|
||
|
}
|
||
|
|
||
|
// Reader return an io.Reader which can be read as the whole bytes of
|
||
|
// this packet. This function only available if this V of packet is in
|
||
|
// chunked mode.
|
||
|
func (p *StreamPacket) Reader() io.Reader {
|
||
|
if !p.chunkMode {
|
||
|
buf := new(bytes.Buffer)
|
||
|
buf.Write(p.t.Bytes())
|
||
|
buf.Write(p.l.Bytes())
|
||
|
buf.Write(p.vbuf)
|
||
|
return buf
|
||
|
}
|
||
|
|
||
|
buf := new(bytes.Buffer)
|
||
|
// T and L of this packet
|
||
|
p.writeTL(buf)
|
||
|
// V of this packet
|
||
|
buf.Write(p.vbuf)
|
||
|
|
||
|
return &chunkVReader{
|
||
|
buf: buf,
|
||
|
src: p.vr,
|
||
|
totalSize: p.Size(),
|
||
|
ChunkVSize: p.VSize(),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// IsStreamMode returns a bool value indicates if the V of
|
||
|
// this packet is in stream mode
|
||
|
func (p *StreamPacket) IsStreamMode() bool {
|
||
|
return p.chunkMode
|
||
|
}
|
||
|
|
||
|
// IsNodeMode returns a bool value indicates if this packet
|
||
|
// is node mode
|
||
|
func (p *StreamPacket) IsNodeMode() bool {
|
||
|
return p.t.IsNodeMode()
|
||
|
}
|
||
|
|
||
|
// write the raw bytes of T and L to given buf
|
||
|
func (p *StreamPacket) writeTL(buf *bytes.Buffer) {
|
||
|
buf.Write(p.t.Bytes())
|
||
|
buf.Write(p.l.Bytes())
|
||
|
}
|
||
|
|
||
|
// BytesV return V as bytes
|
||
|
func (p *StreamPacket) BytesV() []byte {
|
||
|
return p.vbuf
|
||
|
}
|
||
|
|
||
|
// UTF8StringV return V as utf-8 string
|
||
|
func (p *StreamPacket) UTF8StringV() string {
|
||
|
return string(p.vbuf)
|
||
|
}
|
||
|
|
||
|
// Int32V return V as int32
|
||
|
func (p *StreamPacket) Int32V() (val int32, err error) {
|
||
|
codec := encoding.VarCodec{Size: len(p.vbuf)}
|
||
|
err = codec.DecodeNVarInt32(p.vbuf, &val)
|
||
|
return val, err
|
||
|
}
|
||
|
|
||
|
// UInt32V return V as uint32
|
||
|
func (p *StreamPacket) UInt32V() (val uint32, err error) {
|
||
|
codec := encoding.VarCodec{Size: len(p.vbuf)}
|
||
|
err = codec.DecodeNVarUInt32(p.vbuf, &val)
|
||
|
return val, err
|
||
|
}
|
||
|
|
||
|
// Int64V return V as int64
|
||
|
func (p *StreamPacket) Int64V() (val int64, err error) {
|
||
|
codec := encoding.VarCodec{Size: len(p.vbuf)}
|
||
|
err = codec.DecodeNVarInt64(p.vbuf, &val)
|
||
|
return val, err
|
||
|
}
|
||
|
|
||
|
// UInt64V return V as uint64
|
||
|
func (p *StreamPacket) UInt64V() (val uint64, err error) {
|
||
|
codec := encoding.VarCodec{Size: len(p.vbuf)}
|
||
|
err = codec.DecodeNVarUInt64(p.vbuf, &val)
|
||
|
return val, err
|
||
|
}
|
||
|
|
||
|
// Float32V return V as float32
|
||
|
func (p *StreamPacket) Float32V() (val float32, err error) {
|
||
|
codec := encoding.VarCodec{Size: len(p.vbuf)}
|
||
|
err = codec.DecodeVarFloat32(p.vbuf, &val)
|
||
|
return val, err
|
||
|
}
|
||
|
|
||
|
// Float64V return V as float64
|
||
|
func (p *StreamPacket) Float64V() (val float64, err error) {
|
||
|
codec := encoding.VarCodec{Size: len(p.vbuf)}
|
||
|
err = codec.DecodeVarFloat64(p.vbuf, &val)
|
||
|
return val, err
|
||
|
}
|
||
|
|
||
|
// BoolV return V as bool
|
||
|
func (p *StreamPacket) BoolV() (val bool, err error) {
|
||
|
codec := encoding.VarCodec{Size: len(p.vbuf)}
|
||
|
err = codec.DecodePVarBool(p.vbuf, &val)
|
||
|
return val, err
|
||
|
}
|