diff --git a/ap.go b/ap.go index 5e15670..745f87f 100644 --- a/ap.go +++ b/ap.go @@ -4,6 +4,7 @@ import ( "context" "git.hpds.cc/Component/network" "git.hpds.cc/Component/network/frame" + "git.hpds.cc/Component/network/log" ) const ( @@ -17,15 +18,17 @@ type AccessPoint interface { // Connect to HPDS-Mq. Connect() error // SetDataTag will set the tag of data when invoking Write(). - SetDataTag(tag uint8) + SetDataTag(tag frame.Tag) // Write the data to downstream. Write(p []byte) (n int, err error) // WriteWithTag will write data with specified tag, default transactionId is epoch time. - WriteWithTag(tag uint8, data []byte) error + WriteWithTag(tag frame.Tag, data []byte) error // SetErrorHandler set the error handler function when server error occurs SetErrorHandler(fn func(err error)) // SetReceiveHandler [Experimental] set to observe handler function - SetReceiveHandler(fn func(tag byte, data []byte)) + SetReceiveHandler(fn func(tag frame.Tag, data []byte)) + // Broadcast the data to all downstream + Broadcast(data []byte) error } // hpds-AccessPoint @@ -33,8 +36,8 @@ type accessPoint struct { name string mqEndpoint string client *network.Client - tag uint8 - fn func(byte, []byte) + tag frame.Tag + fn func(frame.Tag, []byte) } var _ AccessPoint = &accessPoint{} @@ -42,7 +45,7 @@ var _ AccessPoint = &accessPoint{} // NewAccessPoint create a hpds-AccessPoint func NewAccessPoint(name string, opts ...Option) AccessPoint { options := NewOptions(opts...) - client := network.NewClient(name, network.ClientTypeProtocolGateway, options.ClientOptions...) + client := network.NewClient(name, network.ClientTypeSource, options.ClientOptions...) return &accessPoint{ name: name, @@ -53,21 +56,25 @@ func NewAccessPoint(name string, opts ...Option) AccessPoint { // Write the data to downstream. func (s *accessPoint) Write(data []byte) (int, error) { - return len(data), s.WriteWithTag(s.tag, data) + err := s.WriteWithTag(s.tag, data) + if err != nil { + return 0, err + } + return len(data), nil } // SetDataTag will set the tag of data when invoking Write(). -func (s *accessPoint) SetDataTag(tag uint8) { +func (s *accessPoint) SetDataTag(tag frame.Tag) { s.tag = tag } // Close will close the connection to MessageQueue. func (s *accessPoint) Close() error { if err := s.client.Close(); err != nil { - s.client.Logger().Errorf("%sClose(): %v", apLogPrefix, err) + log.Errorf("%sClose(): %v", apLogPrefix, err) return err } - s.client.Logger().Debugf("%s is closed", apLogPrefix) + log.Debugf("%s is closed", apLogPrefix) return nil } @@ -82,17 +89,17 @@ func (s *accessPoint) Connect() error { err := s.client.Connect(context.Background(), s.mqEndpoint) if err != nil { - s.client.Logger().Errorf("%sConnect() error: %s", apLogPrefix, err) + log.Errorf("%sConnect() error: %s", apLogPrefix, err) } return err } // WriteWithTag will write data with specified tag, default transactionID is epoch time. -func (s *accessPoint) WriteWithTag(tag uint8, data []byte) error { +func (s *accessPoint) WriteWithTag(tag frame.Tag, data []byte) error { f := frame.NewDataFrame() f.SetCarriage(tag, data) f.SetSourceId(s.client.ClientId()) - s.client.Logger().Debugf("%sWriteWithTag: tid=%s, source_id=%s, data[%d]=%# x", + log.Debugf("%sWriteWithTag: tid=%s, source_id=%s, data[%d]=%# x", apLogPrefix, f.TransactionId(), f.SourceId(), len(data), frame.Shortly(data)) return s.client.WriteFrame(f) } @@ -103,7 +110,17 @@ func (s *accessPoint) SetErrorHandler(fn func(err error)) { } // SetReceiveHandler [Experimental] set to observe handler function -func (s *accessPoint) SetReceiveHandler(fn func(byte, []byte)) { +func (s *accessPoint) SetReceiveHandler(fn func(frame.Tag, []byte)) { s.fn = fn - s.client.Logger().Debugf("%sSetReceiveHandler(%v)", apLogPrefix, s.fn) + log.Debugf("%sSetReceiveHandler(%v)", apLogPrefix, s.fn) +} + +// Broadcast Write the data to all downstream +func (s *accessPoint) Broadcast(data []byte) error { + f := frame.NewDataFrame() + f.SetCarriage(s.tag, data) + f.SetSourceId(s.client.ClientId()) + f.SetBroadcast(true) + log.Debugf("Broadcast", "data_frame", f.String()) + return s.client.WriteFrame(f) } diff --git a/ap_test.go b/ap_test.go index 75055e5..fa0e64f 100644 --- a/ap_test.go +++ b/ap_test.go @@ -7,7 +7,9 @@ import ( func TestAccessPointSendDataToServer(t *testing.T) { ap := NewAccessPoint("test-ap") - defer ap.Close() + defer func() { + _ = ap.Close() + }() // connect to server err := ap.Connect() diff --git a/go.mod b/go.mod index 662dddf..39a8d1c 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,8 @@ module git.hpds.cc/pavement/hpds_node go 1.19 require ( - git.hpds.cc/Component/network v0.0.0-20230405031738-6ce4bf7726d3 + git.hpds.cc/Component/network v0.0.0-20230405091029-b109c53056fb github.com/disintegration/imaging v1.6.2 - github.com/lucas-clemente/quic-go v0.29.1 github.com/stretchr/testify v1.8.0 github.com/u2takey/ffmpeg-go v0.4.1 gopkg.in/yaml.v3 v3.0.1 @@ -15,30 +14,28 @@ require ( git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect github.com/aws/aws-sdk-go v1.38.20 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/golang/mock v1.6.0 // indirect + github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/kr/text v0.2.0 // indirect - github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect - github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect github.com/matoous/go-nanoid/v2 v2.0.0 // indirect - github.com/nxadm/tail v1.4.8 // indirect - github.com/onsi/ginkgo v1.16.4 // indirect + github.com/onsi/ginkgo/v2 v2.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/quic-go/qtls-go1-19 v0.2.1 // indirect + github.com/quic-go/qtls-go1-20 v0.1.1 // indirect + github.com/quic-go/quic-go v0.33.0 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/u2takey/go-utils v0.3.1 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.23.0 // indirect - golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect - golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect + golang.org/x/crypto v0.4.0 // indirect + golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 // indirect - golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect - golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect - golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect - golang.org/x/tools v0.1.10 // indirect - golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect + golang.org/x/mod v0.6.0 // indirect + golang.org/x/net v0.4.0 // indirect + golang.org/x/sys v0.3.0 // indirect + golang.org/x/tools v0.2.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect - gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect ) diff --git a/metadata.go b/metadata.go deleted file mode 100644 index 7027987..0000000 --- a/metadata.go +++ /dev/null @@ -1,30 +0,0 @@ -package hpds_node - -import ( - "git.hpds.cc/Component/network" - "git.hpds.cc/Component/network/frame" -) - -type metadata struct{} - -func (m *metadata) Encode() []byte { - return nil -} - -type metadataBuilder struct { - m *metadata -} - -func newMetadataBuilder() network.MetadataBuilder { - return &metadataBuilder{ - m: &metadata{}, - } -} - -func (builder *metadataBuilder) Build(f *frame.HandshakeFrame) (network.Metadata, error) { - return builder.m, nil -} - -func (builder *metadataBuilder) Decode(buf []byte) (network.Metadata, error) { - return builder.m, nil -} diff --git a/mq.go b/mq.go index 3fb71f1..5a70799 100644 --- a/mq.go +++ b/mq.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "git.hpds.cc/Component/network/metadata" "net/http" "git.hpds.cc/Component/network" @@ -68,10 +69,10 @@ var _ MessageQueue = &messageQueue{} // NewMqWithOptions create a messageQueue instance. func NewMqWithOptions(name string, opts ...Option) MessageQueue { options := NewOptions(opts...) - zipper := createMessageQueueServer(name, options, nil) - _ = zipper.ConfigMesh(options.MeshConfigURL) + emitter := createMessageQueueServer(name, options, nil) + _ = emitter.ConfigMesh(options.MeshConfigURL) - return zipper + return emitter } // NewMq create a messageQueue instance from config files. @@ -86,17 +87,17 @@ func NewMq(conf string) (MessageQueue, error) { options := NewOptions() options.MqAddr = listenAddr - zipper := createMessageQueueServer(confWf.Name, options, confWf) + emitter := createMessageQueueServer(confWf.Name, options, confWf) // messageQueue workflow - err = zipper.configWorkflow(confWf) + err = emitter.configWorkflow(confWf) - return zipper, err + return emitter, err } // NewDownstreamMq create a messageQueue descriptor for downstream messageQueue. func NewDownstreamMq(name string, opts ...Option) MessageQueue { options := NewOptions(opts...) - client := network.NewClient(name, network.ClientTypeMessageQueue, options.ClientOptions...) + client := network.NewClient(name, network.ClientTypeUpstreamEmitter, options.ClientOptions...) return &messageQueue{ name: name, @@ -120,6 +121,9 @@ func createMessageQueueServer(name string, options *Options, cfg *config.Workflo z.init() return z } +func (z *messageQueue) Logger() log.Logger { + return log.Default() +} // ConfigWorkflow will read workflows from config files and register them to messageQueue. func (z *messageQueue) ConfigWorkflow(conf string) error { @@ -134,7 +138,7 @@ func (z *messageQueue) ConfigWorkflow(conf string) error { func (z *messageQueue) configWorkflow(config *config.WorkflowConfig) error { z.wfc = config - z.server.ConfigMetadataBuilder(newMetadataBuilder()) + z.server.ConfigMetadataBuilder(metadata.DefaultBuilder()) z.server.ConfigRouter(newRouter(config.Functions)) return nil } @@ -188,11 +192,11 @@ func (z *messageQueue) ListenAndServe() error { log.Debugf("%sCreating MessageQueue Server ...", mqLogPrefix) // check downstream zippers for _, ds := range z.downstreamMqs { - if dsZipper, ok := ds.(*messageQueue); ok { - go func(dsZipper *messageQueue) { - _ = dsZipper.client.Connect(context.Background(), dsZipper.addr) - z.server.AddDownstreamServer(dsZipper.addr, dsZipper.client) - }(dsZipper) + if dsEmitter, ok := ds.(*messageQueue); ok { + go func(dsEmitter *messageQueue) { + _ = dsEmitter.client.Connect(context.Background(), dsEmitter.addr) + z.server.AddDownstreamServer(dsEmitter.addr, dsEmitter.client) + }(dsEmitter) } } return z.server.ListenAndServe(context.Background(), z.addr) @@ -253,7 +257,7 @@ func (z *messageQueue) Stats() int { log.Printf("[%s] all downstream mq connected: %d", z.name, len(z.server.DownStreams())) for k, v := range z.server.DownStreams() { - log.Printf("[%s] |> [%s] %s", z.name, k, v.ServerAddr()) + log.Printf("[%s] |> [%s] %v", z.name, k, v) } log.Printf("[%s] total DataFrames received: %d", z.name, z.server.StatsCounter()) diff --git a/mq_notwindows.go b/mq_notwindows.go index 4d7307f..4f1be1c 100644 --- a/mq_notwindows.go +++ b/mq_notwindows.go @@ -26,6 +26,7 @@ func (z *messageQueue) init() { log.Printf("Received signal: %s", p1) if p1 == syscall.SIGTERM || p1 == syscall.SIGINT { log.Printf("graceful shutting down ... %s", p1) + _ = z.Close() os.Exit(0) } else if p1 == syscall.SIGUSR2 { var m runtime.MemStats diff --git a/options.go b/options.go index 24be4ca..6b6589b 100644 --- a/options.go +++ b/options.go @@ -2,10 +2,10 @@ package hpds_node import ( "crypto/tls" - "github.com/lucas-clemente/quic-go" - "git.hpds.cc/Component/network" + "git.hpds.cc/Component/network/frame" "git.hpds.cc/Component/network/log" + "github.com/quic-go/quic-go" ) const ( @@ -101,7 +101,7 @@ func WithCredential(payload string) Option { } // WithObserveDataTags sets client data tag list. -func WithObserveDataTags(tags ...byte) Option { +func WithObserveDataTags(tags ...frame.Tag) Option { return func(o *Options) { o.ClientOptions = append( o.ClientOptions, @@ -110,16 +110,6 @@ func WithObserveDataTags(tags ...byte) Option { } } -// WithLogger sets the client logger -func WithLogger(logger log.Logger) Option { - return func(o *Options) { - o.ClientOptions = append( - o.ClientOptions, - network.WithLogger(logger), - ) - } -} - // NewOptions creates a new options for YoMo-Client. func NewOptions(opts ...Option) *Options { options := &Options{} diff --git a/stream_func.go b/stream_func.go index 59924ba..643fac1 100644 --- a/stream_func.go +++ b/stream_func.go @@ -14,7 +14,7 @@ const ( type StreamFunction interface { // SetObserveDataTags set the data tag list that will be observed // Deprecated: use hpds.WithObserveDataTags instead - SetObserveDataTags(tag ...byte) + SetObserveDataTags(tag ...frame.Tag) // SetHandler set the handler function, which accept the raw bytes data and return the tag & response SetHandler(fn network.AsyncHandler) error // SetErrorHandler set the error handler function when server error occurs @@ -26,7 +26,7 @@ type StreamFunction interface { // Close will close the connection Close() error // Write Send a data to mq. - Write(tag byte, carriage []byte) error + Write(tag frame.Tag, carriage []byte) error } // NewStreamFunction create a stream function. @@ -35,9 +35,9 @@ func NewStreamFunction(name string, opts ...Option) StreamFunction { client := network.NewClient(name, network.ClientTypeStreamFunction, options.ClientOptions...) sf := &streamFunction{ name: name, - zipperEndpoint: options.MqAddr, + emitterEndpoint: options.MqAddr, client: client, - observeDataTags: make([]byte, 0), + observeDataTags: make([]frame.Tag, 0), } return sf @@ -48,9 +48,9 @@ var _ StreamFunction = &streamFunction{} // streamFunction implements StreamFunction interface. type streamFunction struct { name string - zipperEndpoint string + emitterEndpoint string client *network.Client - observeDataTags []byte // tag list that will be observed + observeDataTags []frame.Tag // tag list that will be observed fn network.AsyncHandler // user's function which will be invoked when data arrived pfn network.PipeHandler pIn chan []byte @@ -59,8 +59,11 @@ type streamFunction struct { // SetObserveDataTags set the data tag list that will be observed. // Deprecated: use hpds.WithObserveDataTags instead -func (s *streamFunction) SetObserveDataTags(tag ...byte) { +func (s *streamFunction) SetObserveDataTags(tag ...frame.Tag) { s.client.SetObserveDataTags(tag...) + //co := network.WithObserveDataTags(tag...) + //s.client. + //s.client.SetObserveDataTags(tag...) s.client.Logger().Debugf("%sSetObserveDataTag(%v)", streamFunctionLogPrefix, s.observeDataTags) } @@ -102,16 +105,16 @@ func (s *streamFunction) Connect() error { data := <-s.pOut if data != nil { s.client.Logger().Debugf("%spipe fn send: tag=%#x, data=%# x", streamFunctionLogPrefix, data.Tag, data.Carriage) - frame := frame.NewDataFrame() - // todo: frame.SetTransactionId - frame.SetCarriage(data.Tag, data.Carriage) - s.client.WriteFrame(frame) + frm := frame.NewDataFrame() + // todo: frm.SetTransactionId + frm.SetCarriage(data.Tag, data.Carriage) + _ = s.client.WriteFrame(frm) } } }() } - err := s.client.Connect(context.Background(), s.zipperEndpoint) + err := s.client.Connect(context.Background(), s.emitterEndpoint) if err != nil { s.client.Logger().Errorf("%sConnect() error: %s", streamFunctionLogPrefix, err) } @@ -153,13 +156,13 @@ func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) { s.client.Logger().Debugf("%sstart WriteFrame(): tag=%#x, data[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp)) // build a DataFrame // TODO: seems we should implement a DeepCopy() of MetaFrame in the future - frame := frame.NewDataFrame() + frm := frame.NewDataFrame() // reuse transactionId - frame.SetTransactionId(metaFrame.TransactionId()) + frm.SetTransactionId(metaFrame.TransactionId()) // reuse sourceId - frame.SetSourceId(metaFrame.SourceId()) - frame.SetCarriage(tag, resp) - s.client.WriteFrame(frame) + frm.SetSourceId(metaFrame.SourceId()) + frm.SetCarriage(tag, resp) + _ = s.client.WriteFrame(frm) } }() } else if s.pfn != nil { @@ -171,7 +174,7 @@ func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) { } // Send a DataFrame to mq. -func (s *streamFunction) Write(tag byte, carriage []byte) error { +func (s *streamFunction) Write(tag frame.Tag, carriage []byte) error { fm := frame.NewDataFrame() fm.SetCarriage(tag, carriage) return s.client.WriteFrame(fm)