This commit is contained in:
wangjian 2022-10-12 11:55:36 +08:00
commit a5e67c8e8d
20 changed files with 1265 additions and 0 deletions

109
ap.go Normal file
View File

@ -0,0 +1,109 @@
package hpds_node
import (
"context"
"git.hpds.cc/Component/network"
"git.hpds.cc/Component/network/frame"
)
const (
apLogPrefix = "\033[32m[hpds:access_point]\033[0m "
)
// AccessPoint is responsible for sending data to hpds.
type AccessPoint interface {
// Close will close the connection to HPDS-Mq.
Close() error
// Connect to HPDS-Mq.
Connect() error
// SetDataTag will set the tag of data when invoking Write().
SetDataTag(tag uint8)
// Write the data to downstream.
Write(p []byte) (n int, err error)
// WriteWithTag will write data with specified tag, default transactionId is epoch time.
WriteWithTag(tag uint8, data []byte) error
// SetErrorHandler set the error handler function when server error occurs
SetErrorHandler(fn func(err error))
// SetReceiveHandler [Experimental] set to observe handler function
SetReceiveHandler(fn func(tag byte, data []byte))
}
// hpds-AccessPoint
type accessPoint struct {
name string
mqEndpoint string
client *network.Client
tag uint8
fn func(byte, []byte)
}
var _ AccessPoint = &accessPoint{}
// NewAccessPoint create a hpds-AccessPoint
func NewAccessPoint(name string, opts ...Option) AccessPoint {
options := NewOptions(opts...)
client := network.NewClient(name, network.ClientTypeProtocolGateway, options.ClientOptions...)
return &accessPoint{
name: name,
mqEndpoint: options.MqAddr,
client: client,
}
}
// Write the data to downstream.
func (s *accessPoint) Write(data []byte) (int, error) {
return len(data), s.WriteWithTag(s.tag, data)
}
// SetDataTag will set the tag of data when invoking Write().
func (s *accessPoint) SetDataTag(tag uint8) {
s.tag = tag
}
// Close will close the connection to YoMo-MessageQueue.
func (s *accessPoint) Close() error {
if err := s.client.Close(); err != nil {
s.client.Logger().Errorf("%sClose(): %v", apLogPrefix, err)
return err
}
s.client.Logger().Debugf("%s is closed", apLogPrefix)
return nil
}
// Connect to YoMo-MessageQueue.
func (s *accessPoint) Connect() error {
// set backFlowFrame handler
s.client.SetBackFlowFrameObserver(func(frm *frame.BackFlowFrame) {
if s.fn != nil {
s.fn(frm.GetDataTag(), frm.GetCarriage())
}
})
err := s.client.Connect(context.Background(), s.mqEndpoint)
if err != nil {
s.client.Logger().Errorf("%sConnect() error: %s", apLogPrefix, err)
}
return err
}
// WriteWithTag will write data with specified tag, default transactionID is epoch time.
func (s *accessPoint) WriteWithTag(tag uint8, data []byte) error {
f := frame.NewDataFrame()
f.SetCarriage(byte(tag), data)
f.SetSourceId(s.client.ClientId())
s.client.Logger().Debugf("%sWriteWithTag: tid=%s, source_id=%s, data[%d]=%# x",
apLogPrefix, f.TransactionId(), f.SourceId(), len(data), frame.Shortly(data))
return s.client.WriteFrame(f)
}
// SetErrorHandler set the error handler function when server error occurs
func (s *accessPoint) SetErrorHandler(fn func(err error)) {
s.client.SetErrorHandler(fn)
}
// SetReceiveHandler [Experimental] set to observe handler function
func (s *accessPoint) SetReceiveHandler(fn func(byte, []byte)) {
s.fn = fn
s.client.Logger().Debugf("%sSetReceiveHandler(%v)", apLogPrefix, s.fn)
}

20
ap_test.go Normal file
View File

@ -0,0 +1,20 @@
package hpds_node
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestAccessPointSendDataToServer(t *testing.T) {
ap := NewAccessPoint("test-ap")
defer ap.Close()
// connect to server
err := ap.Connect()
assert.Nil(t, err)
// send data to server
n, err := ap.Write([]byte("test"))
assert.Greater(t, n, 0, "[ap.Write] expected n > 0, but got %d", n)
assert.Nil(t, err)
}

9
config/mesh.go Normal file
View File

