From e0eab082e6ab01f42182079500e5efbc00185aab Mon Sep 17 00:00:00 2001 From: wangjian Date: Fri, 24 Mar 2023 08:49:01 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E4=BF=AE=E6=94=B9bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ap.go | 2 +- example/multi-mq/mq1/main.go | 8 ++++--- example/multi-mq/mq1/mq_1.yaml | 3 ++- example/multi-mq/mq2/main.go | 14 ++++++++---- example/multi-mq/mq2/mq_2.yaml | 1 + example/multi-mq/sf/main.go | 16 +++++++++----- example/multi-mq/stream_process_ca/main.go | 35 ++++++++++++++++++++++++++++++ mq.go | 14 +++++++----- router.go | 8 +++++-- 9 files changed, 78 insertions(+), 23 deletions(-) create mode 100644 example/multi-mq/stream_process_ca/main.go diff --git a/ap.go b/ap.go index 0da8358..28a45be 100644 --- a/ap.go +++ b/ap.go @@ -90,7 +90,7 @@ func (s *accessPoint) Connect() error { // WriteWithTag will write data with specified tag, default transactionID is epoch time. func (s *accessPoint) WriteWithTag(tag uint8, data []byte) error { f := frame.NewDataFrame() - f.SetCarriage(byte(tag), data) + f.SetCarriage(tag, data) f.SetSourceId(s.client.ClientId()) s.client.Logger().Debugf("%sWriteWithTag: tid=%s, source_id=%s, data[%d]=%# x", apLogPrefix, f.TransactionId(), f.SourceId(), len(data), frame.Shortly(data)) diff --git a/example/multi-mq/mq1/main.go b/example/multi-mq/mq1/main.go index 7e8a8ab..6efb6d7 100644 --- a/example/multi-mq/mq1/main.go +++ b/example/multi-mq/mq1/main.go @@ -11,11 +11,13 @@ func main() { if err != nil { panic(err) } - mq.InitOptions(hpds_node.WithAuth("token", "z1")) - defer mq.Close() + mq.InitOptions(hpds_node.WithAuth("token", "06d36c6f5705507dae778fdce90d0767")) + defer func(mq hpds_node.MessageQueue) { + _ = mq.Close() + }(mq) // add Downstream mq - mq.AddDownstreamMq(hpds_node.NewDownstreamMq( + _ = mq.AddDownstreamMq(hpds_node.NewDownstreamMq( "mq-2", hpds_node.WithMqAddr("localhost:27187"), hpds_node.WithCredential("token:z2"), diff --git a/example/multi-mq/mq1/mq_1.yaml b/example/multi-mq/mq1/mq_1.yaml index d983b05..9f31ac3 100644 --- a/example/multi-mq/mq1/mq_1.yaml +++ b/example/multi-mq/mq1/mq_1.yaml @@ -1,5 +1,6 @@ name: mq-1 host: 0.0.0.0 -port: 27188 +port: 27187 functions: - name: echo-sf + - name: capture-agent diff --git a/example/multi-mq/mq2/main.go b/example/multi-mq/mq2/main.go index a94b19b..d252dbc 100644 --- a/example/multi-mq/mq2/main.go +++ b/example/multi-mq/mq2/main.go @@ -9,12 +9,18 @@ import ( func main() { mq := hpds_node.NewMqWithOptions( "mq-2", - hpds_node.WithMqAddr("localhost:27187"), - hpds_node.WithAuth("token", "z2"), + hpds_node.WithMqAddr("localhost:27188"), + hpds_node.WithAuth("token", "06d36c6f5705507dae778fdce90d0767"), ) - defer mq.Close() + defer func(mq hpds_node.MessageQueue) { + _ = mq.Close() + }(mq) - mq.ConfigWorkflow("mq_2.yaml") + err := mq.ConfigWorkflow("mq_2.yaml") + if err != nil { + log.Errorf("Server load workflow error! %s", err) + return + } // start mq service log.Printf("Server has started!, pid: %d", os.Getpid()) diff --git a/example/multi-mq/mq2/mq_2.yaml b/example/multi-mq/mq2/mq_2.yaml index e9dc984..f7cf0e7 100644 --- a/example/multi-mq/mq2/mq_2.yaml +++ b/example/multi-mq/mq2/mq_2.yaml @@ -3,3 +3,4 @@ host: 0.0.0.0 port: 27188 functions: - name: echo-sf + - name: capture-agent diff --git a/example/multi-mq/sf/main.go b/example/multi-mq/sf/main.go index f70ff4a..0da07bc 100644 --- a/example/multi-mq/sf/main.go +++ b/example/multi-mq/sf/main.go @@ -8,18 +8,22 @@ import ( func main() { sf := hpds_node.NewStreamFunction( - "echo-sf", - hpds_node.WithMqAddr("localhost:27187"), + "hpds-ap", + hpds_node.WithMqAddr("localhost:27188"), hpds_node.WithObserveDataTags(0x33), hpds_node.WithCredential("token:z2"), ) - defer sf.Close() + defer func(sf hpds_node.StreamFunction) { + _ = sf.Close() + }(sf) // set handler - sf.SetHandler(handler) - + err := sf.SetHandler(handler) + if err != nil { + log.Fatalf("[sf] handler err=%v", err) + } // start - err := sf.Connect() + err = sf.Connect() if err != nil { log.Fatalf("[sf] connect err=%v", err) os.Exit(1) diff --git a/example/multi-mq/stream_process_ca/main.go b/example/multi-mq/stream_process_ca/main.go new file mode 100644 index 0000000..74cbb0d --- /dev/null +++ b/example/multi-mq/stream_process_ca/main.go @@ -0,0 +1,35 @@ +package main + +import ( + "git.hpds.cc/pavement/hpds_node" + "log" + "os" +) + +func main() { + sf := hpds_node.NewStreamFunction( + "capture-agent", + hpds_node.WithMqAddr("localhost:27187"), + hpds_node.WithObserveDataTags(18), + hpds_node.WithCredential("06d36c6f5705507dae778fdce90d0767"), + ) + defer sf.Close() + + // set handler + sf.SetHandler(handler) + + // start + err := sf.Connect() + if err != nil { + log.Fatalf("[sf] connect err=%v", err) + os.Exit(1) + } + + select {} +} + +func handler(data []byte) (byte, []byte) { + val := string(data) + log.Printf(">> [streamFunction] got tag=0x12, data=%s", val) + return 0x0, nil +} diff --git a/mq.go b/mq.go index 27255d6..3fb71f1 100644 --- a/mq.go +++ b/mq.go @@ -69,26 +69,26 @@ var _ MessageQueue = &messageQueue{} func NewMqWithOptions(name string, opts ...Option) MessageQueue { options := NewOptions(opts...) zipper := createMessageQueueServer(name, options, nil) - zipper.ConfigMesh(options.MeshConfigURL) + _ = zipper.ConfigMesh(options.MeshConfigURL) return zipper } // NewMq create a messageQueue instance from config files. func NewMq(conf string) (MessageQueue, error) { - config, err := config.ParseWorkflowConfig(conf) + confWf, err := config.ParseWorkflowConfig(conf) if err != nil { log.Errorf("%s[ERR] %v", mqLogPrefix, err) return nil, err } // listening address - listenAddr := fmt.Sprintf("%s:%d", config.Host, config.Port) + listenAddr := fmt.Sprintf("%s:%d", confWf.Host, confWf.Port) options := NewOptions() options.MqAddr = listenAddr - zipper := createMessageQueueServer(config.Name, options, config) + zipper := createMessageQueueServer(confWf.Name, options, confWf) // messageQueue workflow - err = zipper.configWorkflow(config) + err = zipper.configWorkflow(confWf) return zipper, err } @@ -150,7 +150,9 @@ func (z *messageQueue) ConfigMesh(url string) error { if err != nil { return err } - defer res.Body.Close() + defer func() { + _ = res.Body.Close() + }() decoder := json.NewDecoder(res.Body) var configs []config.MeshMessageQueue diff --git a/router.go b/router.go index c8b5b44..805b53f 100644 --- a/router.go +++ b/router.go @@ -1,7 +1,6 @@ package hpds_node import ( - "fmt" "git.hpds.cc/Component/network" "git.hpds.cc/pavement/hpds_node/config" "sync" @@ -48,7 +47,12 @@ func (r *route) Add(connId string, name string, observeDataTags []byte) (err err } } if !ok { - return fmt.Errorf("SFN[%s] does not exist in config functions", name) + //return fmt.Errorf("SFN[%s] does not exist in config functions", name) + //如果不存在,自动增加 + item := config.App{ + Name: name, + } + r.functions = append(r.functions, item) } //去除只能订阅一次的问题 //LOOP: