commit a5e67c8e8d702e0150a40f6a122652ec9434979a Author: wangjian Date: Wed Oct 12 11:55:36 2022 +0800 init diff --git a/ap.go b/ap.go new file mode 100644 index 0000000..0da8358 --- /dev/null +++ b/ap.go @@ -0,0 +1,109 @@ +package hpds_node + +import ( + "context" + "git.hpds.cc/Component/network" + "git.hpds.cc/Component/network/frame" +) + +const ( + apLogPrefix = "\033[32m[hpds:access_point]\033[0m " +) + +// AccessPoint is responsible for sending data to hpds. +type AccessPoint interface { + // Close will close the connection to HPDS-Mq. + Close() error + // Connect to HPDS-Mq. + Connect() error + // SetDataTag will set the tag of data when invoking Write(). + SetDataTag(tag uint8) + // Write the data to downstream. + Write(p []byte) (n int, err error) + // WriteWithTag will write data with specified tag, default transactionId is epoch time. + WriteWithTag(tag uint8, data []byte) error + // SetErrorHandler set the error handler function when server error occurs + SetErrorHandler(fn func(err error)) + // SetReceiveHandler [Experimental] set to observe handler function + SetReceiveHandler(fn func(tag byte, data []byte)) +} + +// hpds-AccessPoint +type accessPoint struct { + name string + mqEndpoint string + client *network.Client + tag uint8 + fn func(byte, []byte) +} + +var _ AccessPoint = &accessPoint{} + +// NewAccessPoint create a hpds-AccessPoint +func NewAccessPoint(name string, opts ...Option) AccessPoint { + options := NewOptions(opts...) + client := network.NewClient(name, network.ClientTypeProtocolGateway, options.ClientOptions...) + + return &accessPoint{ + name: name, + mqEndpoint: options.MqAddr, + client: client, + } +} + +// Write the data to downstream. +func (s *accessPoint) Write(data []byte) (int, error) { + return len(data), s.WriteWithTag(s.tag, data) +} + +// SetDataTag will set the tag of data when invoking Write(). +func (s *accessPoint) SetDataTag(tag uint8) { + s.tag = tag +} + +// Close will close the connection to YoMo-MessageQueue. +func (s *accessPoint) Close() error { + if err := s.client.Close(); err != nil { + s.client.Logger().Errorf("%sClose(): %v", apLogPrefix, err) + return err + } + s.client.Logger().Debugf("%s is closed", apLogPrefix) + return nil +} + +// Connect to YoMo-MessageQueue. +func (s *accessPoint) Connect() error { + // set backFlowFrame handler + s.client.SetBackFlowFrameObserver(func(frm *frame.BackFlowFrame) { + if s.fn != nil { + s.fn(frm.GetDataTag(), frm.GetCarriage()) + } + }) + + err := s.client.Connect(context.Background(), s.mqEndpoint) + if err != nil { + s.client.Logger().Errorf("%sConnect() error: %s", apLogPrefix, err) + } + return err +} + +// WriteWithTag will write data with specified tag, default transactionID is epoch time. +func (s *accessPoint) WriteWithTag(tag uint8, data []byte) error { + f := frame.NewDataFrame() + f.SetCarriage(byte(tag), data) + f.SetSourceId(s.client.ClientId()) + s.client.Logger().Debugf("%sWriteWithTag: tid=%s, source_id=%s, data[%d]=%# x", + apLogPrefix, f.TransactionId(), f.SourceId(), len(data), frame.Shortly(data)) + return s.client.WriteFrame(f) +} + +// SetErrorHandler set the error handler function when server error occurs +func (s *accessPoint) SetErrorHandler(fn func(err error)) { + s.client.SetErrorHandler(fn) +} + +// SetReceiveHandler [Experimental] set to observe handler function +func (s *accessPoint) SetReceiveHandler(fn func(byte, []byte)) { + s.fn = fn + s.client.Logger().Debugf("%sSetReceiveHandler(%v)", apLogPrefix, s.fn) +} diff --git a/ap_test.go b/ap_test.go new file mode 100644 index 0000000..75055e5 --- /dev/null +++ b/ap_test.go @@ -0,0 +1,20 @@ +package hpds_node + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestAccessPointSendDataToServer(t *testing.T) { + ap := NewAccessPoint("test-ap") + defer ap.Close() + + // connect to server + err := ap.Connect() + assert.Nil(t, err) + + // send data to server + n, err := ap.Write([]byte("test")) + assert.Greater(t, n, 0, "[ap.Write] expected n > 0, but got %d", n) + assert.Nil(t, err) +} diff --git a/config/mesh.go b/config/mesh.go new file mode 100644 index 0000000..f677672 --- /dev/null +++ b/config/mesh.go @@ -0,0 +1,9 @@ +package config + +// MeshMessageQueue describes mesh configurations. +type MeshMessageQueue struct { + Name string `json:"name"` + Host string `json:"host"` + Port int `json:"port"` + Credential string `json:"credential,omitempty"` +} diff --git a/config/workflow.go b/config/workflow.go new file mode 100644 index 0000000..1bf8ed6 --- /dev/null +++ b/config/workflow.go @@ -0,0 +1,108 @@ +package config + +import ( + "errors" + "os" + "path/filepath" + "strings" + + "gopkg.in/yaml.v3" +) + +// App represents a HPDS Application. +type App struct { + Name string `yaml:"name"` +} + +// Workflow represents a HPDS Workflow. +type Workflow struct { + Functions []App `yaml:"functions"` +} + +// WorkflowConfig represents a HPDS Workflow config. +type WorkflowConfig struct { + // Name represents the name of the zipper. + Name string `yaml:"name"` + // Host represents the listening host of the zipper. + Host string `yaml:"host"` + // Port represents the listening port of the zipper. + Port int `yaml:"port"` + // Workflow represents the sfn workflow. + Workflow `yaml:",inline"` +} + +// ErrWorkflowConfigExt represents the extension of workflow config is incorrect. +var ErrWorkflowConfigExt = errors.New(`workflow: the extension of workflow config is incorrect, it should ".yaml|.yml"`) + +// LoadWorkflowConfig the WorkflowConfig by path. +func LoadWorkflowConfig(path string) (*WorkflowConfig, error) { + buffer, err := os.ReadFile(path) + if err != nil { + return nil, err + } + return load(buffer) +} + +func load(data []byte) (*WorkflowConfig, error) { + var config = &WorkflowConfig{} + err := yaml.Unmarshal(data, config) + if err != nil { + return nil, err + } + return config, nil +} + +// ParseWorkflowConfig parses the config. +func ParseWorkflowConfig(config string) (*WorkflowConfig, error) { + if ext := filepath.Ext(config); ext != ".yaml" && ext != ".yml" { + return nil, ErrWorkflowConfigExt + } + + // parse workflow.yaml + wfConf, err := LoadWorkflowConfig(config) + if err != nil { + return nil, err + } + + // validate + err = validateWorkflowConfig(wfConf) + if err != nil { + return nil, err + } + + return wfConf, nil +} + +func validateWorkflowConfig(wfConf *WorkflowConfig) error { + if wfConf == nil { + return errors.New("workflow: config nil") + } + + m := map[string][]App{ + "Functions": wfConf.Functions, + } + + missingParams := []string{} + for k, apps := range m { + for _, app := range apps { + if app.Name == "" { + missingParams = append(missingParams, k) + } + } + } + + errMsg := "" + if wfConf.Name == "" || wfConf.Host == "" || wfConf.Port <= 0 { + errMsg = "Missing name, host or port in workflow config. " + } + + if len(missingParams) > 0 { + errMsg += "Missing name, host or port in " + strings.Join(missingParams, ", "+". ") + } + + if errMsg != "" { + return errors.New(errMsg) + } + + return nil +} diff --git a/example/multi-mq/mock_ap/main.go b/example/multi-mq/mock_ap/main.go new file mode 100644 index 0000000..0bca70a --- /dev/null +++ b/example/multi-mq/mock_ap/main.go @@ -0,0 +1,52 @@ +package main + +import ( + "fmt" + "git.hpds.cc/Component/network/log" + "git.hpds.cc/pavement/hpds_node" + "math/rand" + "os" + "time" +) + +func main() { + // connect to HPDS-MQ. + source := hpds_node.NewAccessPoint( + "hpds-ap", + hpds_node.WithMqAddr("localhost:27188"), + hpds_node.WithCredential("token:z1"), + ) + err := source.Connect() + if err != nil { + log.Printf("[AccessPoint] Emit the data to HPDS-MQ failure with err: %v", err) + return + } + defer source.Close() + + source.SetDataTag(0x33) + + // generate mock data and send it to HPDS-MQ. + err = generateAndSendData(source) + if err != nil { + log.Printf("[AccessPoint] >>>> ERR >>>> %v", err) + os.Exit(1) + } + select {} +} + +func generateAndSendData(stream hpds_node.AccessPoint) error { + for { + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + data := []byte(fmt.Sprintf("%d", rnd.Uint32())) + // send data via QUIC stream. + _, err := stream.Write(data) + if err != nil { + log.Printf("[AccessPoint] Emit %v to HPDS-MQ failure with err: %v", data, err) + time.Sleep(500 * time.Millisecond) + continue + } + + log.Printf("[AccessPoint] Emit %s to HPDS-MQ", data) + time.Sleep(1000 * time.Millisecond) + } +} diff --git a/example/multi-mq/mq1/main.go b/example/multi-mq/mq1/main.go new file mode 100644 index 0000000..7e8a8ab --- /dev/null +++ b/example/multi-mq/mq1/main.go @@ -0,0 +1,33 @@ +package main + +import ( + "git.hpds.cc/Component/network/log" + "git.hpds.cc/pavement/hpds_node" + "os" +) + +func main() { + mq, err := hpds_node.NewMq("mq_1.yaml") + if err != nil { + panic(err) + } + mq.InitOptions(hpds_node.WithAuth("token", "z1")) + defer mq.Close() + + // add Downstream mq + mq.AddDownstreamMq(hpds_node.NewDownstreamMq( + "mq-2", + hpds_node.WithMqAddr("localhost:27187"), + hpds_node.WithCredential("token:z2"), + )) + + // start mq service + log.Printf("Server has started!, pid: %d", os.Getpid()) + go func() { + err := mq.ListenAndServe() + if err != nil { + panic(err) + } + }() + select {} +} diff --git a/example/multi-mq/mq1/mq_1.yaml b/example/multi-mq/mq1/mq_1.yaml new file mode 100644 index 0000000..d983b05 --- /dev/null +++ b/example/multi-mq/mq1/mq_1.yaml @@ -0,0 +1,5 @@ +name: mq-1 +host: 0.0.0.0 +port: 27188 +functions: + - name: echo-sf diff --git a/example/multi-mq/mq2/main.go b/example/multi-mq/mq2/main.go new file mode 100644 index 0000000..a94b19b --- /dev/null +++ b/example/multi-mq/mq2/main.go @@ -0,0 +1,28 @@ +package main + +import ( + "git.hpds.cc/Component/network/log" + "git.hpds.cc/pavement/hpds_node" + "os" +) + +func main() { + mq := hpds_node.NewMqWithOptions( + "mq-2", + hpds_node.WithMqAddr("localhost:27187"), + hpds_node.WithAuth("token", "z2"), + ) + defer mq.Close() + + mq.ConfigWorkflow("mq_2.yaml") + + // start mq service + log.Printf("Server has started!, pid: %d", os.Getpid()) + go func() { + err := mq.ListenAndServe() + if err != nil { + panic(err) + } + }() + select {} +} diff --git a/example/multi-mq/mq2/mq_2.yaml b/example/multi-mq/mq2/mq_2.yaml new file mode 100644 index 0000000..e9dc984 --- /dev/null +++ b/example/multi-mq/mq2/mq_2.yaml @@ -0,0 +1,5 @@ +name: mq-2 +host: 0.0.0.0 +port: 27188 +functions: + - name: echo-sf diff --git a/example/multi-mq/sf/main.go b/example/multi-mq/sf/main.go new file mode 100644 index 0000000..f70ff4a --- /dev/null +++ b/example/multi-mq/sf/main.go @@ -0,0 +1,35 @@ +package main + +import ( + "git.hpds.cc/pavement/hpds_node" + "log" + "os" +) + +func main() { + sf := hpds_node.NewStreamFunction( + "echo-sf", + hpds_node.WithMqAddr("localhost:27187"), + hpds_node.WithObserveDataTags(0x33), + hpds_node.WithCredential("token:z2"), + ) + defer sf.Close() + + // set handler + sf.SetHandler(handler) + + // start + err := sf.Connect() + if err != nil { + log.Fatalf("[sf] connect err=%v", err) + os.Exit(1) + } + + select {} +} + +func handler(data []byte) (byte, []byte) { + val := string(data) + log.Printf(">> [streamFunction] got tag=0x33, data=%s", val) + return 0x0, nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..973ecc4 --- /dev/null +++ b/go.mod @@ -0,0 +1,38 @@ +module git.hpds.cc/pavement/hpds_node + +go 1.19 + +require ( + git.hpds.cc/Component/network v0.0.0-20221012021659-2433c68452d5 + github.com/lucas-clemente/quic-go v0.29.1 + github.com/stretchr/testify v1.8.0 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/fsnotify/fsnotify v1.4.9 // indirect + github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect + github.com/golang/mock v1.6.0 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect + github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect + github.com/matoous/go-nanoid/v2 v2.0.0 // indirect + github.com/nxadm/tail v1.4.8 // indirect + github.com/onsi/ginkgo v1.16.4 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rogpeppe/go-internal v1.9.0 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect + go.uber.org/zap v1.23.0 // indirect + golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect + golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect + golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect + golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect + golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect + golang.org/x/tools v0.1.10 // indirect + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect +) diff --git a/metadata.go b/metadata.go new file mode 100644 index 0000000..7027987 --- /dev/null +++ b/metadata.go @@ -0,0 +1,30 @@ +package hpds_node + +import ( + "git.hpds.cc/Component/network" + "git.hpds.cc/Component/network/frame" +) + +type metadata struct{} + +func (m *metadata) Encode() []byte { + return nil +} + +type metadataBuilder struct { + m *metadata +} + +func newMetadataBuilder() network.MetadataBuilder { + return &metadataBuilder{ + m: &metadata{}, + } +} + +func (builder *metadataBuilder) Build(f *frame.HandshakeFrame) (network.Metadata, error) { + return builder.m, nil +} + +func (builder *metadataBuilder) Decode(buf []byte) (network.Metadata, error) { + return builder.m, nil +} diff --git a/mq.go b/mq.go new file mode 100644 index 0000000..27255d6 --- /dev/null +++ b/mq.go @@ -0,0 +1,272 @@ +package hpds_node + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "git.hpds.cc/Component/network" + "git.hpds.cc/Component/network/log" + "git.hpds.cc/pavement/hpds_node/config" +) + +const ( + mqLogPrefix = "\033[33m[hpds:mq]\033[0m " +) + +// MessageQueue is the orchestrator of hpds. There are two types of mq: +// one is Upstream MessageQueue, which is used to connect to multiple downstream mq, +// another one is Downstream MessageQueue (will call it as MessageQueue directly), which is used +// to connected by `Message Queue`, `Access Point` and `Stream Function` +type MessageQueue interface { + // ConfigWorkflow will register workflows from config files to messageQueue. + ConfigWorkflow(conf string) error + + // ConfigMesh will register edge-mesh config URL + ConfigMesh(url string) error + + // ListenAndServe start mq as server. + ListenAndServe() error + + // AddDownstreamMq will add downstream mq. + AddDownstreamMq(downstream MessageQueue) error + + // Addr returns the listen address of mq. + Addr() string + + // Stats return insight data + Stats() int + + // Close will close the mq. + Close() error + + // InitOptions initialize options + InitOptions(opts ...Option) + + // ReadConfigFile(conf string) error + // AddWorkflow(wf ...core.Workflow) error + // ConfigDownstream(opts ...interface{}) error + // Connect() error + // RemoveDownstreamMq(downstream MessageQueue) error + // ListenAddr() string +} + +// messageQueue is the implementation of MessageQueue interface. +type messageQueue struct { + name string + addr string + hasDownStreams bool + server *network.Server + client *network.Client + downstreamMqs []MessageQueue + wfc *config.WorkflowConfig +} + +var _ MessageQueue = &messageQueue{} + +// NewMqWithOptions create a messageQueue instance. +func NewMqWithOptions(name string, opts ...Option) MessageQueue { + options := NewOptions(opts...) + zipper := createMessageQueueServer(name, options, nil) + zipper.ConfigMesh(options.MeshConfigURL) + + return zipper +} + +// NewMq create a messageQueue instance from config files. +func NewMq(conf string) (MessageQueue, error) { + config, err := config.ParseWorkflowConfig(conf) + if err != nil { + log.Errorf("%s[ERR] %v", mqLogPrefix, err) + return nil, err + } + // listening address + listenAddr := fmt.Sprintf("%s:%d", config.Host, config.Port) + + options := NewOptions() + options.MqAddr = listenAddr + zipper := createMessageQueueServer(config.Name, options, config) + // messageQueue workflow + err = zipper.configWorkflow(config) + + return zipper, err +} + +// NewDownstreamMq create a messageQueue descriptor for downstream messageQueue. +func NewDownstreamMq(name string, opts ...Option) MessageQueue { + options := NewOptions(opts...) + client := network.NewClient(name, network.ClientTypeMessageQueue, options.ClientOptions...) + + return &messageQueue{ + name: name, + addr: options.MqAddr, + client: client, + } +} + +/*************** Server ONLY ***************/ +// createMessageQueueServer create a messageQueue instance as server. +func createMessageQueueServer(name string, options *Options, cfg *config.WorkflowConfig) *messageQueue { + // create underlying QUIC server + srv := network.NewServer(name, options.ServerOptions...) + z := &messageQueue{ + server: srv, + name: name, + addr: options.MqAddr, + wfc: cfg, + } + // initialize + z.init() + return z +} + +// ConfigWorkflow will read workflows from config files and register them to messageQueue. +func (z *messageQueue) ConfigWorkflow(conf string) error { + cfg, err := config.ParseWorkflowConfig(conf) + if err != nil { + log.Errorf("%s[ERR] %v", mqLogPrefix, err) + return err + } + log.Debugf("%sConfigWorkflow cfg=%+v", mqLogPrefix, cfg) + return z.configWorkflow(cfg) +} + +func (z *messageQueue) configWorkflow(config *config.WorkflowConfig) error { + z.wfc = config + z.server.ConfigMetadataBuilder(newMetadataBuilder()) + z.server.ConfigRouter(newRouter(config.Functions)) + return nil +} + +func (z *messageQueue) ConfigMesh(url string) error { + if url == "" { + return nil + } + + log.Printf("%sDownloading mesh config...", mqLogPrefix) + // download mesh conf + res, err := http.Get(url) + if err != nil { + return err + } + defer res.Body.Close() + + decoder := json.NewDecoder(res.Body) + var configs []config.MeshMessageQueue + err = decoder.Decode(&configs) + if err != nil { + log.Errorf("%s downloaded the Mesh config with err=%v", mqLogPrefix, err) + return err + } + + log.Printf("%s Successfully downloaded the Mesh config. ", mqLogPrefix) + + if len(configs) == 0 { + return nil + } + + for _, downstream := range configs { + if downstream.Name == z.name { + continue + } + addr := fmt.Sprintf("%s:%d", downstream.Host, downstream.Port) + opts := []Option{WithMqAddr(addr)} + if downstream.Credential != "" { + opts = append(opts, WithCredential(downstream.Credential)) + } + _ = z.AddDownstreamMq(NewDownstreamMq(downstream.Name, opts...)) + } + + return nil +} + +// ListenAndServe will start messageQueue service. +func (z *messageQueue) ListenAndServe() error { + log.Debugf("%sCreating MessageQueue Server ...", mqLogPrefix) + // check downstream zippers + for _, ds := range z.downstreamMqs { + if dsZipper, ok := ds.(*messageQueue); ok { + go func(dsZipper *messageQueue) { + _ = dsZipper.client.Connect(context.Background(), dsZipper.addr) + z.server.AddDownstreamServer(dsZipper.addr, dsZipper.client) + }(dsZipper) + } + } + return z.server.ListenAndServe(context.Background(), z.addr) +} + +// AddDownstreamMq will add downstream messageQueue. +func (z *messageQueue) AddDownstreamMq(downstream MessageQueue) error { + log.Debugf("%sAddDownStreamMq: %v", mqLogPrefix, downstream) + z.downstreamMqs = append(z.downstreamMqs, downstream) + z.hasDownStreams = true + log.Debugf("%scurrent downStreams: %d", mqLogPrefix, len(z.downstreamMqs)) + return nil +} + +// RemoveDownstreamMq remove downstream messageQueue. +func (z *messageQueue) RemoveDownstreamMq(downstream MessageQueue) error { + index := -1 + for i, v := range z.downstreamMqs { + if v.Addr() == downstream.Addr() { + index = i + break + } + } + + // remove from slice + z.downstreamMqs = append(z.downstreamMqs[:index], z.downstreamMqs[index+1:]...) + return nil +} + +// Addr returns listen address of messageQueue. +func (z *messageQueue) Addr() string { + return z.addr +} + +// Close will close a connection. If messageQueue is Server, close the server. If messageQueue is Client, close the client. +func (z *messageQueue) Close() error { + if z.server != nil { + if err := z.server.Close(); err != nil { + log.Errorf("%s Close(): %v", mqLogPrefix, err) + return err + } + } + if z.client != nil { + if err := z.client.Close(); err != nil { + log.Errorf("%s Close(): %v", mqLogPrefix, err) + return err + } + } + return nil +} + +// Stats inspects current server. +func (z *messageQueue) Stats() int { + log.Printf("[%s] all connections: %d", z.name, len(z.server.StatsFunctions())) + for connID, name := range z.server.StatsFunctions() { + log.Printf("[%s] -> ConnId=%s, Name=%s", z.name, connID, name) + } + + log.Printf("[%s] all downstream mq connected: %d", z.name, len(z.server.DownStreams())) + for k, v := range z.server.DownStreams() { + log.Printf("[%s] |> [%s] %s", z.name, k, v.ServerAddr()) + } + + log.Printf("[%s] total DataFrames received: %d", z.name, z.server.StatsCounter()) + + return len(z.server.StatsFunctions()) +} + +func (z *messageQueue) InitOptions(opts ...Option) { + options := &Options{MqAddr: z.addr} + for _, o := range opts { + o(options) + } + srv := network.NewServer(z.name, options.ServerOptions...) + z.server = srv + if z.wfc != nil { + _ = z.configWorkflow(z.wfc) + } +} diff --git a/mq_notwindows.go b/mq_notwindows.go new file mode 100644 index 0000000..4d7307f --- /dev/null +++ b/mq_notwindows.go @@ -0,0 +1,39 @@ +//go:build !windows +// +build !windows + +package hpds_node + +import ( + "fmt" + "os" + "os/signal" + "runtime" + "syscall" + + "git.hpds.cc/Component/network/log" +) + +// initialize when mq running as server. support inspection: +// - `kill -SIGUSR1 ` inspect state() +// - `kill -SIGTERM ` graceful shutdown +// - `kill -SIGUSR2 ` inspect golang GC +func (z *messageQueue) init() { + go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGTERM, syscall.SIGUSR2, syscall.SIGUSR1, syscall.SIGINT) + log.Printf("%sListening SIGUSR1, SIGUSR2, SIGTERM/SIGINT...", mqLogPrefix) + for p1 := range c { + log.Printf("Received signal: %s", p1) + if p1 == syscall.SIGTERM || p1 == syscall.SIGINT { + log.Printf("graceful shutting down ... %s", p1) + os.Exit(0) + } else if p1 == syscall.SIGUSR2 { + var m runtime.MemStats + runtime.ReadMemStats(&m) + fmt.Printf("\tNumGC = %v\n", m.NumGC) + } else if p1 == syscall.SIGUSR1 { + log.Printf("print MessageQueue stats(): %d", z.Stats()) + } + } + }() +} diff --git a/mq_test.go b/mq_test.go new file mode 100644 index 0000000..ca3c10d --- /dev/null +++ b/mq_test.go @@ -0,0 +1,16 @@ +package hpds_node + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestMqRun(t *testing.T) { + mq := NewMqWithOptions("mq", WithMqAddr("localhost:9001")) + time.Sleep(time.Second) + assert.NotNil(t, mq) + err := mq.Close() + time.Sleep(time.Second) + assert.Nil(t, err) +} diff --git a/mq_windows.go b/mq_windows.go new file mode 100644 index 0000000..171de41 --- /dev/null +++ b/mq_windows.go @@ -0,0 +1,25 @@ +//go:build windows +// +build windows + +package hpds_node + +import ( + "git.hpds.cc/Component/network/log" +) + +// initialize when mq running as server. support inspection: +// - `kill -SIGTERM ` graceful shutdown +func (z *messageQueue) init() { + go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGTERM, syscall.SIGINT) + log.Printf("%sListening SIGTERM/SIGINT...", mqLogPrefix) + for p1 := range c { + log.Printf("Received signal: %s", p1) + if p1 == syscall.SIGTERM || p1 == syscall.SIGINT { + log.Printf("graceful shutting down ... %s", p1) + os.Exit(0) + } + } + }() +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..24be4ca --- /dev/null +++ b/options.go @@ -0,0 +1,136 @@ +package hpds_node + +import ( + "crypto/tls" + "github.com/lucas-clemente/quic-go" + + "git.hpds.cc/Component/network" + "git.hpds.cc/Component/network/log" +) + +const ( + // DefaultMqAddr is the default address of downstream mq. + DefaultMqAddr = "localhost:9000" +) + +// Option is a function that applies a hpds-Client option. +type Option func(o *Options) + +// Options are the options for YoMo +type Options struct { + MqAddr string // target mq endpoint address + // MqListenAddr string // mq endpoint address + MqWorkflowConfig string // mq workflow file + MeshConfigURL string // meshConfigURL is the URL of edge-mesh config + ServerOptions []network.ServerOption + ClientOptions []network.ClientOption + QuicConfig *quic.Config + TLSConfig *tls.Config + Logger log.Logger +} + +// WithMqAddr return a new options with MqAddr set to addr. +func WithMqAddr(addr string) Option { + return func(o *Options) { + o.MqAddr = addr + } +} + +// // WithMqListenAddr return a new options with MqListenAddr set to addr. +// func WithMqListenAddr(addr string) Option { +// return func(o *options) { +// o.MqListenAddr = addr +// } +// } + +// TODO: WithWorkflowConfig + +// WithMeshConfigURL sets the initial edge-mesh config URL for the YoMo-MessageQueue. +func WithMeshConfigURL(url string) Option { + return func(o *Options) { + o.MeshConfigURL = url + } +} + +// WithTLSConfig sets the TLS configuration for the client. +func WithTLSConfig(tc *tls.Config) Option { + return func(o *Options) { + o.TLSConfig = tc + } +} + +// WithQuicConfig sets the QUIC configuration for the client. +func WithQuicConfig(qc *quic.Config) Option { + return func(o *Options) { + o.QuicConfig = qc + } +} + +// WithClientOptions returns a new options with opts. +func WithClientOptions(opts ...network.ClientOption) Option { + return func(o *Options) { + o.ClientOptions = opts + } +} + +// WithServerOptions returns a new options with opts. +func WithServerOptions(opts ...network.ServerOption) Option { + return func(o *Options) { + o.ServerOptions = opts + } +} + +// WithAuth sets the server authentication method (used by server) +func WithAuth(name string, args ...string) Option { + return func(o *Options) { + o.ServerOptions = append( + o.ServerOptions, + network.WithAuth(name, args...), + ) + } +} + +// WithCredential sets the client credential method (used by client) +func WithCredential(payload string) Option { + return func(o *Options) { + o.ClientOptions = append( + o.ClientOptions, + network.WithCredential(payload), + ) + } +} + +// WithObserveDataTags sets client data tag list. +func WithObserveDataTags(tags ...byte) Option { + return func(o *Options) { + o.ClientOptions = append( + o.ClientOptions, + network.WithObserveDataTags(tags...), + ) + } +} + +// WithLogger sets the client logger +func WithLogger(logger log.Logger) Option { + return func(o *Options) { + o.ClientOptions = append( + o.ClientOptions, + network.WithLogger(logger), + ) + } +} + +// NewOptions creates a new options for YoMo-Client. +func NewOptions(opts ...Option) *Options { + options := &Options{} + + for _, o := range opts { + o(options) + } + + if options.MqAddr == "" { + options.MqAddr = DefaultMqAddr + } + + return options +} diff --git a/router.go b/router.go new file mode 100644 index 0000000..585ba07 --- /dev/null +++ b/router.go @@ -0,0 +1,100 @@ +package hpds_node + +import ( + "fmt" + "git.hpds.cc/Component/network" + "git.hpds.cc/Component/network/hpds_err" + "git.hpds.cc/pavement/hpds_node/config" + "sync" +) + +type router struct { + r *route +} + +func newRouter(functions []config.App) network.Router { + return &router{r: newRoute(functions)} +} + +func (r *router) Route(metadata network.Metadata) network.Route { + return r.r +} + +func (r *router) Clean() { + r.r = nil +} + +type route struct { + functions []config.App + data map[byte]map[string]string + mu sync.RWMutex +} + +func newRoute(functions []config.App) *route { + return &route{ + functions: functions, + data: make(map[byte]map[string]string), + } +} + +func (r *route) Add(connId string, name string, observeDataTags []byte) (err error) { + r.mu.Lock() + defer r.mu.Unlock() + + ok := false + for _, v := range r.functions { + if v.Name == name { + ok = true + break + } + } + if !ok { + return fmt.Errorf("SFN[%s] does not exist in config functions", name) + } + +LOOP: + for _, connects := range r.data { + for connKey, n := range connects { + if n == name { + err = hpds_err.NewDuplicateNameError(connKey, fmt.Errorf("node:Stream Function[%s] is already linked to another connection", name)) + delete(connects, connKey) + break LOOP + } + } + } + + for _, tag := range observeDataTags { + connects := r.data[tag] + if connects == nil { + connects = make(map[string]string) + r.data[tag] = connects + } + r.data[tag][connId] = name + } + + return err +} + +func (r *route) Remove(connId string) error { + r.mu.Lock() + defer r.mu.Unlock() + + for _, connects := range r.data { + delete(connects, connId) + } + + return nil +} + +func (r *route) GetForwardRoutes(tag byte) []string { + r.mu.RLock() + defer r.mu.RUnlock() + + var keys []string + if connects := r.data[tag]; connects != nil { + for k := range connects { + keys = append(keys, k) + } + } + return keys +} diff --git a/sf_test.go b/sf_test.go new file mode 100644 index 0000000..7347f9f --- /dev/null +++ b/sf_test.go @@ -0,0 +1,22 @@ +package hpds_node + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestSfConnectToServer(t *testing.T) { + sf := NewStreamFunction( + "test-sf", + WithMqAddr("localhost:9000"), + WithObserveDataTags(0x33), + ) + defer sf.Close() + + // set handler + _ = sf.SetHandler(nil) + + // connect to server + err := sf.Connect() + assert.Nil(t, err) +} diff --git a/stream_func.go b/stream_func.go new file mode 100644 index 0000000..59924ba --- /dev/null +++ b/stream_func.go @@ -0,0 +1,183 @@ +package hpds_node + +import ( + "context" + "git.hpds.cc/Component/network" + "git.hpds.cc/Component/network/frame" +) + +const ( + streamFunctionLogPrefix = "\033[31m[hpds:stream_function]\033[0m " +) + +// StreamFunction defines serverless streaming functions. +type StreamFunction interface { + // SetObserveDataTags set the data tag list that will be observed + // Deprecated: use hpds.WithObserveDataTags instead + SetObserveDataTags(tag ...byte) + // SetHandler set the handler function, which accept the raw bytes data and return the tag & response + SetHandler(fn network.AsyncHandler) error + // SetErrorHandler set the error handler function when server error occurs + SetErrorHandler(fn func(err error)) + // SetPipeHandler set the pipe handler function + SetPipeHandler(fn network.PipeHandler) error + // Connect create a connection to the messageQueue + Connect() error + // Close will close the connection + Close() error + // Write Send a data to mq. + Write(tag byte, carriage []byte) error +} + +// NewStreamFunction create a stream function. +func NewStreamFunction(name string, opts ...Option) StreamFunction { + options := NewOptions(opts...) + client := network.NewClient(name, network.ClientTypeStreamFunction, options.ClientOptions...) + sf := &streamFunction{ + name: name, + zipperEndpoint: options.MqAddr, + client: client, + observeDataTags: make([]byte, 0), + } + + return sf +} + +var _ StreamFunction = &streamFunction{} + +// streamFunction implements StreamFunction interface. +type streamFunction struct { + name string + zipperEndpoint string + client *network.Client + observeDataTags []byte // tag list that will be observed + fn network.AsyncHandler // user's function which will be invoked when data arrived + pfn network.PipeHandler + pIn chan []byte + pOut chan *frame.PayloadFrame +} + +// SetObserveDataTags set the data tag list that will be observed. +// Deprecated: use hpds.WithObserveDataTags instead +func (s *streamFunction) SetObserveDataTags(tag ...byte) { + s.client.SetObserveDataTags(tag...) + s.client.Logger().Debugf("%sSetObserveDataTag(%v)", streamFunctionLogPrefix, s.observeDataTags) +} + +// SetHandler set the handler function, which accept the raw bytes data and return the tag & response. +func (s *streamFunction) SetHandler(fn network.AsyncHandler) error { + s.fn = fn + s.client.Logger().Debugf("%sSetHandler(%v)", streamFunctionLogPrefix, s.fn) + return nil +} + +func (s *streamFunction) SetPipeHandler(fn network.PipeHandler) error { + s.pfn = fn + s.client.Logger().Debugf("%sSetHandler(%v)", streamFunctionLogPrefix, s.fn) + return nil +} + +// Connect create a connection to the mq, when data arrived, the data will be passed to the +// handler which set by SetHandler method. +func (s *streamFunction) Connect() error { + s.client.Logger().Debugf("%s Connect()", streamFunctionLogPrefix) + // notify underlying network operations, when data with tag we observed arrived, invoke the func + s.client.SetDataFrameObserver(func(data *frame.DataFrame) { + s.client.Logger().Debugf("%sreceive DataFrame, tag=%# x, carriage=%# x", streamFunctionLogPrefix, data.Tag(), data.GetCarriage()) + s.onDataFrame(data.GetCarriage(), data.GetMetaFrame()) + }) + + if s.pfn != nil { + s.pIn = make(chan []byte) + s.pOut = make(chan *frame.PayloadFrame) + + // handle user's pipe function + go func() { + s.pfn(s.pIn, s.pOut) + }() + + // send user's pipe function outputs to mq + go func() { + for { + data := <-s.pOut + if data != nil { + s.client.Logger().Debugf("%spipe fn send: tag=%#x, data=%# x", streamFunctionLogPrefix, data.Tag, data.Carriage) + frame := frame.NewDataFrame() + // todo: frame.SetTransactionId + frame.SetCarriage(data.Tag, data.Carriage) + s.client.WriteFrame(frame) + } + } + }() + } + + err := s.client.Connect(context.Background(), s.zipperEndpoint) + if err != nil { + s.client.Logger().Errorf("%sConnect() error: %s", streamFunctionLogPrefix, err) + } + return err +} + +// Close will close the connection. +func (s *streamFunction) Close() error { + if s.pIn != nil { + close(s.pIn) + } + + if s.pOut != nil { + close(s.pOut) + } + + if s.client != nil { + if err := s.client.Close(); err != nil { + s.client.Logger().Errorf("%sClose(): %v", err) + return err + } + } + + return nil +} + +// when DataFrame we observed arrived, invoke the user's function +func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) { + s.client.Logger().Infof("%sonDataFrame ->[%s]", streamFunctionLogPrefix, s.name) + + if s.fn != nil { + go func() { + s.client.Logger().Debugf("%sexecute-start fn: data[%d]=%# x", streamFunctionLogPrefix, len(data), frame.Shortly(data)) + // invoke serverless + tag, resp := s.fn(data) + s.client.Logger().Debugf("%sexecute-done fn: tag=%#x, resp[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp)) + // if resp is not nil, means the user's function has returned something, we should send it to the mq + if len(resp) != 0 { + s.client.Logger().Debugf("%sstart WriteFrame(): tag=%#x, data[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp)) + // build a DataFrame + // TODO: seems we should implement a DeepCopy() of MetaFrame in the future + frame := frame.NewDataFrame() + // reuse transactionId + frame.SetTransactionId(metaFrame.TransactionId()) + // reuse sourceId + frame.SetSourceId(metaFrame.SourceId()) + frame.SetCarriage(tag, resp) + s.client.WriteFrame(frame) + } + }() + } else if s.pfn != nil { + s.client.Logger().Debugf("%spipe fn receive: data[%d]=%# x", streamFunctionLogPrefix, len(data), data) + s.pIn <- data + } else { + s.client.Logger().Warnf("%sStreamFunction is nil", streamFunctionLogPrefix) + } +} + +// Send a DataFrame to mq. +func (s *streamFunction) Write(tag byte, carriage []byte) error { + fm := frame.NewDataFrame() + fm.SetCarriage(tag, carriage) + return s.client.WriteFrame(fm) +} + +// SetErrorHandler set the error handler function when server error occurs +func (s *streamFunction) SetErrorHandler(fn func(err error)) { + s.client.SetErrorHandler(fn) +}