@ -0,0 +1,9 @@
package config
// MeshMessageQueue describes mesh configurations.
type MeshMessageQueue struct {
Name string `json:"name"`
Host string `json:"host"`
Port int `json:"port"`
Credential string `json:"credential,omitempty"`
}

108
config/workflow.go Normal file
View File

@ -0,0 +1,108 @@
package config
import (
"errors"
"os"
"path/filepath"
"strings"
"gopkg.in/yaml.v3"
)
// App represents a HPDS Application.
type App struct {
Name string `yaml:"name"`
}
// Workflow represents a HPDS Workflow.
type Workflow struct {
Functions []App `yaml:"functions"`
}
// WorkflowConfig represents a HPDS Workflow config.
type WorkflowConfig struct {
// Name represents the name of the zipper.
Name string `yaml:"name"`
// Host represents the listening host of the zipper.
Host string `yaml:"host"`
// Port represents the listening port of the zipper.
Port int `yaml:"port"`
// Workflow represents the sfn workflow.
Workflow `yaml:",inline"`
}
// ErrWorkflowConfigExt represents the extension of workflow config is incorrect.
var ErrWorkflowConfigExt = errors.New(`workflow: the extension of workflow config is incorrect, it should ".yaml|.yml"`)
// LoadWorkflowConfig the WorkflowConfig by path.
func LoadWorkflowConfig(path string) (*WorkflowConfig, error) {
buffer, err := os.ReadFile(path)
if err != nil {
return nil, err
}
return load(buffer)
}
func load(data []byte) (*WorkflowConfig, error) {
var config = &WorkflowConfig{}
err := yaml.Unmarshal(data, config)
if err != nil {
return nil, err
}
return config, nil
}
// ParseWorkflowConfig parses the config.
func ParseWorkflowConfig(config string) (*WorkflowConfig, error) {
if ext := filepath.Ext(config); ext != ".yaml" && ext != ".yml" {
return nil, ErrWorkflowConfigExt
}
// parse workflow.yaml
wfConf, err := LoadWorkflowConfig(config)
if err != nil {
return nil, err
}
// validate
err = validateWorkflowConfig(wfConf)
if err != nil {
return nil, err
}
return wfConf, nil
}
func validateWorkflowConfig(wfConf *WorkflowConfig) error {
if wfConf == nil {
return errors.New("workflow: config nil")
}
m := map[string][]App{
"Functions": wfConf.Functions,
}
missingParams := []string{}
for k, apps := range m {
for _, app := range apps {
if app.Name == "" {
missingParams = append(missingParams, k)
}
}
}
errMsg := ""
if wfConf.Name == "" || wfConf.Host == "" || wfConf.Port <= 0 {
errMsg = "Missing name, host or port in workflow config. "
}
if len(missingParams) > 0 {
errMsg += "Missing name, host or port in " + strings.Join(missingParams, ", "+". ")
}
if errMsg != "" {
return errors.New(errMsg)
}
return nil
}

View File

@ -0,0 +1,52 @@
package main
import (
"fmt"
"git.hpds.cc/Component/network/log"
"git.hpds.cc/pavement/hpds_node"
"math/rand"
"os"
"time"
)
func main() {
// connect to HPDS-MQ.
source := hpds_node.NewAccessPoint(
"hpds-ap",
hpds_node.WithMqAddr("localhost:27188"),
hpds_node.WithCredential("token:z1"),
)
err := source.Connect()
if err != nil {
log.Printf("[AccessPoint] Emit the data to HPDS-MQ failure with err: %v", err)
return
}
defer source.Close()
source.SetDataTag(0x33)
// generate mock data and send it to HPDS-MQ.
err = generateAndSendData(source)
if err != nil {
log.Printf("[AccessPoint] >>>> ERR >>>> %v", err)
os.Exit(1)
}
select {}
}
func generateAndSendData(stream hpds_node.AccessPoint) error {
for {
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
data := []byte(fmt.Sprintf("%d", rnd.Uint32()))
// send data via QUIC stream.
_, err := stream.Write(data)
if err != nil {
log.Printf("[AccessPoint] Emit %v to HPDS-MQ failure with err: %v", data, err)
time.Sleep(500 * time.Millisecond)
continue
}
log.Printf("[AccessPoint] Emit %s to HPDS-MQ", data)
time.Sleep(1000 * time.Millisecond)
}
}

View File

