1、修改bug,如果连接报错,就会拒绝服务的问题

This commit is contained in:
wangjian 2023-04-05 17:25:22 +08:00
parent 3aac3a04ba
commit 44824a7e9f
8 changed files with 90 additions and 106 deletions

47
ap.go
View File

@ -4,6 +4,7 @@ import (
"context" "context"
"git.hpds.cc/Component/network" "git.hpds.cc/Component/network"
"git.hpds.cc/Component/network/frame" "git.hpds.cc/Component/network/frame"
"git.hpds.cc/Component/network/log"
) )
const ( const (
@ -17,15 +18,17 @@ type AccessPoint interface {
// Connect to HPDS-Mq. // Connect to HPDS-Mq.
Connect() error Connect() error
// SetDataTag will set the tag of data when invoking Write(). // SetDataTag will set the tag of data when invoking Write().
SetDataTag(tag uint8) SetDataTag(tag frame.Tag)
// Write the data to downstream. // Write the data to downstream.
Write(p []byte) (n int, err error) Write(p []byte) (n int, err error)
// WriteWithTag will write data with specified tag, default transactionId is epoch time. // WriteWithTag will write data with specified tag, default transactionId is epoch time.
WriteWithTag(tag uint8, data []byte) error WriteWithTag(tag frame.Tag, data []byte) error
// SetErrorHandler set the error handler function when server error occurs // SetErrorHandler set the error handler function when server error occurs
SetErrorHandler(fn func(err error)) SetErrorHandler(fn func(err error))
// SetReceiveHandler [Experimental] set to observe handler function // SetReceiveHandler [Experimental] set to observe handler function
SetReceiveHandler(fn func(tag byte, data []byte)) SetReceiveHandler(fn func(tag frame.Tag, data []byte))
// Broadcast the data to all downstream
Broadcast(data []byte) error
} }
// hpds-AccessPoint // hpds-AccessPoint
@ -33,8 +36,8 @@ type accessPoint struct {
name string name string
mqEndpoint string mqEndpoint string
client *network.Client client *network.Client
tag uint8 tag frame.Tag
fn func(byte, []byte) fn func(frame.Tag, []byte)
} }
var _ AccessPoint = &accessPoint{} var _ AccessPoint = &accessPoint{}
@ -42,7 +45,7 @@ var _ AccessPoint = &accessPoint{}
// NewAccessPoint create a hpds-AccessPoint // NewAccessPoint create a hpds-AccessPoint
func NewAccessPoint(name string, opts ...Option) AccessPoint { func NewAccessPoint(name string, opts ...Option) AccessPoint {
options := NewOptions(opts...) options := NewOptions(opts...)
client := network.NewClient(name, network.ClientTypeProtocolGateway, options.ClientOptions...) client := network.NewClient(name, network.ClientTypeSource, options.ClientOptions...)
return &accessPoint{ return &accessPoint{
name: name, name: name,
@ -53,21 +56,25 @@ func NewAccessPoint(name string, opts ...Option) AccessPoint {
// Write the data to downstream. // Write the data to downstream.
func (s *accessPoint) Write(data []byte) (int, error) { func (s *accessPoint) Write(data []byte) (int, error) {
return len(data), s.WriteWithTag(s.tag, data) err := s.WriteWithTag(s.tag, data)
if err != nil {
return 0, err
}
return len(data), nil
} }
// SetDataTag will set the tag of data when invoking Write(). // SetDataTag will set the tag of data when invoking Write().
func (s *accessPoint) SetDataTag(tag uint8) { func (s *accessPoint) SetDataTag(tag frame.Tag) {
s.tag = tag s.tag = tag
} }
// Close will close the connection to MessageQueue. // Close will close the connection to MessageQueue.
func (s *accessPoint) Close() error { func (s *accessPoint) Close() error {
if err := s.client.Close(); err != nil { if err := s.client.Close(); err != nil {
s.client.Logger().Errorf("%sClose(): %v", apLogPrefix, err) log.Errorf("%sClose(): %v", apLogPrefix, err)
return err return err
} }
s.client.Logger().Debugf("%s is closed", apLogPrefix) log.Debugf("%s is closed", apLogPrefix)
return nil return nil
} }
@ -82,17 +89,17 @@ func (s *accessPoint) Connect() error {
err := s.client.Connect(context.Background(), s.mqEndpoint) err := s.client.Connect(context.Background(), s.mqEndpoint)
if err != nil { if err != nil {
s.client.Logger().Errorf("%sConnect() error: %s", apLogPrefix, err) log.Errorf("%sConnect() error: %s", apLogPrefix, err)
} }
return err return err
} }
// WriteWithTag will write data with specified tag, default transactionID is epoch time. // WriteWithTag will write data with specified tag, default transactionID is epoch time.
func (s *accessPoint) WriteWithTag(tag uint8, data []byte) error { func (s *accessPoint) WriteWithTag(tag frame.Tag, data []byte) error {
f := frame.NewDataFrame() f := frame.NewDataFrame()
f.SetCarriage(tag, data) f.SetCarriage(tag, data)
f.SetSourceId(s.client.ClientId()) f.SetSourceId(s.client.ClientId())
s.client.Logger().Debugf("%sWriteWithTag: tid=%s, source_id=%s, data[%d]=%# x", log.Debugf("%sWriteWithTag: tid=%s, source_id=%s, data[%d]=%# x",
apLogPrefix, f.TransactionId(), f.SourceId(), len(data), frame.Shortly(data)) apLogPrefix, f.TransactionId(), f.SourceId(), len(data), frame.Shortly(data))
return s.client.WriteFrame(f) return s.client.WriteFrame(f)
} }
@ -103,7 +110,17 @@ func (s *accessPoint) SetErrorHandler(fn func(err error)) {
} }
// SetReceiveHandler [Experimental] set to observe handler function // SetReceiveHandler [Experimental] set to observe handler function
func (s *accessPoint) SetReceiveHandler(fn func(byte, []byte)) { func (s *accessPoint) SetReceiveHandler(fn func(frame.Tag, []byte)) {
s.fn = fn s.fn = fn
s.client.Logger().Debugf("%sSetReceiveHandler(%v)", apLogPrefix, s.fn) log.Debugf("%sSetReceiveHandler(%v)", apLogPrefix, s.fn)
}
// Broadcast Write the data to all downstream
func (s *accessPoint) Broadcast(data []byte) error {
f := frame.NewDataFrame()
f.SetCarriage(s.tag, data)
f.SetSourceId(s.client.ClientId())
f.SetBroadcast(true)
log.Debugf("Broadcast", "data_frame", f.String())
return s.client.WriteFrame(f)
} }

View File

@ -7,7 +7,9 @@ import (
func TestAccessPointSendDataToServer(t *testing.T) { func TestAccessPointSendDataToServer(t *testing.T) {
ap := NewAccessPoint("test-ap") ap := NewAccessPoint("test-ap")
defer ap.Close() defer func() {
_ = ap.Close()
}()
// connect to server // connect to server
err := ap.Connect() err := ap.Connect()

27
go.mod
View File

@ -3,9 +3,8 @@ module git.hpds.cc/pavement/hpds_node
go 1.19 go 1.19
require ( require (
git.hpds.cc/Component/network v0.0.0-20230405031738-6ce4bf7726d3 git.hpds.cc/Component/network v0.0.0-20230405091029-b109c53056fb
github.com/disintegration/imaging v1.6.2 github.com/disintegration/imaging v1.6.2
github.com/lucas-clemente/quic-go v0.29.1
github.com/stretchr/testify v1.8.0 github.com/stretchr/testify v1.8.0
github.com/u2takey/ffmpeg-go v0.4.1 github.com/u2takey/ffmpeg-go v0.4.1
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
@ -15,30 +14,28 @@ require (
git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect
github.com/aws/aws-sdk-go v1.38.20 // indirect github.com/aws/aws-sdk-go v1.38.20 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/golang/mock v1.6.0 // indirect github.com/golang/mock v1.6.0 // indirect
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/text v0.2.0 // indirect github.com/kr/text v0.2.0 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect
github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect
github.com/matoous/go-nanoid/v2 v2.0.0 // indirect github.com/matoous/go-nanoid/v2 v2.0.0 // indirect
github.com/nxadm/tail v1.4.8 // indirect github.com/onsi/ginkgo/v2 v2.2.0 // indirect
github.com/onsi/ginkgo v1.16.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/quic-go/qtls-go1-19 v0.2.1 // indirect
github.com/quic-go/qtls-go1-20 v0.1.1 // indirect
github.com/quic-go/quic-go v0.33.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/u2takey/go-utils v0.3.1 // indirect github.com/u2takey/go-utils v0.3.1 // indirect
go.uber.org/atomic v1.7.0 // indirect go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.23.0 // indirect go.uber.org/zap v1.23.0 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect golang.org/x/crypto v0.4.0 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 // indirect golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect golang.org/x/mod v0.6.0 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect golang.org/x/net v0.4.0 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/sys v0.3.0 // indirect
golang.org/x/tools v0.1.10 // indirect golang.org/x/tools v0.2.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
) )

View File

@ -1,30 +0,0 @@
package hpds_node
import (
"git.hpds.cc/Component/network"
"git.hpds.cc/Component/network/frame"
)
type metadata struct{}
func (m *metadata) Encode() []byte {
return nil
}
type metadataBuilder struct {
m *metadata
}
func newMetadataBuilder() network.MetadataBuilder {
return &metadataBuilder{
m: &metadata{},
}
}
func (builder *metadataBuilder) Build(f *frame.HandshakeFrame) (network.Metadata, error) {
return builder.m, nil
}
func (builder *metadataBuilder) Decode(buf []byte) (network.Metadata, error) {
return builder.m, nil
}

32
mq.go
View File

@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"git.hpds.cc/Component/network/metadata"
"net/http" "net/http"
"git.hpds.cc/Component/network" "git.hpds.cc/Component/network"
@ -68,10 +69,10 @@ var _ MessageQueue = &messageQueue{}
// NewMqWithOptions create a messageQueue instance. // NewMqWithOptions create a messageQueue instance.
func NewMqWithOptions(name string, opts ...Option) MessageQueue { func NewMqWithOptions(name string, opts ...Option) MessageQueue {
options := NewOptions(opts...) options := NewOptions(opts...)
zipper := createMessageQueueServer(name, options, nil) emitter := createMessageQueueServer(name, options, nil)
_ = zipper.ConfigMesh(options.MeshConfigURL) _ = emitter.ConfigMesh(options.MeshConfigURL)
return zipper return emitter
} }
// NewMq create a messageQueue instance from config files. // NewMq create a messageQueue instance from config files.
@ -86,17 +87,17 @@ func NewMq(conf string) (MessageQueue, error) {
options := NewOptions() options := NewOptions()
options.MqAddr = listenAddr options.MqAddr = listenAddr
zipper := createMessageQueueServer(confWf.Name, options, confWf) emitter := createMessageQueueServer(confWf.Name, options, confWf)
// messageQueue workflow // messageQueue workflow
err = zipper.configWorkflow(confWf) err = emitter.configWorkflow(confWf)
return zipper, err return emitter, err
} }
// NewDownstreamMq create a messageQueue descriptor for downstream messageQueue. // NewDownstreamMq create a messageQueue descriptor for downstream messageQueue.
func NewDownstreamMq(name string, opts ...Option) MessageQueue { func NewDownstreamMq(name string, opts ...Option) MessageQueue {
options := NewOptions(opts...) options := NewOptions(opts...)
client := network.NewClient(name, network.ClientTypeMessageQueue, options.ClientOptions...) client := network.NewClient(name, network.ClientTypeUpstreamEmitter, options.ClientOptions...)
return &messageQueue{ return &messageQueue{
name: name, name: name,
@ -120,6 +121,9 @@ func createMessageQueueServer(name string, options *Options, cfg *config.Workflo
z.init() z.init()
return z return z
} }
func (z *messageQueue) Logger() log.Logger {
return log.Default()
}
// ConfigWorkflow will read workflows from config files and register them to messageQueue. // ConfigWorkflow will read workflows from config files and register them to messageQueue.
func (z *messageQueue) ConfigWorkflow(conf string) error { func (z *messageQueue) ConfigWorkflow(conf string) error {
@ -134,7 +138,7 @@ func (z *messageQueue) ConfigWorkflow(conf string) error {
func (z *messageQueue) configWorkflow(config *config.WorkflowConfig) error { func (z *messageQueue) configWorkflow(config *config.WorkflowConfig) error {
z.wfc = config z.wfc = config
z.server.ConfigMetadataBuilder(newMetadataBuilder()) z.server.ConfigMetadataBuilder(metadata.DefaultBuilder())
z.server.ConfigRouter(newRouter(config.Functions)) z.server.ConfigRouter(newRouter(config.Functions))
return nil return nil
} }
@ -188,11 +192,11 @@ func (z *messageQueue) ListenAndServe() error {
log.Debugf("%sCreating MessageQueue Server ...", mqLogPrefix) log.Debugf("%sCreating MessageQueue Server ...", mqLogPrefix)
// check downstream zippers // check downstream zippers
for _, ds := range z.downstreamMqs { for _, ds := range z.downstreamMqs {
if dsZipper, ok := ds.(*messageQueue); ok { if dsEmitter, ok := ds.(*messageQueue); ok {
go func(dsZipper *messageQueue) { go func(dsEmitter *messageQueue) {
_ = dsZipper.client.Connect(context.Background(), dsZipper.addr) _ = dsEmitter.client.Connect(context.Background(), dsEmitter.addr)
z.server.AddDownstreamServer(dsZipper.addr, dsZipper.client) z.server.AddDownstreamServer(dsEmitter.addr, dsEmitter.client)
}(dsZipper) }(dsEmitter)
} }
} }
return z.server.ListenAndServe(context.Background(), z.addr) return z.server.ListenAndServe(context.Background(), z.addr)
@ -253,7 +257,7 @@ func (z *messageQueue) Stats() int {
log.Printf("[%s] all downstream mq connected: %d", z.name, len(z.server.DownStreams())) log.Printf("[%s] all downstream mq connected: %d", z.name, len(z.server.DownStreams()))
for k, v := range z.server.DownStreams() { for k, v := range z.server.DownStreams() {
log.Printf("[%s] |> [%s] %s", z.name, k, v.ServerAddr()) log.Printf("[%s] |> [%s] %v", z.name, k, v)
} }
log.Printf("[%s] total DataFrames received: %d", z.name, z.server.StatsCounter()) log.Printf("[%s] total DataFrames received: %d", z.name, z.server.StatsCounter())

View File

@ -26,6 +26,7 @@ func (z *messageQueue) init() {
log.Printf("Received signal: %s", p1) log.Printf("Received signal: %s", p1)
if p1 == syscall.SIGTERM || p1 == syscall.SIGINT { if p1 == syscall.SIGTERM || p1 == syscall.SIGINT {
log.Printf("graceful shutting down ... %s", p1) log.Printf("graceful shutting down ... %s", p1)
_ = z.Close()
os.Exit(0) os.Exit(0)
} else if p1 == syscall.SIGUSR2 { } else if p1 == syscall.SIGUSR2 {
var m runtime.MemStats var m runtime.MemStats

View File

@ -2,10 +2,10 @@ package hpds_node
import ( import (
"crypto/tls" "crypto/tls"
"github.com/lucas-clemente/quic-go"
"git.hpds.cc/Component/network" "git.hpds.cc/Component/network"
"git.hpds.cc/Component/network/frame"
"git.hpds.cc/Component/network/log" "git.hpds.cc/Component/network/log"
"github.com/quic-go/quic-go"
) )
const ( const (
@ -101,7 +101,7 @@ func WithCredential(payload string) Option {
} }
// WithObserveDataTags sets client data tag list. // WithObserveDataTags sets client data tag list.
func WithObserveDataTags(tags ...byte) Option { func WithObserveDataTags(tags ...frame.Tag) Option {
return func(o *Options) { return func(o *Options) {
o.ClientOptions = append( o.ClientOptions = append(
o.ClientOptions, o.ClientOptions,
@ -110,16 +110,6 @@ func WithObserveDataTags(tags ...byte) Option {
} }
} }
// WithLogger sets the client logger
func WithLogger(logger log.Logger) Option {
return func(o *Options) {
o.ClientOptions = append(
o.ClientOptions,
network.WithLogger(logger),
)
}
}
// NewOptions creates a new options for YoMo-Client. // NewOptions creates a new options for YoMo-Client.
func NewOptions(opts ...Option) *Options { func NewOptions(opts ...Option) *Options {
options := &Options{} options := &Options{}

View File

@ -14,7 +14,7 @@ const (
type StreamFunction interface { type StreamFunction interface {
// SetObserveDataTags set the data tag list that will be observed // SetObserveDataTags set the data tag list that will be observed
// Deprecated: use hpds.WithObserveDataTags instead // Deprecated: use hpds.WithObserveDataTags instead
SetObserveDataTags(tag ...byte) SetObserveDataTags(tag ...frame.Tag)
// SetHandler set the handler function, which accept the raw bytes data and return the tag & response // SetHandler set the handler function, which accept the raw bytes data and return the tag & response
SetHandler(fn network.AsyncHandler) error SetHandler(fn network.AsyncHandler) error
// SetErrorHandler set the error handler function when server error occurs // SetErrorHandler set the error handler function when server error occurs
@ -26,7 +26,7 @@ type StreamFunction interface {
// Close will close the connection // Close will close the connection
Close() error Close() error
// Write Send a data to mq. // Write Send a data to mq.
Write(tag byte, carriage []byte) error Write(tag frame.Tag, carriage []byte) error
} }
// NewStreamFunction create a stream function. // NewStreamFunction create a stream function.
@ -35,9 +35,9 @@ func NewStreamFunction(name string, opts ...Option) StreamFunction {
client := network.NewClient(name, network.ClientTypeStreamFunction, options.ClientOptions...) client := network.NewClient(name, network.ClientTypeStreamFunction, options.ClientOptions...)
sf := &streamFunction{ sf := &streamFunction{
name: name, name: name,
zipperEndpoint: options.MqAddr, emitterEndpoint: options.MqAddr,
client: client, client: client,
observeDataTags: make([]byte, 0), observeDataTags: make([]frame.Tag, 0),
} }
return sf return sf
@ -48,9 +48,9 @@ var _ StreamFunction = &streamFunction{}
// streamFunction implements StreamFunction interface. // streamFunction implements StreamFunction interface.
type streamFunction struct { type streamFunction struct {
name string name string
zipperEndpoint string emitterEndpoint string
client *network.Client client *network.Client
observeDataTags []byte // tag list that will be observed observeDataTags []frame.Tag // tag list that will be observed
fn network.AsyncHandler // user's function which will be invoked when data arrived fn network.AsyncHandler // user's function which will be invoked when data arrived
pfn network.PipeHandler pfn network.PipeHandler
pIn chan []byte pIn chan []byte
@ -59,8 +59,11 @@ type streamFunction struct {
// SetObserveDataTags set the data tag list that will be observed. // SetObserveDataTags set the data tag list that will be observed.
// Deprecated: use hpds.WithObserveDataTags instead // Deprecated: use hpds.WithObserveDataTags instead
func (s *streamFunction) SetObserveDataTags(tag ...byte) { func (s *streamFunction) SetObserveDataTags(tag ...frame.Tag) {
s.client.SetObserveDataTags(tag...) s.client.SetObserveDataTags(tag...)
//co := network.WithObserveDataTags(tag...)
//s.client.
//s.client.SetObserveDataTags(tag...)
s.client.Logger().Debugf("%sSetObserveDataTag(%v)", streamFunctionLogPrefix, s.observeDataTags) s.client.Logger().Debugf("%sSetObserveDataTag(%v)", streamFunctionLogPrefix, s.observeDataTags)
} }
@ -102,16 +105,16 @@ func (s *streamFunction) Connect() error {
data := <-s.pOut data := <-s.pOut
if data != nil { if data != nil {
s.client.Logger().Debugf("%spipe fn send: tag=%#x, data=%# x", streamFunctionLogPrefix, data.Tag, data.Carriage) s.client.Logger().Debugf("%spipe fn send: tag=%#x, data=%# x", streamFunctionLogPrefix, data.Tag, data.Carriage)
frame := frame.NewDataFrame() frm := frame.NewDataFrame()
// todo: frame.SetTransactionId // todo: frm.SetTransactionId
frame.SetCarriage(data.Tag, data.Carriage) frm.SetCarriage(data.Tag, data.Carriage)
s.client.WriteFrame(frame) _ = s.client.WriteFrame(frm)
} }
} }
}() }()
} }
err := s.client.Connect(context.Background(), s.zipperEndpoint) err := s.client.Connect(context.Background(), s.emitterEndpoint)
if err != nil { if err != nil {
s.client.Logger().Errorf("%sConnect() error: %s", streamFunctionLogPrefix, err) s.client.Logger().Errorf("%sConnect() error: %s", streamFunctionLogPrefix, err)
} }
@ -153,13 +156,13 @@ func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) {
s.client.Logger().Debugf("%sstart WriteFrame(): tag=%#x, data[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp)) s.client.Logger().Debugf("%sstart WriteFrame(): tag=%#x, data[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp))
// build a DataFrame // build a DataFrame
// TODO: seems we should implement a DeepCopy() of MetaFrame in the future // TODO: seems we should implement a DeepCopy() of MetaFrame in the future
frame := frame.NewDataFrame() frm := frame.NewDataFrame()
// reuse transactionId // reuse transactionId
frame.SetTransactionId(metaFrame.TransactionId()) frm.SetTransactionId(metaFrame.TransactionId())
// reuse sourceId // reuse sourceId
frame.SetSourceId(metaFrame.SourceId()) frm.SetSourceId(metaFrame.SourceId())
frame.SetCarriage(tag, resp) frm.SetCarriage(tag, resp)
s.client.WriteFrame(frame) _ = s.client.WriteFrame(frm)
} }
}() }()
} else if s.pfn != nil { } else if s.pfn != nil {
@ -171,7 +174,7 @@ func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) {
} }
// Send a DataFrame to mq. // Send a DataFrame to mq.
func (s *streamFunction) Write(tag byte, carriage []byte) error { func (s *streamFunction) Write(tag frame.Tag, carriage []byte) error {
fm := frame.NewDataFrame() fm := frame.NewDataFrame()
fm.SetCarriage(tag, carriage) fm.SetCarriage(tag, carriage)
return s.client.WriteFrame(fm) return s.client.WriteFrame(fm)