Compare commits

...

14 Commits
v1.0 ... main

Author SHA1 Message Date
wangjian 47b7490878 1、修改bug 2023-04-21 10:53:04 +08:00
wangjian 9403c4d01e 1、修改bug 2023-04-05 23:35:16 +08:00
wangjian 033e5f845d 1、修改bug 2023-04-05 21:17:46 +08:00
wangjian be7c3e11c6 1、修改bug 2023-04-05 20:59:07 +08:00
wangjian 2c72ebec4f 1、修改bug 2023-04-05 17:42:17 +08:00
wangjian ff095c2312 1、修改bug,如果连接报错,就会拒绝服务的问题 2023-04-05 17:30:22 +08:00
wangjian 44824a7e9f 1、修改bug,如果连接报错,就会拒绝服务的问题 2023-04-05 17:25:22 +08:00
wangjian 3aac3a04ba 1、修改bug,如果连接报错,就会拒绝服务的问题 2023-04-05 11:19:29 +08:00
wangjian 41414aafa9 1、修改bug,如果连接报错,就会拒绝服务的问题 2023-04-02 23:26:19 +08:00
wangjian a1c0ad2f70 1、修改bug 2023-03-26 23:29:49 +08:00
wangjian e0eab082e6 1、修改bug 2023-03-24 08:49:01 +08:00
wangjian 753c4fe9c8 1、去除只能订阅一次的问题 2023-03-07 17:48:26 +08:00
wangjian 37f7ba99ea 1、add video stream demo
2、add config read string function
2022-10-23 13:33:16 +08:00
wangjian 6717831eb6 1、add video stream demo
2、add config read string function
2022-10-18 14:41:01 +08:00
23 changed files with 528 additions and 246 deletions

53
ap.go
View File