@ -0,0 +1,33 @@
package main
import (
"git.hpds.cc/Component/network/log"
"git.hpds.cc/pavement/hpds_node"
"os"
)
func main() {
mq, err := hpds_node.NewMq("mq_1.yaml")
if err != nil {
panic(err)
}
mq.InitOptions(hpds_node.WithAuth("token", "z1"))
defer mq.Close()
// add Downstream mq
mq.AddDownstreamMq(hpds_node.NewDownstreamMq(
"mq-2",
hpds_node.WithMqAddr("localhost:27187"),
hpds_node.WithCredential("token:z2"),
))
// start mq service
log.Printf("Server has started!, pid: %d", os.Getpid())
go func() {
err := mq.ListenAndServe()
if err != nil {
panic(err)
}
}()
select {}
}

View File

@ -0,0 +1,5 @@
name: mq-1
host: 0.0.0.0
port: 27188
functions:
- name: echo-sf

View File

@ -0,0 +1,28 @@
package main
import (
"git.hpds.cc/Component/network/log"
"git.hpds.cc/pavement/hpds_node"
"os"
)
func main() {
mq := hpds_node.NewMqWithOptions(
"mq-2",
hpds_node.WithMqAddr("localhost:27187"),
hpds_node.WithAuth("token", "z2"),
)
defer mq.Close()
mq.ConfigWorkflow("mq_2.yaml")
// start mq service
log.Printf("Server has started!, pid: %d", os.Getpid())
go func() {
err := mq.ListenAndServe()
if err != nil {
panic(err)
}
}()
select {}
}

View File

@ -0,0 +1,5 @@
name: mq-2
host: 0.0.0.0
port: 27188
functions:
- name: echo-sf

View File

@ -0,0 +1,35 @@
package main
import (
"git.hpds.cc/pavement/hpds_node"
"log"
"os"
)
func main() {
sf := hpds_node.NewStreamFunction(
"echo-sf",
hpds_node.WithMqAddr("localhost:27187"),
hpds_node.WithObserveDataTags(0x33),
hpds_node.WithCredential("token:z2"),
)
defer sf.Close()
// set handler
sf.SetHandler(handler)
// start
err := sf.Connect()
if err != nil {
log.Fatalf("[sf] connect err=%v", err)
os.Exit(1)
}
select {}
}
func handler(data []byte) (byte, []byte) {
val := string(data)
log.Printf(">> [streamFunction] got tag=0x33, data=%s", val)
return 0x0, nil
}

38
go.mod Normal file
View File

@ -0,0 +1,38 @@
module git.hpds.cc/pavement/hpds_node
go 1.19
require (
git.hpds.cc/Component/network v0.0.0-20221012021659-2433c68452d5
github.com/lucas-clemente/quic-go v0.29.1
github.com/stretchr/testify v1.8.0
gopkg.in/yaml.v3 v3.0.1
)
require (
git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect
github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect
github.com/matoous/go-nanoid/v2 v2.0.0 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/tools v0.1.10 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
)

30
metadata.go Normal file
View File

@ -0,0 +1,30 @@
package hpds_node
import (
"git.hpds.cc/Component/network"
"git.hpds.cc/Component/network/frame"
)
type metadata struct{}
func (m *metadata) Encode() []byte {
return nil
}
type metadataBuilder struct {
m *metadata
}
func newMetadataBuilder() network.MetadataBuilder {
return &metadataBuilder{
m: &metadata{},
}
}
func (builder *metadataBuilder) Build(f *frame.HandshakeFrame) (network.Metadata, error) {
return builder.m, nil
}
func (builder *metadataBuilder) Decode(buf []byte) (network.Metadata, error) {
return builder.m, nil
}

272
mq.go Normal file
View File

