Compare commits
No commits in common. "main" and "v1.4" have entirely different histories.
51
ap.go
51
ap.go
|
@ -4,7 +4,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"git.hpds.cc/Component/network"
|
"git.hpds.cc/Component/network"
|
||||||
"git.hpds.cc/Component/network/frame"
|
"git.hpds.cc/Component/network/frame"
|
||||||
"git.hpds.cc/Component/network/log"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -18,17 +17,15 @@ type AccessPoint interface {
|
||||||
// Connect to HPDS-Mq.
|
// Connect to HPDS-Mq.
|
||||||
Connect() error
|
Connect() error
|
||||||
// SetDataTag will set the tag of data when invoking Write().
|
// SetDataTag will set the tag of data when invoking Write().
|
||||||
SetDataTag(tag frame.Tag)
|
SetDataTag(tag uint8)
|
||||||
// Write the data to downstream.
|
// Write the data to downstream.
|
||||||
Write(p []byte) (n int, err error)
|
Write(p []byte) (n int, err error)
|
||||||
// WriteWithTag will write data with specified tag, default transactionId is epoch time.
|
// WriteWithTag will write data with specified tag, default transactionId is epoch time.
|
||||||
WriteWithTag(tag frame.Tag, data []byte) error
|
WriteWithTag(tag uint8, data []byte) error
|
||||||
// SetErrorHandler set the error handler function when server error occurs
|
// SetErrorHandler set the error handler function when server error occurs
|
||||||
SetErrorHandler(fn func(err error))
|
SetErrorHandler(fn func(err error))
|
||||||
// SetReceiveHandler [Experimental] set to observe handler function
|
// SetReceiveHandler [Experimental] set to observe handler function
|
||||||
SetReceiveHandler(fn func(tag frame.Tag, data []byte))
|
SetReceiveHandler(fn func(tag byte, data []byte))
|
||||||
// Broadcast the data to all downstream
|
|
||||||
Broadcast(data []byte) error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// hpds-AccessPoint
|
// hpds-AccessPoint
|
||||||
|
@ -36,8 +33,8 @@ type accessPoint struct {
|
||||||
name string
|
name string
|
||||||
mqEndpoint string
|
mqEndpoint string
|
||||||
client *network.Client
|
client *network.Client
|
||||||
tag frame.Tag
|
tag uint8
|
||||||
fn func(frame.Tag, []byte)
|
fn func(byte, []byte)
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ AccessPoint = &accessPoint{}
|
var _ AccessPoint = &accessPoint{}
|
||||||
|
@ -45,7 +42,7 @@ var _ AccessPoint = &accessPoint{}
|
||||||
// NewAccessPoint create a hpds-AccessPoint
|
// NewAccessPoint create a hpds-AccessPoint
|
||||||
func NewAccessPoint(name string, opts ...Option) AccessPoint {
|
func NewAccessPoint(name string, opts ...Option) AccessPoint {
|
||||||
options := NewOptions(opts...)
|
options := NewOptions(opts...)
|
||||||
client := network.NewClient(name, network.ClientTypeSource, options.ClientOptions...)
|
client := network.NewClient(name, network.ClientTypeProtocolGateway, options.ClientOptions...)
|
||||||
|
|
||||||
return &accessPoint{
|
return &accessPoint{
|
||||||
name: name,
|
name: name,
|
||||||
|
@ -56,25 +53,21 @@ func NewAccessPoint(name string, opts ...Option) AccessPoint {
|
||||||
|
|
||||||
// Write the data to downstream.
|
// Write the data to downstream.
|
||||||
func (s *accessPoint) Write(data []byte) (int, error) {
|
func (s *accessPoint) Write(data []byte) (int, error) {
|
||||||
err := s.WriteWithTag(s.tag, data)
|
return len(data), s.WriteWithTag(s.tag, data)
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return len(data), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetDataTag will set the tag of data when invoking Write().
|
// SetDataTag will set the tag of data when invoking Write().
|
||||||
func (s *accessPoint) SetDataTag(tag frame.Tag) {
|
func (s *accessPoint) SetDataTag(tag uint8) {
|
||||||
s.tag = tag
|
s.tag = tag
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close will close the connection to MessageQueue.
|
// Close will close the connection to YoMo-MessageQueue.
|
||||||
func (s *accessPoint) Close() error {
|
func (s *accessPoint) Close() error {
|
||||||
if err := s.client.Close(); err != nil {
|
if err := s.client.Close(); err != nil {
|
||||||
log.Errorf("%s Close(): %v", apLogPrefix, err)
|
s.client.Logger().Errorf("%sClose(): %v", apLogPrefix, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Debugf("%s is closed", apLogPrefix)
|
s.client.Logger().Debugf("%s is closed", apLogPrefix)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,18 +82,18 @@ func (s *accessPoint) Connect() error {
|
||||||
|
|
||||||
err := s.client.Connect(context.Background(), s.mqEndpoint)
|
err := s.client.Connect(context.Background(), s.mqEndpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("%s Connect() error: %s", apLogPrefix, err)
|
s.client.Logger().Errorf("%sConnect() error: %s", apLogPrefix, err)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteWithTag will write data with specified tag, default transactionID is epoch time.
|
// WriteWithTag will write data with specified tag, default transactionID is epoch time.
|
||||||
func (s *accessPoint) WriteWithTag(tag frame.Tag, data []byte) error {
|
func (s *accessPoint) WriteWithTag(tag uint8, data []byte) error {
|
||||||
f := frame.NewDataFrame()
|
f := frame.NewDataFrame()
|
||||||
f.SetCarriage(tag, data)
|
f.SetCarriage(tag, data)
|
||||||
f.SetSourceId(s.client.ClientId())
|
f.SetSourceId(s.client.ClientId())
|
||||||
log.Debugf("%s WriteWithTag: tid=%s, source_id=%s, data[%d]",
|
s.client.Logger().Debugf("%sWriteWithTag: tid=%s, source_id=%s, data[%d]=%# x",
|
||||||
apLogPrefix, f.TransactionId(), f.SourceId(), len(data))
|
apLogPrefix, f.TransactionId(), f.SourceId(), len(data), frame.Shortly(data))
|
||||||
return s.client.WriteFrame(f)
|
return s.client.WriteFrame(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,17 +103,7 @@ func (s *accessPoint) SetErrorHandler(fn func(err error)) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetReceiveHandler [Experimental] set to observe handler function
|
// SetReceiveHandler [Experimental] set to observe handler function
|
||||||
func (s *accessPoint) SetReceiveHandler(fn func(frame.Tag, []byte)) {
|
func (s *accessPoint) SetReceiveHandler(fn func(byte, []byte)) {
|
||||||
s.fn = fn
|
s.fn = fn
|
||||||
log.Debugf("%s SetReceiveHandler(%v)", apLogPrefix, s.fn)
|
s.client.Logger().Debugf("%sSetReceiveHandler(%v)", apLogPrefix, s.fn)
|
||||||
}
|
|
||||||
|
|
||||||
// Broadcast Write the data to all downstream
|
|
||||||
func (s *accessPoint) Broadcast(data []byte) error {
|
|
||||||
f := frame.NewDataFrame()
|
|
||||||
f.SetCarriage(s.tag, data)
|
|
||||||
f.SetSourceId(s.client.ClientId())
|
|
||||||
f.SetBroadcast(true)
|
|
||||||
log.Debugf("Broadcast", "data_frame", f.String())
|
|
||||||
return s.client.WriteFrame(f)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,9 +7,7 @@ import (
|
||||||
|
|
||||||
func TestAccessPointSendDataToServer(t *testing.T) {
|
func TestAccessPointSendDataToServer(t *testing.T) {
|
||||||
ap := NewAccessPoint("test-ap")
|
ap := NewAccessPoint("test-ap")
|
||||||
defer func() {
|
defer ap.Close()
|
||||||
_ = ap.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// connect to server
|
// connect to server
|
||||||
err := ap.Connect()
|
err := ap.Connect()
|
||||||
|
|
33
go.mod
33
go.mod
|
@ -3,9 +3,9 @@ module git.hpds.cc/pavement/hpds_node
|
||||||
go 1.19
|
go 1.19
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.hpds.cc/Component/network v0.0.0-20230421024959-bf7300c92a95
|
git.hpds.cc/Component/network v0.0.0-20230326151855-3c157f531d86
|
||||||
github.com/disintegration/imaging v1.6.2
|
github.com/disintegration/imaging v1.6.2
|
||||||
github.com/quic-go/quic-go v0.33.0
|
github.com/lucas-clemente/quic-go v0.29.1
|
||||||
github.com/stretchr/testify v1.8.0
|
github.com/stretchr/testify v1.8.0
|
||||||
github.com/u2takey/ffmpeg-go v0.4.1
|
github.com/u2takey/ffmpeg-go v0.4.1
|
||||||
gopkg.in/yaml.v3 v3.0.1
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
|
@ -15,31 +15,30 @@ require (
|
||||||
git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect
|
git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect
|
||||||
github.com/aws/aws-sdk-go v1.38.20 // indirect
|
github.com/aws/aws-sdk-go v1.38.20 // indirect
|
||||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||||
|
github.com/fsnotify/fsnotify v1.4.9 // indirect
|
||||||
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
|
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
|
||||||
github.com/golang/mock v1.6.0 // indirect
|
github.com/golang/mock v1.6.0 // indirect
|
||||||
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
|
|
||||||
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
||||||
github.com/kr/text v0.2.0 // indirect
|
github.com/kr/text v0.2.0 // indirect
|
||||||
|
github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect
|
||||||
|
github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect
|
||||||
github.com/matoous/go-nanoid/v2 v2.0.0 // indirect
|
github.com/matoous/go-nanoid/v2 v2.0.0 // indirect
|
||||||
github.com/onsi/ginkgo/v2 v2.2.0 // indirect
|
github.com/nxadm/tail v1.4.8 // indirect
|
||||||
|
github.com/onsi/ginkgo v1.16.4 // indirect
|
||||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
github.com/quic-go/qtls-go1-19 v0.2.1 // indirect
|
|
||||||
github.com/quic-go/qtls-go1-20 v0.1.1 // indirect
|
|
||||||
github.com/rogpeppe/go-internal v1.9.0 // indirect
|
github.com/rogpeppe/go-internal v1.9.0 // indirect
|
||||||
github.com/u2takey/go-utils v0.3.1 // indirect
|
github.com/u2takey/go-utils v0.3.1 // indirect
|
||||||
go.uber.org/atomic v1.7.0 // indirect
|
go.uber.org/atomic v1.7.0 // indirect
|
||||||
go.uber.org/multierr v1.6.0 // indirect
|
go.uber.org/multierr v1.6.0 // indirect
|
||||||
go.uber.org/zap v1.23.0 // indirect
|
go.uber.org/zap v1.23.0 // indirect
|
||||||
golang.org/x/crypto v0.4.0 // indirect
|
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
|
||||||
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
|
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
|
||||||
golang.org/x/image v0.1.0 // indirect
|
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 // indirect
|
||||||
golang.org/x/mod v0.6.0 // indirect
|
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
|
||||||
golang.org/x/net v0.5.0 // indirect
|
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
|
||||||
golang.org/x/sys v0.4.0 // indirect
|
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
|
||||||
golang.org/x/tools v0.2.0 // indirect
|
golang.org/x/tools v0.1.10 // indirect
|
||||||
|
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
|
||||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
|
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
|
||||||
|
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
|
||||||
)
|
)
|
||||||
|
|
||||||
//replace (
|
|
||||||
// git.hpds.cc/Component/network => ../network
|
|
||||||
//)
|
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
package hpds_node
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.hpds.cc/Component/network"
|
||||||
|
"git.hpds.cc/Component/network/frame"
|
||||||
|
)
|
||||||
|
|
||||||
|
type metadata struct{}
|
||||||
|
|
||||||
|
func (m *metadata) Encode() []byte {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type metadataBuilder struct {
|
||||||
|
m *metadata
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMetadataBuilder() network.MetadataBuilder {
|
||||||
|
return &metadataBuilder{
|
||||||
|
m: &metadata{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (builder *metadataBuilder) Build(f *frame.HandshakeFrame) (network.Metadata, error) {
|
||||||
|
return builder.m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (builder *metadataBuilder) Decode(buf []byte) (network.Metadata, error) {
|
||||||
|
return builder.m, nil
|
||||||
|
}
|
39
mq.go
39
mq.go
|
@ -4,8 +4,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"git.hpds.cc/Component/network/metadata"
|
|
||||||
"git.hpds.cc/Component/network/router"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"git.hpds.cc/Component/network"
|
"git.hpds.cc/Component/network"
|
||||||
|
@ -70,10 +68,10 @@ var _ MessageQueue = &messageQueue{}
|
||||||
// NewMqWithOptions create a messageQueue instance.
|
// NewMqWithOptions create a messageQueue instance.
|
||||||
func NewMqWithOptions(name string, opts ...Option) MessageQueue {
|
func NewMqWithOptions(name string, opts ...Option) MessageQueue {
|
||||||
options := NewOptions(opts...)
|
options := NewOptions(opts...)
|
||||||
emitter := createMessageQueueServer(name, options, nil)
|
zipper := createMessageQueueServer(name, options, nil)
|
||||||
_ = emitter.ConfigMesh(options.MeshConfigURL)
|
_ = zipper.ConfigMesh(options.MeshConfigURL)
|
||||||
|
|
||||||
return emitter
|
return zipper
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMq create a messageQueue instance from config files.
|
// NewMq create a messageQueue instance from config files.
|
||||||
|
@ -88,17 +86,17 @@ func NewMq(conf string) (MessageQueue, error) {
|
||||||
|
|
||||||
options := NewOptions()
|
options := NewOptions()
|
||||||
options.MqAddr = listenAddr
|
options.MqAddr = listenAddr
|
||||||
emitter := createMessageQueueServer(confWf.Name, options, confWf)
|
zipper := createMessageQueueServer(confWf.Name, options, confWf)
|
||||||
// messageQueue workflow
|
// messageQueue workflow
|
||||||
err = emitter.configWorkflow(confWf)
|
err = zipper.configWorkflow(confWf)
|
||||||
|
|
||||||
return emitter, err
|
return zipper, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDownstreamMq create a messageQueue descriptor for downstream messageQueue.
|
// NewDownstreamMq create a messageQueue descriptor for downstream messageQueue.
|
||||||
func NewDownstreamMq(name string, opts ...Option) MessageQueue {
|
func NewDownstreamMq(name string, opts ...Option) MessageQueue {
|
||||||
options := NewOptions(opts...)
|
options := NewOptions(opts...)
|
||||||
client := network.NewClient(name, network.ClientTypeUpstreamEmitter, options.ClientOptions...)
|
client := network.NewClient(name, network.ClientTypeMessageQueue, options.ClientOptions...)
|
||||||
|
|
||||||
return &messageQueue{
|
return &messageQueue{
|
||||||
name: name,
|
name: name,
|
||||||
|
@ -122,9 +120,6 @@ func createMessageQueueServer(name string, options *Options, cfg *config.Workflo
|
||||||
z.init()
|
z.init()
|
||||||
return z
|
return z
|
||||||
}
|
}
|
||||||
func (z *messageQueue) Logger() log.Logger {
|
|
||||||
return log.Default()
|
|
||||||
}
|
|
||||||
|
|
||||||
// ConfigWorkflow will read workflows from config files and register them to messageQueue.
|
// ConfigWorkflow will read workflows from config files and register them to messageQueue.
|
||||||
func (z *messageQueue) ConfigWorkflow(conf string) error {
|
func (z *messageQueue) ConfigWorkflow(conf string) error {
|
||||||
|
@ -139,12 +134,8 @@ func (z *messageQueue) ConfigWorkflow(conf string) error {
|
||||||
|
|
||||||
func (z *messageQueue) configWorkflow(config *config.WorkflowConfig) error {
|
func (z *messageQueue) configWorkflow(config *config.WorkflowConfig) error {
|
||||||
z.wfc = config
|
z.wfc = config
|
||||||
z.server.ConfigMetadataBuilder(metadata.DefaultBuilder())
|
z.server.ConfigMetadataBuilder(newMetadataBuilder())
|
||||||
funcList := make([]string, len(config.Functions))
|
z.server.ConfigRouter(newRouter(config.Functions))
|
||||||
for k, v := range config.Functions {
|
|
||||||
funcList[k] = v.Name
|
|
||||||
}
|
|
||||||
z.server.ConfigRouter(router.Default(funcList))
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -197,11 +188,11 @@ func (z *messageQueue) ListenAndServe() error {
|
||||||
log.Debugf("%sCreating MessageQueue Server ...", mqLogPrefix)
|
log.Debugf("%sCreating MessageQueue Server ...", mqLogPrefix)
|
||||||
// check downstream zippers
|
// check downstream zippers
|
||||||
for _, ds := range z.downstreamMqs {
|
for _, ds := range z.downstreamMqs {
|
||||||
if dsEmitter, ok := ds.(*messageQueue); ok {
|
if dsZipper, ok := ds.(*messageQueue); ok {
|
||||||
go func(dsEmitter *messageQueue) {
|
go func(dsZipper *messageQueue) {
|
||||||
_ = dsEmitter.client.Connect(context.Background(), dsEmitter.addr)
|
_ = dsZipper.client.Connect(context.Background(), dsZipper.addr)
|
||||||
z.server.AddDownstreamServer(dsEmitter.addr, dsEmitter.client)
|
z.server.AddDownstreamServer(dsZipper.addr, dsZipper.client)
|
||||||
}(dsEmitter)
|
}(dsZipper)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return z.server.ListenAndServe(context.Background(), z.addr)
|
return z.server.ListenAndServe(context.Background(), z.addr)
|
||||||
|
@ -262,7 +253,7 @@ func (z *messageQueue) Stats() int {
|
||||||
|
|
||||||
log.Printf("[%s] all downstream mq connected: %d", z.name, len(z.server.DownStreams()))
|
log.Printf("[%s] all downstream mq connected: %d", z.name, len(z.server.DownStreams()))
|
||||||
for k, v := range z.server.DownStreams() {
|
for k, v := range z.server.DownStreams() {
|
||||||
log.Printf("[%s] |> [%s] %v", z.name, k, v)
|
log.Printf("[%s] |> [%s] %s", z.name, k, v.ServerAddr())
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("[%s] total DataFrames received: %d", z.name, z.server.StatsCounter())
|
log.Printf("[%s] total DataFrames received: %d", z.name, z.server.StatsCounter())
|
||||||
|
|
|
@ -21,19 +21,18 @@ func (z *messageQueue) init() {
|
||||||
go func() {
|
go func() {
|
||||||
c := make(chan os.Signal, 1)
|
c := make(chan os.Signal, 1)
|
||||||
signal.Notify(c, syscall.SIGTERM, syscall.SIGUSR2, syscall.SIGUSR1, syscall.SIGINT)
|
signal.Notify(c, syscall.SIGTERM, syscall.SIGUSR2, syscall.SIGUSR1, syscall.SIGINT)
|
||||||
log.Infof("%sListening SIGUSR1, SIGUSR2, SIGTERM/SIGINT...", mqLogPrefix)
|
log.Printf("%sListening SIGUSR1, SIGUSR2, SIGTERM/SIGINT...", mqLogPrefix)
|
||||||
for p1 := range c {
|
for p1 := range c {
|
||||||
log.Infof("Received signal: %s", p1)
|
log.Printf("Received signal: %s", p1)
|
||||||
if p1 == syscall.SIGTERM || p1 == syscall.SIGINT {
|
if p1 == syscall.SIGTERM || p1 == syscall.SIGINT {
|
||||||
log.Infof("graceful shutting down ... %s", p1)
|
log.Printf("graceful shutting down ... %s", p1)
|
||||||
_ = z.Close()
|
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
} else if p1 == syscall.SIGUSR2 {
|
} else if p1 == syscall.SIGUSR2 {
|
||||||
var m runtime.MemStats
|
var m runtime.MemStats
|
||||||
runtime.ReadMemStats(&m)
|
runtime.ReadMemStats(&m)
|
||||||
fmt.Printf("\tNumGC = %v\n", m.NumGC)
|
fmt.Printf("\tNumGC = %v\n", m.NumGC)
|
||||||
} else if p1 == syscall.SIGUSR1 {
|
} else if p1 == syscall.SIGUSR1 {
|
||||||
log.Infof("print MessageQueue stats(): %d", z.Stats())
|
log.Printf("print MessageQueue stats(): %d", z.Stats())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -13,11 +13,11 @@ func (z *messageQueue) init() {
|
||||||
go func() {
|
go func() {
|
||||||
c := make(chan os.Signal, 1)
|
c := make(chan os.Signal, 1)
|
||||||
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
|
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
|
||||||
log.Infof("%s Listening SIGTERM/SIGINT...", mqLogPrefix)
|
log.Printf("%sListening SIGTERM/SIGINT...", mqLogPrefix)
|
||||||
for p1 := range c {
|
for p1 := range c {
|
||||||
log.Printf("Received signal: %s", p1)
|
log.Printf("Received signal: %s", p1)
|
||||||
if p1 == syscall.SIGTERM || p1 == syscall.SIGINT {
|
if p1 == syscall.SIGTERM || p1 == syscall.SIGINT {
|
||||||
log.Infof("%s graceful shutting down ... %s", mqLogPrefix, p1)
|
log.Printf("graceful shutting down ... %s", p1)
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
16
options.go
16
options.go
|
@ -2,10 +2,10 @@ package hpds_node
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"github.com/lucas-clemente/quic-go"
|
||||||
|
|
||||||
"git.hpds.cc/Component/network"
|
"git.hpds.cc/Component/network"
|
||||||
"git.hpds.cc/Component/network/frame"
|
|
||||||
"git.hpds.cc/Component/network/log"
|
"git.hpds.cc/Component/network/log"
|
||||||
"github.com/quic-go/quic-go"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -101,7 +101,7 @@ func WithCredential(payload string) Option {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithObserveDataTags sets client data tag list.
|
// WithObserveDataTags sets client data tag list.
|
||||||
func WithObserveDataTags(tags ...frame.Tag) Option {
|
func WithObserveDataTags(tags ...byte) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.ClientOptions = append(
|
o.ClientOptions = append(
|
||||||
o.ClientOptions,
|
o.ClientOptions,
|
||||||
|
@ -110,6 +110,16 @@ func WithObserveDataTags(tags ...frame.Tag) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithLogger sets the client logger
|
||||||
|
func WithLogger(logger log.Logger) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.ClientOptions = append(
|
||||||
|
o.ClientOptions,
|
||||||
|
network.WithLogger(logger),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NewOptions creates a new options for YoMo-Client.
|
// NewOptions creates a new options for YoMo-Client.
|
||||||
func NewOptions(opts ...Option) *Options {
|
func NewOptions(opts ...Option) *Options {
|
||||||
options := &Options{}
|
options := &Options{}
|
||||||
|
|
|
@ -0,0 +1,103 @@
|
||||||
|
package hpds_node
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.hpds.cc/Component/network"
|
||||||
|
"git.hpds.cc/pavement/hpds_node/config"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type router struct {
|
||||||
|
r *route
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRouter(functions []config.App) network.Router {
|
||||||
|
return &router{r: newRoute(functions)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *router) Route(metadata network.Metadata) network.Route {
|
||||||
|
return r.r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *router) Clean() {
|
||||||
|
r.r = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type route struct {
|
||||||
|
functions []config.App
|
||||||
|
data map[byte]map[string]string
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRoute(functions []config.App) *route {
|
||||||
|
return &route{
|
||||||
|
functions: functions,
|
||||||
|
data: make(map[byte]map[string]string),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *route) Add(connId string, name string, observeDataTags []byte) (err error) {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
|
||||||
|
ok := false
|
||||||
|
for _, v := range r.functions {
|
||||||
|
if v.Name == name {
|
||||||
|
ok = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
//return fmt.Errorf("SFN[%s] does not exist in config functions", name)
|
||||||
|
//如果不存在,自动增加
|
||||||
|
item := config.App{
|
||||||
|
Name: name,
|
||||||
|
}
|
||||||
|
r.functions = append(r.functions, item)
|
||||||
|
}
|
||||||
|
//去除只能订阅一次的问题
|
||||||
|
//LOOP:
|
||||||
|
//for _, connects := range r.data {
|
||||||
|
// for connKey, n := range connects {
|
||||||
|
// if n == name {
|
||||||
|
// err = hpds_err.NewDuplicateNameError(connKey, fmt.Errorf("node:Stream Function[%s] is already linked to another connection", name))
|
||||||
|
// delete(connects, connKey)
|
||||||
|
// break LOOP
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
|
||||||
|
for _, tag := range observeDataTags {
|
||||||
|
connects := r.data[tag]
|
||||||
|
if connects == nil {
|
||||||
|
connects = make(map[string]string)
|
||||||
|
r.data[tag] = connects
|
||||||
|
}
|
||||||
|
r.data[tag][connId] = name
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *route) Remove(connId string) error {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
|
||||||
|
for _, connects := range r.data {
|
||||||
|
delete(connects, connId)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *route) GetForwardRoutes(tag byte) []string {
|
||||||
|
r.mu.RLock()
|
||||||
|
defer r.mu.RUnlock()
|
||||||
|
|
||||||
|
var keys []string
|
||||||
|
if connects := r.data[tag]; connects != nil {
|
||||||
|
for k := range connects {
|
||||||
|
keys = append(keys, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return keys
|
||||||
|
}
|
|
@ -14,7 +14,7 @@ const (
|
||||||
type StreamFunction interface {
|
type StreamFunction interface {
|
||||||
// SetObserveDataTags set the data tag list that will be observed
|
// SetObserveDataTags set the data tag list that will be observed
|
||||||
// Deprecated: use hpds.WithObserveDataTags instead
|
// Deprecated: use hpds.WithObserveDataTags instead
|
||||||
SetObserveDataTags(tag ...frame.Tag)
|
SetObserveDataTags(tag ...byte)
|
||||||
// SetHandler set the handler function, which accept the raw bytes data and return the tag & response
|
// SetHandler set the handler function, which accept the raw bytes data and return the tag & response
|
||||||
SetHandler(fn network.AsyncHandler) error
|
SetHandler(fn network.AsyncHandler) error
|
||||||
// SetErrorHandler set the error handler function when server error occurs
|
// SetErrorHandler set the error handler function when server error occurs
|
||||||
|
@ -26,7 +26,7 @@ type StreamFunction interface {
|
||||||
// Close will close the connection
|
// Close will close the connection
|
||||||
Close() error
|
Close() error
|
||||||
// Write Send a data to mq.
|
// Write Send a data to mq.
|
||||||
Write(tag frame.Tag, carriage []byte) error
|
Write(tag byte, carriage []byte) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStreamFunction create a stream function.
|
// NewStreamFunction create a stream function.
|
||||||
|
@ -35,9 +35,9 @@ func NewStreamFunction(name string, opts ...Option) StreamFunction {
|
||||||
client := network.NewClient(name, network.ClientTypeStreamFunction, options.ClientOptions...)
|
client := network.NewClient(name, network.ClientTypeStreamFunction, options.ClientOptions...)
|
||||||
sf := &streamFunction{
|
sf := &streamFunction{
|
||||||
name: name,
|
name: name,
|
||||||
emitterEndpoint: options.MqAddr,
|
zipperEndpoint: options.MqAddr,
|
||||||
client: client,
|
client: client,
|
||||||
observeDataTags: make([]frame.Tag, 0),
|
observeDataTags: make([]byte, 0),
|
||||||
}
|
}
|
||||||
|
|
||||||
return sf
|
return sf
|
||||||
|
@ -48,9 +48,9 @@ var _ StreamFunction = &streamFunction{}
|
||||||
// streamFunction implements StreamFunction interface.
|
// streamFunction implements StreamFunction interface.
|
||||||
type streamFunction struct {
|
type streamFunction struct {
|
||||||
name string
|
name string
|
||||||
emitterEndpoint string
|
zipperEndpoint string
|
||||||
client *network.Client
|
client *network.Client
|
||||||
observeDataTags []frame.Tag // tag list that will be observed
|
observeDataTags []byte // tag list that will be observed
|
||||||
fn network.AsyncHandler // user's function which will be invoked when data arrived
|
fn network.AsyncHandler // user's function which will be invoked when data arrived
|
||||||
pfn network.PipeHandler
|
pfn network.PipeHandler
|
||||||
pIn chan []byte
|
pIn chan []byte
|
||||||
|
@ -59,21 +59,21 @@ type streamFunction struct {
|
||||||
|
|
||||||
// SetObserveDataTags set the data tag list that will be observed.
|
// SetObserveDataTags set the data tag list that will be observed.
|
||||||
// Deprecated: use hpds.WithObserveDataTags instead
|
// Deprecated: use hpds.WithObserveDataTags instead
|
||||||
func (s *streamFunction) SetObserveDataTags(tag ...frame.Tag) {
|
func (s *streamFunction) SetObserveDataTags(tag ...byte) {
|
||||||
s.client.SetObserveDataTags(tag...)
|
s.client.SetObserveDataTags(tag...)
|
||||||
s.client.Logger().Debugf("%s SetObserveDataTag(%v)", streamFunctionLogPrefix, s.observeDataTags)
|
s.client.Logger().Debugf("%sSetObserveDataTag(%v)", streamFunctionLogPrefix, s.observeDataTags)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetHandler set the handler function, which accept the raw bytes data and return the tag & response.
|
// SetHandler set the handler function, which accept the raw bytes data and return the tag & response.
|
||||||
func (s *streamFunction) SetHandler(fn network.AsyncHandler) error {
|
func (s *streamFunction) SetHandler(fn network.AsyncHandler) error {
|
||||||
s.fn = fn
|
s.fn = fn
|
||||||
s.client.Logger().Debugf("%s SetHandler(%v)", streamFunctionLogPrefix, s.fn)
|
s.client.Logger().Debugf("%sSetHandler(%v)", streamFunctionLogPrefix, s.fn)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamFunction) SetPipeHandler(fn network.PipeHandler) error {
|
func (s *streamFunction) SetPipeHandler(fn network.PipeHandler) error {
|
||||||
s.pfn = fn
|
s.pfn = fn
|
||||||
s.client.Logger().Debugf("%s SetHandler(%v)", streamFunctionLogPrefix, s.fn)
|
s.client.Logger().Debugf("%sSetHandler(%v)", streamFunctionLogPrefix, s.fn)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,7 +83,7 @@ func (s *streamFunction) Connect() error {
|
||||||
s.client.Logger().Debugf("%s Connect()", streamFunctionLogPrefix)
|
s.client.Logger().Debugf("%s Connect()", streamFunctionLogPrefix)
|
||||||
// notify underlying network operations, when data with tag we observed arrived, invoke the func
|
// notify underlying network operations, when data with tag we observed arrived, invoke the func
|
||||||
s.client.SetDataFrameObserver(func(data *frame.DataFrame) {
|
s.client.SetDataFrameObserver(func(data *frame.DataFrame) {
|
||||||
s.client.Logger().Debugf("%s receive DataFrame, tag=%# x, carriage length=%d", streamFunctionLogPrefix, data.Tag(), len(data.GetCarriage()))
|
s.client.Logger().Debugf("%sreceive DataFrame, tag=%# x, carriage=%# x", streamFunctionLogPrefix, data.Tag(), data.GetCarriage())
|
||||||
s.onDataFrame(data.GetCarriage(), data.GetMetaFrame())
|
s.onDataFrame(data.GetCarriage(), data.GetMetaFrame())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -101,19 +101,19 @@ func (s *streamFunction) Connect() error {
|
||||||
for {
|
for {
|
||||||
data := <-s.pOut
|
data := <-s.pOut
|
||||||
if data != nil {
|
if data != nil {
|
||||||
s.client.Logger().Debugf("%s pipe fn send: tag=%#x, data=%# x", streamFunctionLogPrefix, data.Tag, data.Carriage)
|
s.client.Logger().Debugf("%spipe fn send: tag=%#x, data=%# x", streamFunctionLogPrefix, data.Tag, data.Carriage)
|
||||||
frm := frame.NewDataFrame()
|
frame := frame.NewDataFrame()
|
||||||
// todo: frm.SetTransactionId
|
// todo: frame.SetTransactionId
|
||||||
frm.SetCarriage(data.Tag, data.Carriage)
|
frame.SetCarriage(data.Tag, data.Carriage)
|
||||||
_ = s.client.WriteFrame(frm)
|
s.client.WriteFrame(frame)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
err := s.client.Connect(context.Background(), s.emitterEndpoint)
|
err := s.client.Connect(context.Background(), s.zipperEndpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.client.Logger().Errorf("%s Connect() error: %s", streamFunctionLogPrefix, err)
|
s.client.Logger().Errorf("%sConnect() error: %s", streamFunctionLogPrefix, err)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -130,7 +130,7 @@ func (s *streamFunction) Close() error {
|
||||||
|
|
||||||
if s.client != nil {
|
if s.client != nil {
|
||||||
if err := s.client.Close(); err != nil {
|
if err := s.client.Close(); err != nil {
|
||||||
s.client.Logger().Errorf("%s Close(): %v", err)
|
s.client.Logger().Errorf("%sClose(): %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -140,38 +140,38 @@ func (s *streamFunction) Close() error {
|
||||||
|
|
||||||
// when DataFrame we observed arrived, invoke the user's function
|
// when DataFrame we observed arrived, invoke the user's function
|
||||||
func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) {
|
func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) {
|
||||||
s.client.Logger().Infof("%s onDataFrame ->[%s]", streamFunctionLogPrefix, s.name)
|
s.client.Logger().Infof("%sonDataFrame ->[%s]", streamFunctionLogPrefix, s.name)
|
||||||
|
|
||||||
if s.fn != nil {
|
if s.fn != nil {
|
||||||
go func() {
|
go func() {
|
||||||
s.client.Logger().Debugf("%s execute-start fn: data[%d]", streamFunctionLogPrefix, len(data)) //, frame.Shortly(data)
|
s.client.Logger().Debugf("%sexecute-start fn: data[%d]=%# x", streamFunctionLogPrefix, len(data), frame.Shortly(data))
|
||||||
// invoke serverless
|
// invoke serverless
|
||||||
tag, resp := s.fn(data)
|
tag, resp := s.fn(data)
|
||||||
s.client.Logger().Debugf("%s execute-done fn: tag=%#x, resp[%d]", streamFunctionLogPrefix, tag, len(resp))
|
s.client.Logger().Debugf("%sexecute-done fn: tag=%#x, resp[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp))
|
||||||
// if resp is not nil, means the user's function has returned something, we should send it to the mq
|
// if resp is not nil, means the user's function has returned something, we should send it to the mq
|
||||||
if len(resp) != 0 {
|
if len(resp) != 0 {
|
||||||
s.client.Logger().Debugf("%s start WriteFrame(): tag=%#x, data[%d]", streamFunctionLogPrefix, tag, len(resp))
|
s.client.Logger().Debugf("%sstart WriteFrame(): tag=%#x, data[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp))
|
||||||
// build a DataFrame
|
// build a DataFrame
|
||||||
// TODO: seems we should implement a DeepCopy() of MetaFrame in the future
|
// TODO: seems we should implement a DeepCopy() of MetaFrame in the future
|
||||||
frm := frame.NewDataFrame()
|
frame := frame.NewDataFrame()
|
||||||
// reuse transactionId
|
// reuse transactionId
|
||||||
frm.SetTransactionId(metaFrame.TransactionId())
|
frame.SetTransactionId(metaFrame.TransactionId())
|
||||||
// reuse sourceId
|
// reuse sourceId
|
||||||
frm.SetSourceId(metaFrame.SourceId())
|
frame.SetSourceId(metaFrame.SourceId())
|
||||||
frm.SetCarriage(tag, resp)
|
frame.SetCarriage(tag, resp)
|
||||||
_ = s.client.WriteFrame(frm)
|
s.client.WriteFrame(frame)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
} else if s.pfn != nil {
|
} else if s.pfn != nil {
|
||||||
s.client.Logger().Debugf("%s pipe fn receive: data[%d]", streamFunctionLogPrefix, len(data))
|
s.client.Logger().Debugf("%spipe fn receive: data[%d]=%# x", streamFunctionLogPrefix, len(data), data)
|
||||||
s.pIn <- data
|
s.pIn <- data
|
||||||
} else {
|
} else {
|
||||||
s.client.Logger().Warnf("%s StreamFunction is nil", streamFunctionLogPrefix)
|
s.client.Logger().Warnf("%sStreamFunction is nil", streamFunctionLogPrefix)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send a DataFrame to mq.
|
// Send a DataFrame to mq.
|
||||||
func (s *streamFunction) Write(tag frame.Tag, carriage []byte) error {
|
func (s *streamFunction) Write(tag byte, carriage []byte) error {
|
||||||
fm := frame.NewDataFrame()
|
fm := frame.NewDataFrame()
|
||||||
fm.SetCarriage(tag, carriage)
|
fm.SetCarriage(tag, carriage)
|
||||||
return s.client.WriteFrame(fm)
|
return s.client.WriteFrame(fm)
|
||||||
|
|
Loading…
Reference in New Issue