@ -4,6 +4,7 @@ import (
"context"
"git.hpds.cc/Component/network"
"git.hpds.cc/Component/network/frame"
"git.hpds.cc/Component/network/log"
)
const (
@ -17,15 +18,17 @@ type AccessPoint interface {
// Connect to HPDS-Mq.
Connect() error
// SetDataTag will set the tag of data when invoking Write().
SetDataTag(tag uint8)
SetDataTag(tag frame.Tag)
// Write the data to downstream.
Write(p []byte) (n int, err error)
// WriteWithTag will write data with specified tag, default transactionId is epoch time.
WriteWithTag(tag uint8, data []byte) error
WriteWithTag(tag frame.Tag, data []byte) error
// SetErrorHandler set the error handler function when server error occurs
SetErrorHandler(fn func(err error))
// SetReceiveHandler [Experimental] set to observe handler function
SetReceiveHandler(fn func(tag byte, data []byte))
SetReceiveHandler(fn func(tag frame.Tag, data []byte))
// Broadcast the data to all downstream
Broadcast(data []byte) error
}
// hpds-AccessPoint
@ -33,8 +36,8 @@ type accessPoint struct {
name string
mqEndpoint string
client *network.Client
tag uint8
fn func(byte, []byte)
tag frame.Tag
fn func(frame.Tag, []byte)
}
var _ AccessPoint = &accessPoint{}
@ -42,7 +45,7 @@ var _ AccessPoint = &accessPoint{}
// NewAccessPoint create a hpds-AccessPoint
func NewAccessPoint(name string, opts ...Option) AccessPoint {
options := NewOptions(opts...)
client := network.NewClient(name, network.ClientTypeProtocolGateway, options.ClientOptions...)
client := network.NewClient(name, network.ClientTypeSource, options.ClientOptions...)
return &accessPoint{
name: name,
@ -53,21 +56,25 @@ func NewAccessPoint(name string, opts ...Option) AccessPoint {
// Write the data to downstream.
func (s *accessPoint) Write(data []byte) (int, error) {
return len(data), s.WriteWithTag(s.tag, data)
err := s.WriteWithTag(s.tag, data)
if err != nil {
return 0, err
}
return len(data), nil
}
// SetDataTag will set the tag of data when invoking Write().
func (s *accessPoint) SetDataTag(tag uint8) {
func (s *accessPoint) SetDataTag(tag frame.Tag) {
s.tag = tag
}
// Close will close the connection to YoMo-MessageQueue.
// Close will close the connection to MessageQueue.
func (s *accessPoint) Close() error {
if err := s.client.Close(); err != nil {
s.client.Logger().Errorf("%sClose(): %v", apLogPrefix, err)
log.Errorf("%s Close(): %v", apLogPrefix, err)
return err
}
s.client.Logger().Debugf("%s is closed", apLogPrefix)
log.Debugf("%s is closed", apLogPrefix)
return nil
}
@ -82,18 +89,18 @@ func (s *accessPoint) Connect() error {
err := s.client.Connect(context.Background(), s.mqEndpoint)
if err != nil {
s.client.Logger().Errorf("%sConnect() error: %s", apLogPrefix, err)
log.Errorf("%s Connect() error: %s", apLogPrefix, err)
}
return err
}
// WriteWithTag will write data with specified tag, default transactionID is epoch time.
func (s *accessPoint) WriteWithTag(tag uint8, data []byte) error {
func (s *accessPoint) WriteWithTag(tag frame.Tag, data []byte) error {
f := frame.NewDataFrame()
f.SetCarriage(byte(tag), data)
f.SetCarriage(tag, data)
f.SetSourceId(s.client.ClientId())
s.client.Logger().Debugf("%sWriteWithTag: tid=%s, source_id=%s, data[%d]=%# x",
apLogPrefix, f.TransactionId(), f.SourceId(), len(data), frame.Shortly(data))
log.Debugf("%s WriteWithTag: tid=%s, source_id=%s, data[%d]",
apLogPrefix, f.TransactionId(), f.SourceId(), len(data))
return s.client.WriteFrame(f)
}
@ -103,7 +110,17 @@ func (s *accessPoint) SetErrorHandler(fn func(err error)) {
}
// SetReceiveHandler [Experimental] set to observe handler function
func (s *accessPoint) SetReceiveHandler(fn func(byte, []byte)) {
func (s *accessPoint) SetReceiveHandler(fn func(frame.Tag, []byte)) {
s.fn = fn
s.client.Logger().Debugf("%sSetReceiveHandler(%v)", apLogPrefix, s.fn)
log.Debugf("%s SetReceiveHandler(%v)", apLogPrefix, s.fn)
}
// Broadcast Write the data to all downstream
func (s *accessPoint) Broadcast(data []byte) error {
f := frame.NewDataFrame()
f.SetCarriage(s.tag, data)
f.SetSourceId(s.client.ClientId())
f.SetBroadcast(true)
log.Debugf("Broadcast", "data_frame", f.String())
return s.client.WriteFrame(f)
}

View File

@ -7,7 +7,9 @@ import (
func TestAccessPointSendDataToServer(t *testing.T) {
ap := NewAccessPoint("test-ap")
defer ap.Close()
defer func() {
_ = ap.Close()
}()
// connect to server
err := ap.Connect()

View File

@ -43,6 +43,10 @@ func LoadWorkflowConfig(path string) (*WorkflowConfig, error) {
return load(buffer)
}
func LoadWorkflowConfigForString(cfg string) (*WorkflowConfig, error) {
return load([]byte(cfg))
}
func load(data []byte) (*WorkflowConfig, error) {
var config = &WorkflowConfig{}
err := yaml.Unmarshal(data, config)
@ -73,6 +77,21 @@ func ParseWorkflowConfig(config string) (*WorkflowConfig, error) {
return wfConf, nil
}
func ParseWorkflowConfigContent(config string) (*WorkflowConfig, error) {
wfConf, err := LoadWorkflowConfigForString(config)
if err != nil {
return nil, err
}
// validate
err = validateWorkflowConfig(wfConf)
if err != nil {
return nil, err
}
return wfConf, nil
}
func validateWorkflowConfig(wfConf *WorkflowConfig) error {
if wfConf == nil {
return errors.New("workflow: config nil")

View File

@ -11,11 +11,13 @@ func main() {
if err != nil {
panic(err)
}
mq.InitOptions(hpds_node.WithAuth("token", "z1"))
defer mq.Close()
mq.InitOptions(hpds_node.WithAuth("token", "06d36c6f5705507dae778fdce90d0767"))
defer func(mq hpds_node.MessageQueue) {
_ = mq.Close()
}(mq)
// add Downstream mq
mq.AddDownstreamMq(hpds_node.NewDownstreamMq(
_ = mq.AddDownstreamMq(hpds_node.NewDownstreamMq(
"mq-2",
hpds_node.WithMqAddr("localhost:27187"),
hpds_node.WithCredential("token:z2"),

View File

@ -1,5 +1,6 @@
name: mq-1
host: 0.0.0.0
port: 27188
port: 27187
functions:
- name: echo-sf
- name: capture-agent

View File

@ -9,12 +9,18 @@ import (
func main() {
mq := hpds_node.NewMqWithOptions(
"mq-2",
hpds_node.WithMqAddr("localhost:27187"),
hpds_node.WithAuth("token", "z2"),
hpds_node.WithMqAddr("localhost:27188"),
hpds_node.WithAuth("token", "06d36c6f5705507dae778fdce90d0767"),
)
defer mq.Close()
defer func(mq hpds_node.MessageQueue) {
_ = mq.Close()
}(mq)
mq.ConfigWorkflow("mq_2.yaml")
err := mq.ConfigWorkflow("mq_2.yaml")
if err != nil {
log.Errorf("Server load workflow error! %s", err)
return
}
// start mq service
log.Printf("Server has started!, pid: %d", os.Getpid())

View File

@ -3,3 +3,4 @@ host: 0.0.0.0
port: 27188
functions:
- name: echo-sf
- name: capture-agent

View File

@ -8,18 +8,22 @@ import (
func main() {
sf := hpds_node.NewStreamFunction(
"echo-sf",
hpds_node.WithMqAddr("localhost:27187"),
"hpds-ap",
hpds_node.WithMqAddr("localhost:27188"),
hpds_node.WithObserveDataTags(0x33),
hpds_node.WithCredential("token:z2"),
)
defer sf.Close()
defer func(sf hpds_node.StreamFunction) {
_ = sf.Close()
}(sf)
// set handler
sf.SetHandler(handler)
err := sf.SetHandler(handler)
if err != nil {
log.Fatalf("[sf] handler err=%v", err)
}
// start
err := sf.Connect()
err = sf.Connect()
if err != nil {
log.Fatalf("[sf] connect err=%v", err)
os.Exit(1)

View File

@ -0,0 +1,35 @@
package main
import (
"git.hpds.cc/pavement/hpds_node"
"log"
"os"
)
func main() {
sf := hpds_node.NewStreamFunction(
"capture-agent",
hpds_node.WithMqAddr("localhost:27187"),
hpds_node.WithObserveDataTags(18),
hpds_node.WithCredential("06d36c6f5705507dae778fdce90d0767"),
)
defer sf.Close()
// set handler
sf.SetHandler(handler)
// start
err := sf.Connect()
if err != nil {
log.Fatalf("[sf] connect err=%v", err)
os.Exit(1)
}
select {}
}
func handler(data []byte) (byte, []byte) {
val := string(data)
log.Printf(">> [streamFunction] got tag=0x12, data=%s", val)
return 0x0, nil
}

View File

@ -0,0 +1,166 @@
package main
import (
"bytes"
"crypto/sha1"
"encoding/json"
"fmt"
"git.hpds.cc/Component/network/log"
"git.hpds.cc/pavement/hpds_node"
"github.com/disintegration/imaging"
ffmpeg "github.com/u2takey/ffmpeg-go"
"io"
"os"
"strconv"
"time"
)
func main() {
// connect to HPDS-MQ.
ap := hpds_node.NewAccessPoint(
"hpds-ap",
hpds_node.WithMqAddr("localhost:27188"),
hpds_node.WithCredential("token:z1"),
)
err := ap.Connect()
if err != nil {
log.Printf("[AccessPoint] Emit the data to HPDS-MQ failure with err: %v", err)
return
}
//defer ap.Close()
ap.SetDataTag(0x10)
//获取本地livego的直播流
send := func(id int, img []byte) {
if len(img) > 0 {
//log.Printf("send img %s", HexToString(img))
str := fmt.Sprintf("%d", time.Now().UnixNano()/1e6)
tick := fmt.Sprintf("%d_%s", id, time.Now().Format("20060102150405")+str[10:])
go WriteFile(fmt.Sprintf("./%s.jpeg", tick), img)
n, err := ap.Write(img)
if err != nil {
log.Printf("Send image-%v to mq failure with err: %v", id, err)
} else {
log.Printf("Send image-frame-%v to mq, n: %d, hash=%s, img_size=%v", id, n, genSha1(img), len(img))
}
}
//time.Sleep(1 * time.Millisecond)
}
video := VideoImage{}
ffStream := ffmpeg.Input("rtmp://ns8.indexforce.com/home/mystream")
//i := 0
//quit := make(chan int)
tick := time.NewTicker(time.Millisecond * 42)
defer tick.Stop()
go func() {
i := 0
for range tick.C {
if i%24 == 0 {
img, err := video.ExtractImageBytes(ffStream, i)
if err != nil {
fmt.Printf("ExtractImage64 error: %v\n", err)
}
fmt.Printf("i ========== %d \n", i)
send(i, img)
//log.Debugf("send img %#v", img)
}
i++
}
}()
select {}
//ch := make(chan int, 1)
//
//select {
//case <-quit:
// fmt.Println("quit")
// close(ch)
// return
//case <-tick.C:
// if i%24 == 0 {
// img, err := video.ExtractImageBytes(ffStream, i)
// if err != nil {
// fmt.Printf("ExtractImage64 error: %v\n", err)
// }
// fmt.Printf("i ========== %d \n", i)
// send(i, img)
// //log.Debugf("send img %#v", img)
// }
// i++
//}
}
func genSha1(buf []byte) string {
h := sha1.New()
h.Write(buf)
return fmt.Sprintf("%x", h.Sum(nil))
}
func WriteFile(fn string, data []byte) {
err := os.WriteFile(fn, data, 777)
if err != nil {
log.Printf("write file error =%v \n", err)
}
}
type VideoImage struct {
}
func (v *VideoImage) ExtractImageBytes(stream *ffmpeg.Stream, frameNum int) ([]byte, error) {
reader := v.extractImage(stream, frameNum)
img, err := imaging.Decode(reader)
if err != nil {
return nil, err
}
imgBuf := new(bytes.Buffer)
err = imaging.Encode(imgBuf, img, imaging.JPEG)
if err != nil {
return nil, err
}
return imgBuf.Bytes(), nil
}
func (v *VideoImage) extractImage(stream *ffmpeg.Stream, frameNum int) io.Reader {
buf := bytes.NewBuffer(nil)
err := stream.
Filter("select", ffmpeg.Args{fmt.Sprintf("gte(n,%d)", frameNum)}).
Output("pipe:", ffmpeg.KwArgs{"vframes": 1, "format": "image2", "vcodec": "mjpeg"}).
//WithOutput(buf, os.Stdout).
WithOutput(buf, nil).
Run()
if err != nil {
log.Errorf("stream error %s", err)
panic(err)
}
return buf
}
func (v *VideoImage) GetFrameCount(inFileName string) (int, error) {
data, _ := ffmpeg.Probe(inFileName)
var m map[string]interface{}
err := json.Unmarshal([]byte(data), &m)
if err != nil {
return 0, err
}
var strInt string
items := m["streams"].([]interface{})
for _, item := range items {
v := item.(map[string]interface{})
if v["profile"] == "Main" || v["profile"] == "High" {
strInt = v["nb_frames"].(string)
break
}
}
if len(strInt) == 0 {
return 0, fmt.Errorf("not find profile(Main).nb_frames")
}
num, err := strconv.Atoi(strInt)
if err != nil {
return 0, nil
}
return num, nil
}

View File

@ -0,0 +1,33 @@
package main
import (
"git.hpds.cc/Component/network/log"
"git.hpds.cc/pavement/hpds_node"
"os"
)
func main() {
mq, err := hpds_node.NewMq("mq_1.yaml")
if err != nil {
panic(err)
}
mq.InitOptions(hpds_node.WithAuth("token", "z1"))
defer mq.Close()
// add Downstream mq
mq.AddDownstreamMq(hpds_node.NewDownstreamMq(
"mq-2",
hpds_node.WithMqAddr("localhost:27187"),
hpds_node.WithCredential("token:z2"),
))
// start mq service
log.Printf("Server has started!, pid: %d", os.Getpid())
go func() {
err := mq.ListenAndServe()
if err != nil {
panic(err)
}
}()
select {}
}

View File

@ -0,0 +1,5 @@
name: mq-1
host: 0.0.0.0
port: 27188
functions:
- name: echo-sf

View File

@ -0,0 +1,28 @@
package main
import (
"git.hpds.cc/Component/network/log"
"git.hpds.cc/pavement/hpds_node"
"os"
)
func main() {
mq := hpds_node.NewMqWithOptions(
"mq-2",
hpds_node.WithMqAddr("localhost:27187"),
hpds_node.WithAuth("token", "z2"),
)
defer mq.Close()
mq.ConfigWorkflow("mq_2.yaml")
// start mq service
log.Printf("Server has started!, pid: %d", os.Getpid())
go func() {
err := mq.ListenAndServe()
if err != nil {
panic(err)
}
}()
select {}
}

View File

@ -0,0 +1,5 @@
name: mq-2
host: 0.0.0.0
port: 27188
functions:
- name: echo-sf

View File

@ -0,0 +1,79 @@
package main
import (
"crypto/sha1"
"fmt"
"git.hpds.cc/Component/network/log"
"git.hpds.cc/pavement/hpds_node"
"os"
"sync/atomic"
"time"
)
const ImageDataKey = 0x10
var (
counter uint64
)
func main() {
sf := hpds_node.NewStreamFunction(
"echo-sf",
hpds_node.WithMqAddr("localhost:27187"),
hpds_node.WithObserveDataTags(ImageDataKey),
hpds_node.WithCredential("token:z1"),
)
defer sf.Close()
sf.SetHandler(Handler)
err := sf.Connect()
if err != nil {
log.Printf("Connect to MQ failure: ", err)
os.Exit(1)
}
select {}
}
// Handler process the data in the stream
func Handler(img []byte) (byte, []byte) {
// Initialize WasmEdge's VM
//vmConf, vm := initVM()
//bg := bindgen.Instantiate(vm)
//defer bg.Release()
//defer vm.Release()
//defer vmConf.Release()
//
//// recognize the image
//res, err := bg.Execute("infer", img)
//if err == nil {
// fmt.Println("GO: Run bindgen -- infer:", string(res[0].([]byte)))
//} else {
// fmt.Println("GO: Run bindgen -- infer FAILED")
//}
id := atomic.AddUint64(&counter, 1)
str := fmt.Sprintf("%d", time.Now().UnixNano()/1e6)
tick := time.Now().Format("20060102150405") + str[10:]
go WriteFile(fmt.Sprintf("./%d_%s.jpeg", id, tick), img)
// print logs
hash := genSha1(img)
log.Printf("received image-%d hash %v, img_size=%d \n", id, hash, len(img))
return 0x11, nil
}
// genSha1 generate the hash value of the image
func genSha1(buf []byte) string {
h := sha1.New()
h.Write(buf)
return fmt.Sprintf("%x", h.Sum(nil))
}
func WriteFile(fn string, data []byte) {
err := os.WriteFile(fn, data, 777)
if err != nil {
log.Printf("write file error =%v \n", err)
}
}

37
go.mod
View File

@ -3,36 +3,43 @@ module git.hpds.cc/pavement/hpds_node
go 1.19
require (
git.hpds.cc/Component/network v0.0.0-20221012021659-2433c68452d5
github.com/lucas-clemente/quic-go v0.29.1
git.hpds.cc/Component/network v0.0.0-20230421024959-bf7300c92a95
github.com/disintegration/imaging v1.6.2
github.com/quic-go/quic-go v0.33.0
github.com/stretchr/testify v1.8.0
github.com/u2takey/ffmpeg-go v0.4.1
gopkg.in/yaml.v3 v3.0.1
)
require (
git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect
github.com/aws/aws-sdk-go v1.38.20 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect
github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect
github.com/matoous/go-nanoid/v2 v2.0.0 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.4 // indirect
github.com/onsi/ginkgo/v2 v2.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/quic-go/qtls-go1-19 v0.2.1 // indirect
github.com/quic-go/qtls-go1-20 v0.1.1 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/u2takey/go-utils v0.3.1 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/tools v0.1.10 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
golang.org/x/image v0.1.0 // indirect
golang.org/x/mod v0.6.0 // indirect
golang.org/x/net v0.5.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/tools v0.2.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
)
//replace (
// git.hpds.cc/Component/network => ../network
//)

View File

@ -1,30 +0,0 @@
package hpds_node
import (
"git.hpds.cc/Component/network"
"git.hpds.cc/Component/network/frame"
)
type metadata struct{}
func (m *metadata) Encode() []byte {
return nil
}
type metadataBuilder struct {
m *metadata
}
func newMetadataBuilder() network.MetadataBuilder {
return &metadataBuilder{
m: &metadata{},
}
}
func (builder *metadataBuilder) Build(f *frame.HandshakeFrame) (network.Metadata, error) {
return builder.m, nil
}
func (builder *metadataBuilder) Decode(buf []byte) (network.Metadata, error) {
return builder.m, nil
}

47
mq.go
View File

@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
"git.hpds.cc/Component/network/metadata"
"git.hpds.cc/Component/network/router"
"net/http"
"git.hpds.cc/Component/network"
@ -68,35 +70,35 @@ var _ MessageQueue = &messageQueue{}
// NewMqWithOptions create a messageQueue instance.
func NewMqWithOptions(name string, opts ...Option) MessageQueue {
options := NewOptions(opts...)
zipper := createMessageQueueServer(name, options, nil)
zipper.ConfigMesh(options.MeshConfigURL)
emitter := createMessageQueueServer(name, options, nil)
_ = emitter.ConfigMesh(options.MeshConfigURL)
return zipper
return emitter
}
// NewMq create a messageQueue instance from config files.
func NewMq(conf string) (MessageQueue, error) {
config, err := config.ParseWorkflowConfig(conf)
confWf, err := config.ParseWorkflowConfig(conf)
if err != nil {
log.Errorf("%s[ERR] %v", mqLogPrefix, err)
return nil, err
}
// listening address
listenAddr := fmt.Sprintf("%s:%d", config.Host, config.Port)
listenAddr := fmt.Sprintf("%s:%d", confWf.Host, confWf.Port)
options := NewOptions()
options.MqAddr = listenAddr
zipper := createMessageQueueServer(config.Name, options, config)
emitter := createMessageQueueServer(confWf.Name, options, confWf)
// messageQueue workflow
err = zipper.configWorkflow(config)
err = emitter.configWorkflow(confWf)
return zipper, err
return emitter, err
}
// NewDownstreamMq create a messageQueue descriptor for downstream messageQueue.
func NewDownstreamMq(name string, opts ...Option) MessageQueue {
options := NewOptions(opts...)
client := network.NewClient(name, network.ClientTypeMessageQueue, options.ClientOptions...)
client := network.NewClient(name, network.ClientTypeUpstreamEmitter, options.ClientOptions...)
return &messageQueue{
name: name,
@ -120,6 +122,9 @@ func createMessageQueueServer(name string, options *Options, cfg *config.Workflo
z.init()
return z
}
func (z *messageQueue) Logger() log.Logger {
return log.Default()
}
// ConfigWorkflow will read workflows from config files and register them to messageQueue.
func (z *messageQueue) ConfigWorkflow(conf string) error {
@ -134,8 +139,12 @@ func (z *messageQueue) ConfigWorkflow(conf string) error {
func (z *messageQueue) configWorkflow(config *config.WorkflowConfig) error {
z.wfc = config
z.server.ConfigMetadataBuilder(newMetadataBuilder())
z.server.ConfigRouter(newRouter(config.Functions))
z.server.ConfigMetadataBuilder(metadata.DefaultBuilder())
funcList := make([]string, len(config.Functions))
for k, v := range config.Functions {
funcList[k] = v.Name
}
z.server.ConfigRouter(router.Default(funcList))
return nil
}
@ -150,7 +159,9 @@ func (z *messageQueue) ConfigMesh(url string) error {
if err != nil {
return err
}
defer res.Body.Close()
defer func() {
_ = res.Body.Close()
}()
decoder := json.NewDecoder(res.Body)
var configs []config.MeshMessageQueue
@ -186,11 +197,11 @@ func (z *messageQueue) ListenAndServe() error {
log.Debugf("%sCreating MessageQueue Server ...", mqLogPrefix)
// check downstream zippers
for _, ds := range z.downstreamMqs {
if dsZipper, ok := ds.(*messageQueue); ok {
go func(dsZipper *messageQueue) {
_ = dsZipper.client.Connect(context.Background(), dsZipper.addr)
z.server.AddDownstreamServer(dsZipper.addr, dsZipper.client)
}(dsZipper)
if dsEmitter, ok := ds.(*messageQueue); ok {
go func(dsEmitter *messageQueue) {
_ = dsEmitter.client.Connect(context.Background(), dsEmitter.addr)
z.server.AddDownstreamServer(dsEmitter.addr, dsEmitter.client)
}(dsEmitter)
}
}
return z.server.ListenAndServe(context.Background(), z.addr)
@ -251,7 +262,7 @@ func (z *messageQueue) Stats() int {
log.Printf("[%s] all downstream mq connected: %d", z.name, len(z.server.DownStreams()))
for k, v := range z.server.DownStreams() {
log.Printf("[%s] |> [%s] %s", z.name, k, v.ServerAddr())
log.Printf("[%s] |> [%s] %v", z.name, k, v)
}
log.Printf("[%s] total DataFrames received: %d", z.name, z.server.StatsCounter())

View File

@ -21,18 +21,19 @@ func (z *messageQueue) init() {
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM, syscall.SIGUSR2, syscall.SIGUSR1, syscall.SIGINT)
log.Printf("%sListening SIGUSR1, SIGUSR2, SIGTERM/SIGINT...", mqLogPrefix)
log.Infof("%sListening SIGUSR1, SIGUSR2, SIGTERM/SIGINT...", mqLogPrefix)
for p1 := range c {
log.Printf("Received signal: %s", p1)
log.Infof("Received signal: %s", p1)
if p1 == syscall.SIGTERM || p1 == syscall.SIGINT {
log.Printf("graceful shutting down ... %s", p1)
log.Infof("graceful shutting down ... %s", p1)
_ = z.Close()
os.Exit(0)
} else if p1 == syscall.SIGUSR2 {
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("\tNumGC = %v\n", m.NumGC)
} else if p1 == syscall.SIGUSR1 {
log.Printf("print MessageQueue stats(): %d", z.Stats())
log.Infof("print MessageQueue stats(): %d", z.Stats())
}
}
}()

View File

@ -13,11 +13,11 @@ func (z *messageQueue) init() {
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
log.Printf("%sListening SIGTERM/SIGINT...", mqLogPrefix)
log.Infof("%s Listening SIGTERM/SIGINT...", mqLogPrefix)
for p1 := range c {
log.Printf("Received signal: %s", p1)
if p1 == syscall.SIGTERM || p1 == syscall.SIGINT {
log.Printf("graceful shutting down ... %s", p1)
log.Infof("%s graceful shutting down ... %s", mqLogPrefix, p1)
os.Exit(0)
}
}

View File

@ -2,10 +2,10 @@ package hpds_node
import (
"crypto/tls"
"github.com/lucas-clemente/quic-go"
"git.hpds.cc/Component/network"
"git.hpds.cc/Component/network/frame"
"git.hpds.cc/Component/network/log"
"github.com/quic-go/quic-go"
)
const (
@ -101,7 +101,7 @@ func WithCredential(payload string) Option {
}
// WithObserveDataTags sets client data tag list.
func WithObserveDataTags(tags ...byte) Option {
func WithObserveDataTags(tags ...frame.Tag) Option {
return func(o *Options) {
o.ClientOptions = append(
o.ClientOptions,
@ -110,16 +110,6 @@ func WithObserveDataTags(tags ...byte) Option {
}
}
// WithLogger sets the client logger
func WithLogger(logger log.Logger) Option {
return func(o *Options) {
o.ClientOptions = append(
o.ClientOptions,
network.WithLogger(logger),
)
}
}
// NewOptions creates a new options for YoMo-Client.
func NewOptions(opts ...Option) *Options {
options := &Options{}

100
router.go
View File

@ -1,100 +0,0 @@
package hpds_node
import (
"fmt"
"git.hpds.cc/Component/network"
"git.hpds.cc/Component/network/hpds_err"
"git.hpds.cc/pavement/hpds_node/config"
"sync"
)
type router struct {
r *route
}
func newRouter(functions []config.App) network.Router {
return &router{r: newRoute(functions)}
}
func (r *router) Route(metadata network.Metadata) network.Route {
return r.r
}
func (r *router) Clean() {
r.r = nil
}
type route struct {
functions []config.App
data map[byte]map[string]string
mu sync.RWMutex
}
func newRoute(functions []config.App) *route {
return &route{
functions: functions,
data: make(map[byte]map[string]string),
}
}
func (r *route) Add(connId string, name string, observeDataTags []byte) (err error) {
r.mu.Lock()
defer r.mu.Unlock()
ok := false
for _, v := range r.functions {
if v.Name == name {
ok = true
break
}
}
if !ok {
return fmt.Errorf("SFN[%s] does not exist in config functions", name)
}
LOOP:
for _, connects := range r.data {
for connKey, n := range connects {
if n == name {
err = hpds_err.NewDuplicateNameError(connKey, fmt.Errorf("node:Stream Function[%s] is already linked to another connection", name))
delete(connects, connKey)
break LOOP
}
}
}
for _, tag := range observeDataTags {
connects := r.data[tag]
if connects == nil {
connects = make(map[string]string)
r.data[tag] = connects
}
r.data[tag][connId] = name
}
return err
}
func (r *route) Remove(connId string) error {
r.mu.Lock()
defer r.mu.Unlock()
for _, connects := range r.data {
delete(connects, connId)
}
return nil
}
func (r *route) GetForwardRoutes(tag byte) []string {
r.mu.RLock()
defer r.mu.RUnlock()
var keys []string
if connects := r.data[tag]; connects != nil {
for k := range connects {
keys = append(keys, k)
}
}
return keys
}

View File

@ -14,7 +14,7 @@ const (
type StreamFunction interface {
// SetObserveDataTags set the data tag list that will be observed
// Deprecated: use hpds.WithObserveDataTags instead
SetObserveDataTags(tag ...byte)
SetObserveDataTags(tag ...frame.Tag)
// SetHandler set the handler function, which accept the raw bytes data and return the tag & response
SetHandler(fn network.AsyncHandler) error
// SetErrorHandler set the error handler function when server error occurs
@ -26,7 +26,7 @@ type StreamFunction interface {
// Close will close the connection
Close() error
// Write Send a data to mq.
Write(tag byte, carriage []byte) error
Write(tag frame.Tag, carriage []byte) error
}
// NewStreamFunction create a stream function.
@ -35,9 +35,9 @@ func NewStreamFunction(name string, opts ...Option) StreamFunction {
client := network.NewClient(name, network.ClientTypeStreamFunction, options.ClientOptions...)
sf := &streamFunction{
name: name,
zipperEndpoint: options.MqAddr,
emitterEndpoint: options.MqAddr,
client: client,
observeDataTags: make([]byte, 0),
observeDataTags: make([]frame.Tag, 0),
}
return sf
@ -48,9 +48,9 @@ var _ StreamFunction = &streamFunction{}
// streamFunction implements StreamFunction interface.
type streamFunction struct {
name string
zipperEndpoint string
emitterEndpoint string
client *network.Client
observeDataTags []byte // tag list that will be observed
observeDataTags []frame.Tag // tag list that will be observed
fn network.AsyncHandler // user's function which will be invoked when data arrived
pfn network.PipeHandler
pIn chan []byte
@ -59,21 +59,21 @@ type streamFunction struct {
// SetObserveDataTags set the data tag list that will be observed.
// Deprecated: use hpds.WithObserveDataTags instead
func (s *streamFunction) SetObserveDataTags(tag ...byte) {
func (s *streamFunction) SetObserveDataTags(tag ...frame.Tag) {
s.client.SetObserveDataTags(tag...)
s.client.Logger().Debugf("%sSetObserveDataTag(%v)", streamFunctionLogPrefix, s.observeDataTags)
s.client.Logger().Debugf("%s SetObserveDataTag(%v)", streamFunctionLogPrefix, s.observeDataTags)
}
// SetHandler set the handler function, which accept the raw bytes data and return the tag & response.
func (s *streamFunction) SetHandler(fn network.AsyncHandler) error {
s.fn = fn
s.client.Logger().Debugf("%sSetHandler(%v)", streamFunctionLogPrefix, s.fn)
s.client.Logger().Debugf("%s SetHandler(%v)", streamFunctionLogPrefix, s.fn)
return nil
}
func (s *streamFunction) SetPipeHandler(fn network.PipeHandler) error {
s.pfn = fn
s.client.Logger().Debugf("%sSetHandler(%v)", streamFunctionLogPrefix, s.fn)
s.client.Logger().Debugf("%s SetHandler(%v)", streamFunctionLogPrefix, s.fn)
return nil
}
@ -83,7 +83,7 @@ func (s *streamFunction) Connect() error {
s.client.Logger().Debugf("%s Connect()", streamFunctionLogPrefix)
// notify underlying network operations, when data with tag we observed arrived, invoke the func
s.client.SetDataFrameObserver(func(data *frame.DataFrame) {
s.client.Logger().Debugf("%sreceive DataFrame, tag=%# x, carriage=%# x", streamFunctionLogPrefix, data.Tag(), data.GetCarriage())
s.client.Logger().Debugf("%s receive DataFrame, tag=%# x, carriage length=%d", streamFunctionLogPrefix, data.Tag(), len(data.GetCarriage()))
s.onDataFrame(data.GetCarriage(), data.GetMetaFrame())
})
@ -101,19 +101,19 @@ func (s *streamFunction) Connect() error {
for {
data := <-s.pOut
if data != nil {
s.client.Logger().Debugf("%spipe fn send: tag=%#x, data=%# x", streamFunctionLogPrefix, data.Tag, data.Carriage)
frame := frame.NewDataFrame()
// todo: frame.SetTransactionId
frame.SetCarriage(data.Tag, data.Carriage)
s.client.WriteFrame(frame)
s.client.Logger().Debugf("%s pipe fn send: tag=%#x, data=%# x", streamFunctionLogPrefix, data.Tag, data.Carriage)
frm := frame.NewDataFrame()
// todo: frm.SetTransactionId
frm.SetCarriage(data.Tag, data.Carriage)
_ = s.client.WriteFrame(frm)
}
}
}()
}
err := s.client.Connect(context.Background(), s.zipperEndpoint)
err := s.client.Connect(context.Background(), s.emitterEndpoint)
if err != nil {
s.client.Logger().Errorf("%sConnect() error: %s", streamFunctionLogPrefix, err)
s.client.Logger().Errorf("%s Connect() error: %s", streamFunctionLogPrefix, err)
}
return err
}
@ -130,7 +130,7 @@ func (s *streamFunction) Close() error {
if s.client != nil {
if err := s.client.Close(); err != nil {
s.client.Logger().Errorf("%sClose(): %v", err)
s.client.Logger().Errorf("%s Close(): %v", err)
return err
}
}
@ -140,38 +140,38 @@ func (s *streamFunction) Close() error {
// when DataFrame we observed arrived, invoke the user's function
func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) {
s.client.Logger().Infof("%sonDataFrame ->[%s]", streamFunctionLogPrefix, s.name)
s.client.Logger().Infof("%s onDataFrame ->[%s]", streamFunctionLogPrefix, s.name)
if s.fn != nil {
go func() {
s.client.Logger().Debugf("%sexecute-start fn: data[%d]=%# x", streamFunctionLogPrefix, len(data), frame.Shortly(data))
s.client.Logger().Debugf("%s execute-start fn: data[%d]", streamFunctionLogPrefix, len(data)) //, frame.Shortly(data)
// invoke serverless
tag, resp := s.fn(data)
s.client.Logger().Debugf("%sexecute-done fn: tag=%#x, resp[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp))
s.client.Logger().Debugf("%s execute-done fn: tag=%#x, resp[%d]", streamFunctionLogPrefix, tag, len(resp))
// if resp is not nil, means the user's function has returned something, we should send it to the mq
if len(resp) != 0 {
s.client.Logger().Debugf("%sstart WriteFrame(): tag=%#x, data[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp))
s.client.Logger().Debugf("%s start WriteFrame(): tag=%#x, data[%d]", streamFunctionLogPrefix, tag, len(resp))
// build a DataFrame
// TODO: seems we should implement a DeepCopy() of MetaFrame in the future
frame := frame.NewDataFrame()
frm := frame.NewDataFrame()
// reuse transactionId
frame.SetTransactionId(metaFrame.TransactionId())
frm.SetTransactionId(metaFrame.TransactionId())
// reuse sourceId
frame.SetSourceId(metaFrame.SourceId())
frame.SetCarriage(tag, resp)
s.client.WriteFrame(frame)
frm.SetSourceId(metaFrame.SourceId())
frm.SetCarriage(tag, resp)
_ = s.client.WriteFrame(frm)
}
}()
} else if s.pfn != nil {
s.client.Logger().Debugf("%spipe fn receive: data[%d]=%# x", streamFunctionLogPrefix, len(data), data)
s.client.Logger().Debugf("%s pipe fn receive: data[%d]", streamFunctionLogPrefix, len(data))
s.pIn <- data
} else {
s.client.Logger().Warnf("%sStreamFunction is nil", streamFunctionLogPrefix)
s.client.Logger().Warnf("%s StreamFunction is nil", streamFunctionLogPrefix)
}
}
// Send a DataFrame to mq.
func (s *streamFunction) Write(tag byte, carriage []byte) error {
func (s *streamFunction) Write(tag frame.Tag, carriage []byte) error {
fm := frame.NewDataFrame()
fm.SetCarriage(tag, carriage)
return s.client.WriteFrame(fm)