@ -0,0 +1,272 @@
package hpds_node
import (
"context"
"encoding/json"
"fmt"
"net/http"
"git.hpds.cc/Component/network"
"git.hpds.cc/Component/network/log"
"git.hpds.cc/pavement/hpds_node/config"
)
const (
mqLogPrefix = "\033[33m[hpds:mq]\033[0m "
)
// MessageQueue is the orchestrator of hpds. There are two types of mq:
// one is Upstream MessageQueue, which is used to connect to multiple downstream mq,
// another one is Downstream MessageQueue (will call it as MessageQueue directly), which is used
// to connected by `Message Queue`, `Access Point` and `Stream Function`
type MessageQueue interface {
// ConfigWorkflow will register workflows from config files to messageQueue.
ConfigWorkflow(conf string) error
// ConfigMesh will register edge-mesh config URL
ConfigMesh(url string) error
// ListenAndServe start mq as server.
ListenAndServe() error
// AddDownstreamMq will add downstream mq.
AddDownstreamMq(downstream MessageQueue) error
// Addr returns the listen address of mq.
Addr() string
// Stats return insight data
Stats() int
// Close will close the mq.
Close() error
// InitOptions initialize options
InitOptions(opts ...Option)
// ReadConfigFile(conf string) error
// AddWorkflow(wf ...core.Workflow) error
// ConfigDownstream(opts ...interface{}) error
// Connect() error
// RemoveDownstreamMq(downstream MessageQueue) error
// ListenAddr() string
}
// messageQueue is the implementation of MessageQueue interface.
type messageQueue struct {
name string
addr string
hasDownStreams bool
server *network.Server
client *network.Client
downstreamMqs []MessageQueue
wfc *config.WorkflowConfig
}
var _ MessageQueue = &messageQueue{}
// NewMqWithOptions create a messageQueue instance.
func NewMqWithOptions(name string, opts ...Option) MessageQueue {
options := NewOptions(opts...)
zipper := createMessageQueueServer(name, options, nil)
zipper.ConfigMesh(options.MeshConfigURL)
return zipper
}
// NewMq create a messageQueue instance from config files.
func NewMq(conf string) (MessageQueue, error) {
config, err := config.ParseWorkflowConfig(conf)
if err != nil {
log.Errorf("%s[ERR] %v", mqLogPrefix, err)
return nil, err
}
// listening address
listenAddr := fmt.Sprintf("%s:%d", config.Host, config.Port)
options := NewOptions()
options.MqAddr = listenAddr
zipper := createMessageQueueServer(config.Name, options, config)
// messageQueue workflow
err = zipper.configWorkflow(config)
return zipper, err
}
// NewDownstreamMq create a messageQueue descriptor for downstream messageQueue.
func NewDownstreamMq(name string, opts ...Option) MessageQueue {
options := NewOptions(opts...)
client := network.NewClient(name, network.ClientTypeMessageQueue, options.ClientOptions...)
return &messageQueue{
name: name,
addr: options.MqAddr,
client: client,
}
}
/*************** Server ONLY ***************/
// createMessageQueueServer create a messageQueue instance as server.
func createMessageQueueServer(name string, options *Options, cfg *config.WorkflowConfig) *messageQueue {
// create underlying QUIC server
srv := network.NewServer(name, options.ServerOptions...)
z := &messageQueue{
server: srv,
name: name,
addr: options.MqAddr,
wfc: cfg,
}
// initialize
z.init()
return z
}
// ConfigWorkflow will read workflows from config files and register them to messageQueue.
func (z *messageQueue) ConfigWorkflow(conf string) error {
cfg, err := config.ParseWorkflowConfig(conf)
if err != nil {
log.Errorf("%s[ERR] %v", mqLogPrefix, err)
return err
}
log.Debugf("%sConfigWorkflow cfg=%+v", mqLogPrefix, cfg)
return z.configWorkflow(cfg)
}
func (z *messageQueue) configWorkflow(config *config.WorkflowConfig) error {
z.wfc = config
z.server.ConfigMetadataBuilder(newMetadataBuilder())
z.server.ConfigRouter(newRouter(config.Functions))
return nil
}
func (z *messageQueue) ConfigMesh(url string) error {
if url == "" {
return nil
}
log.Printf("%sDownloading mesh config...", mqLogPrefix)
// download mesh conf
res, err := http.Get(url)
if err != nil {
return err
}
defer res.Body.Close()
decoder := json.NewDecoder(res.Body)
var configs []config.MeshMessageQueue
err = decoder.Decode(&configs)
if err != nil {
log.Errorf("%s downloaded the Mesh config with err=%v", mqLogPrefix, err)
return err
}
log.Printf("%s Successfully downloaded the Mesh config. ", mqLogPrefix)
if len(configs) == 0 {
return nil
}
for _, downstream := range configs {
if downstream.Name == z.name {
continue
}
addr := fmt.Sprintf("%s:%d", downstream.Host, downstream.Port)
opts := []Option{WithMqAddr(addr)}
if downstream.Credential != "" {
opts = append(opts, WithCredential(downstream.Credential))
}
_ = z.AddDownstreamMq(NewDownstreamMq(downstream.Name, opts...))
}
return nil
}
// ListenAndServe will start messageQueue service.
func (z *messageQueue) ListenAndServe() error {
log.Debugf("%sCreating MessageQueue Server ...", mqLogPrefix)
// check downstream zippers
for _, ds := range z.downstreamMqs {
if dsZipper, ok := ds.(*messageQueue); ok {
go func(dsZipper *messageQueue) {
_ = dsZipper.client.Connect(context.Background(), dsZipper.addr)
z.server.AddDownstreamServer(dsZipper.addr, dsZipper.client)
}(dsZipper)
}
}
return z.server.ListenAndServe(context.Background(), z.addr)
}
// AddDownstreamMq will add downstream messageQueue.
func (z *messageQueue) AddDownstreamMq(downstream MessageQueue) error {
log.Debugf("%sAddDownStreamMq: %v", mqLogPrefix, downstream)
z.downstreamMqs = append(z.downstreamMqs, downstream)
z.hasDownStreams = true
log.Debugf("%scurrent downStreams: %d", mqLogPrefix, len(z.downstreamMqs))
return nil
}
// RemoveDownstreamMq remove downstream messageQueue.
func (z *messageQueue) RemoveDownstreamMq(downstream MessageQueue) error {
index := -1
for i, v := range z.downstreamMqs {
if v.Addr() == downstream.Addr() {
index = i
break
}
}
// remove from slice
z.downstreamMqs = append(z.downstreamMqs[:index], z.downstreamMqs[index+1:]...)
return nil
}
// Addr returns listen address of messageQueue.
func (z *messageQueue) Addr() string {
return z.addr
}
// Close will close a connection. If messageQueue is Server, close the server. If messageQueue is Client, close the client.
func (z *messageQueue) Close() error {
if z.server != nil {
if err := z.server.Close(); err != nil {
log.Errorf("%s Close(): %v", mqLogPrefix, err)
return err
}
}
if z.client != nil {
if err := z.client.Close(); err != nil {
log.Errorf("%s Close(): %v", mqLogPrefix, err)
return err
}
}
return nil
}
// Stats inspects current server.
func (z *messageQueue) Stats() int {
log.Printf("[%s] all connections: %d", z.name, len(z.server.StatsFunctions()))
for connID, name := range z.server.StatsFunctions() {
log.Printf("[%s] -> ConnId=%s, Name=%s", z.name, connID, name)
}
log.Printf("[%s] all downstream mq connected: %d", z.name, len(z.server.DownStreams()))
for k, v := range z.server.DownStreams() {
log.Printf("[%s] |> [%s] %s", z.name, k, v.ServerAddr())
}
log.Printf("[%s] total DataFrames received: %d", z.name, z.server.StatsCounter())
return len(z.server.StatsFunctions())
}
func (z *messageQueue) InitOptions(opts ...Option) {
options := &Options{MqAddr: z.addr}
for _, o := range opts {
o(options)
}
srv := network.NewServer(z.name, options.ServerOptions...)
z.server = srv
if z.wfc != nil {
_ = z.configWorkflow(z.wfc)
}
}

39
mq_notwindows.go Normal file
View File

@ -0,0 +1,39 @@
//go:build !windows
// +build !windows
package hpds_node
import (
"fmt"
"os"
"os/signal"
"runtime"
"syscall"
"git.hpds.cc/Component/network/log"
)
// initialize when mq running as server. support inspection:
// - `kill -SIGUSR1 <pid>` inspect state()
// - `kill -SIGTERM <pid>` graceful shutdown
// - `kill -SIGUSR2 <pid>` inspect golang GC
func (z *messageQueue) init() {
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM, syscall.SIGUSR2, syscall.SIGUSR1, syscall.SIGINT)
log.Printf("%sListening SIGUSR1, SIGUSR2, SIGTERM/SIGINT...", mqLogPrefix)
for p1 := range c {
log.Printf("Received signal: %s", p1)
if p1 == syscall.SIGTERM || p1 == syscall.SIGINT {
log.Printf("graceful shutting down ... %s", p1)
os.Exit(0)
} else if p1 == syscall.SIGUSR2 {
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("\tNumGC = %v\n", m.NumGC)
} else if p1 == syscall.SIGUSR1 {
log.Printf("print MessageQueue stats(): %d", z.Stats())
}
}
}()
}

16
mq_test.go Normal file
View File

@ -0,0 +1,16 @@
package hpds_node
import (
"github.com/stretchr/testify/assert"
"testing"
"time"
)
func TestMqRun(t *testing.T) {
mq := NewMqWithOptions("mq", WithMqAddr("localhost:9001"))
time.Sleep(time.Second)
assert.NotNil(t, mq)
err := mq.Close()
time.Sleep(time.Second)
assert.Nil(t, err)
}

25
mq_windows.go Normal file
View File

@ -0,0 +1,25 @@
//go:build windows
// +build windows
package hpds_node
import (
"git.hpds.cc/Component/network/log"
)
// initialize when mq running as server. support inspection:
// - `kill -SIGTERM <pid>` graceful shutdown
func (z *messageQueue) init() {
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
log.Printf("%sListening SIGTERM/SIGINT...", mqLogPrefix)
for p1 := range c {
log.Printf("Received signal: %s", p1)
if p1 == syscall.SIGTERM || p1 == syscall.SIGINT {
log.Printf("graceful shutting down ... %s", p1)
os.Exit(0)
}
}
}()
}

136
options.go Normal file
View File

@ -0,0 +1,136 @@
package hpds_node
import (
"crypto/tls"
"github.com/lucas-clemente/quic-go"
"git.hpds.cc/Component/network"
"git.hpds.cc/Component/network/log"
)
const (
// DefaultMqAddr is the default address of downstream mq.
DefaultMqAddr = "localhost:9000"
)
// Option is a function that applies a hpds-Client option.
type Option func(o *Options)
// Options are the options for YoMo
type Options struct {
MqAddr string // target mq endpoint address
// MqListenAddr string // mq endpoint address
MqWorkflowConfig string // mq workflow file
MeshConfigURL string // meshConfigURL is the URL of edge-mesh config
ServerOptions []network.ServerOption
ClientOptions []network.ClientOption
QuicConfig *quic.Config
TLSConfig *tls.Config
Logger log.Logger
}
// WithMqAddr return a new options with MqAddr set to addr.
func WithMqAddr(addr string) Option {
return func(o *Options) {
o.MqAddr = addr
}
}
// // WithMqListenAddr return a new options with MqListenAddr set to addr.
// func WithMqListenAddr(addr string) Option {
// return func(o *options) {
// o.MqListenAddr = addr
// }
// }
// TODO: WithWorkflowConfig
// WithMeshConfigURL sets the initial edge-mesh config URL for the YoMo-MessageQueue.
func WithMeshConfigURL(url string) Option {
return func(o *Options) {
o.MeshConfigURL = url
}
}
// WithTLSConfig sets the TLS configuration for the client.
func WithTLSConfig(tc *tls.Config) Option {
return func(o *Options) {
o.TLSConfig = tc
}
}
// WithQuicConfig sets the QUIC configuration for the client.
func WithQuicConfig(qc *quic.Config) Option {
return func(o *Options) {
o.QuicConfig = qc
}
}
// WithClientOptions returns a new options with opts.
func WithClientOptions(opts ...network.ClientOption) Option {
return func(o *Options) {
o.ClientOptions = opts
}
}
// WithServerOptions returns a new options with opts.
func WithServerOptions(opts ...network.ServerOption) Option {
return func(o *Options) {
o.ServerOptions = opts
}
}
// WithAuth sets the server authentication method (used by server)
func WithAuth(name string, args ...string) Option {
return func(o *Options) {
o.ServerOptions = append(
o.ServerOptions,
network.WithAuth(name, args...),
)
}
}
// WithCredential sets the client credential method (used by client)
func WithCredential(payload string) Option {
return func(o *Options) {
o.ClientOptions = append(
o.ClientOptions,
network.WithCredential(payload),
)
}
}
// WithObserveDataTags sets client data tag list.
func WithObserveDataTags(tags ...byte) Option {
return func(o *Options) {
o.ClientOptions = append(
o.ClientOptions,
network.WithObserveDataTags(tags...),
)
}
}
// WithLogger sets the client logger
func WithLogger(logger log.Logger) Option {
return func(o *Options) {
o.ClientOptions = append(
o.ClientOptions,
network.WithLogger(logger),
)
}
}
// NewOptions creates a new options for YoMo-Client.
func NewOptions(opts ...Option) *Options {
options := &Options{}
for _, o := range opts {
o(options)
}
if options.MqAddr == "" {
options.MqAddr = DefaultMqAddr
}
return options
}

100
router.go Normal file
View File

@ -0,0 +1,100 @@
package hpds_node
import (
"fmt"
"git.hpds.cc/Component/network"
"git.hpds.cc/Component/network/hpds_err"
"git.hpds.cc/pavement/hpds_node/config"
"sync"
)
type router struct {
r *route
}
func newRouter(functions []config.App) network.Router {
return &router{r: newRoute(functions)}
}
func (r *router) Route(metadata network.Metadata) network.Route {
return r.r
}
func (r *router) Clean() {
r.r = nil
}
type route struct {
functions []config.App
data map[byte]map[string]string
mu sync.RWMutex
}
func newRoute(functions []config.App) *route {
return &route{
functions: functions,
data: make(map[byte]map[string]string),
}
}
func (r *route) Add(connId string, name string, observeDataTags []byte) (err error) {
r.mu.Lock()
defer r.mu.Unlock()
ok := false
for _, v := range r.functions {
if v.Name == name {
ok = true
break
}
}
if !ok {
return fmt.Errorf("SFN[%s] does not exist in config functions", name)
}
LOOP:
for _, connects := range r.data {
for connKey, n := range connects {
if n == name {
err = hpds_err.NewDuplicateNameError(connKey, fmt.Errorf("node:Stream Function[%s] is already linked to another connection", name))
delete(connects, connKey)
break LOOP
}
}
}
for _, tag := range observeDataTags {
connects := r.data[tag]
if connects == nil {
connects = make(map[string]string)
r.data[tag] = connects
}
r.data[tag][connId] = name
}
return err
}
func (r *route) Remove(connId string) error {
r.mu.Lock()
defer r.mu.Unlock()
for _, connects := range r.data {
delete(connects, connId)
}
return nil
}
func (r *route) GetForwardRoutes(tag byte) []string {
r.mu.RLock()
defer r.mu.RUnlock()
var keys []string
if connects := r.data[tag]; connects != nil {
for k := range connects {
keys = append(keys, k)
}
}
return keys
}

22
sf_test.go Normal file
View File

@ -0,0 +1,22 @@
package hpds_node
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestSfConnectToServer(t *testing.T) {
sf := NewStreamFunction(
"test-sf",
WithMqAddr("localhost:9000"),
WithObserveDataTags(0x33),
)
defer sf.Close()
// set handler
_ = sf.SetHandler(nil)
// connect to server
err := sf.Connect()
assert.Nil(t, err)
}

183
stream_func.go Normal file
View File

@ -0,0 +1,183 @@
package hpds_node
import (
"context"
"git.hpds.cc/Component/network"
"git.hpds.cc/Component/network/frame"
)
const (
streamFunctionLogPrefix = "\033[31m[hpds:stream_function]\033[0m "
)
// StreamFunction defines serverless streaming functions.
type StreamFunction interface {
// SetObserveDataTags set the data tag list that will be observed
// Deprecated: use hpds.WithObserveDataTags instead
SetObserveDataTags(tag ...byte)
// SetHandler set the handler function, which accept the raw bytes data and return the tag & response
SetHandler(fn network.AsyncHandler) error
// SetErrorHandler set the error handler function when server error occurs
SetErrorHandler(fn func(err error))
// SetPipeHandler set the pipe handler function
SetPipeHandler(fn network.PipeHandler) error
// Connect create a connection to the messageQueue
Connect() error
// Close will close the connection
Close() error
// Write Send a data to mq.
Write(tag byte, carriage []byte) error
}
// NewStreamFunction create a stream function.
func NewStreamFunction(name string, opts ...Option) StreamFunction {
options := NewOptions(opts...)
client := network.NewClient(name, network.ClientTypeStreamFunction, options.ClientOptions...)
sf := &streamFunction{
name: name,
zipperEndpoint: options.MqAddr,
client: client,
observeDataTags: make([]byte, 0),
}
return sf
}
var _ StreamFunction = &streamFunction{}
// streamFunction implements StreamFunction interface.
type streamFunction struct {
name string
zipperEndpoint string
client *network.Client
observeDataTags []byte // tag list that will be observed
fn network.AsyncHandler // user's function which will be invoked when data arrived
pfn network.PipeHandler
pIn chan []byte
pOut chan *frame.PayloadFrame
}
// SetObserveDataTags set the data tag list that will be observed.
// Deprecated: use hpds.WithObserveDataTags instead
func (s *streamFunction) SetObserveDataTags(tag ...byte) {
s.client.SetObserveDataTags(tag...)
s.client.Logger().Debugf("%sSetObserveDataTag(%v)", streamFunctionLogPrefix, s.observeDataTags)
}
// SetHandler set the handler function, which accept the raw bytes data and return the tag & response.
func (s *streamFunction) SetHandler(fn network.AsyncHandler) error {
s.fn = fn
s.client.Logger().Debugf("%sSetHandler(%v)", streamFunctionLogPrefix, s.fn)
return nil
}
func (s *streamFunction) SetPipeHandler(fn network.PipeHandler) error {
s.pfn = fn
s.client.Logger().Debugf("%sSetHandler(%v)", streamFunctionLogPrefix, s.fn)
return nil
}
// Connect create a connection to the mq, when data arrived, the data will be passed to the
// handler which set by SetHandler method.
func (s *streamFunction) Connect() error {
s.client.Logger().Debugf("%s Connect()", streamFunctionLogPrefix)
// notify underlying network operations, when data with tag we observed arrived, invoke the func
s.client.SetDataFrameObserver(func(data *frame.DataFrame) {
s.client.Logger().Debugf("%sreceive DataFrame, tag=%# x, carriage=%# x", streamFunctionLogPrefix, data.Tag(), data.GetCarriage())
s.onDataFrame(data.GetCarriage(), data.GetMetaFrame())
})
if s.pfn != nil {
s.pIn = make(chan []byte)
s.pOut = make(chan *frame.PayloadFrame)
// handle user's pipe function
go func() {
s.pfn(s.pIn, s.pOut)
}()
// send user's pipe function outputs to mq
go func() {
for {
data := <-s.pOut
if data != nil {
s.client.Logger().Debugf("%spipe fn send: tag=%#x, data=%# x", streamFunctionLogPrefix, data.Tag, data.Carriage)
frame := frame.NewDataFrame()
// todo: frame.SetTransactionId
frame.SetCarriage(data.Tag, data.Carriage)
s.client.WriteFrame(frame)
}
}
}()
}
err := s.client.Connect(context.Background(), s.zipperEndpoint)
if err != nil {
s.client.Logger().Errorf("%sConnect() error: %s", streamFunctionLogPrefix, err)
}
return err
}
// Close will close the connection.
func (s *streamFunction) Close() error {
if s.pIn != nil {
close(s.pIn)
}
if s.pOut != nil {
close(s.pOut)
}
if s.client != nil {
if err := s.client.Close(); err != nil {
s.client.Logger().Errorf("%sClose(): %v", err)
return err
}
}
return nil
}
// when DataFrame we observed arrived, invoke the user's function
func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) {
s.client.Logger().Infof("%sonDataFrame ->[%s]", streamFunctionLogPrefix, s.name)
if s.fn != nil {
go func() {
s.client.Logger().Debugf("%sexecute-start fn: data[%d]=%# x", streamFunctionLogPrefix, len(data), frame.Shortly(data))
// invoke serverless
tag, resp := s.fn(data)
s.client.Logger().Debugf("%sexecute-done fn: tag=%#x, resp[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp))
// if resp is not nil, means the user's function has returned something, we should send it to the mq
if len(resp) != 0 {
s.client.Logger().Debugf("%sstart WriteFrame(): tag=%#x, data[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp))
// build a DataFrame
// TODO: seems we should implement a DeepCopy() of MetaFrame in the future
frame := frame.NewDataFrame()
// reuse transactionId
frame.SetTransactionId(metaFrame.TransactionId())
// reuse sourceId
frame.SetSourceId(metaFrame.SourceId())
frame.SetCarriage(tag, resp)
s.client.WriteFrame(frame)
}
}()
} else if s.pfn != nil {
s.client.Logger().Debugf("%spipe fn receive: data[%d]=%# x", streamFunctionLogPrefix, len(data), data)
s.pIn <- data
} else {
s.client.Logger().Warnf("%sStreamFunction is nil", streamFunctionLogPrefix)
}
}
// Send a DataFrame to mq.
func (s *streamFunction) Write(tag byte, carriage []byte) error {
fm := frame.NewDataFrame()
fm.SetCarriage(tag, carriage)
return s.client.WriteFrame(fm)
}
// SetErrorHandler set the error handler function when server error occurs
func (s *streamFunction) SetErrorHandler(fn func(err error)) {
s.client.SetErrorHandler(fn)
}