Browse Source

1、修改bug

tags/v1.3 v1.3
wangjian 1 year ago
parent
commit
e0eab082e6
  1. 2
      ap.go
  2. 8
      example/multi-mq/mq1/main.go
  3. 3
      example/multi-mq/mq1/mq_1.yaml
  4. 14
      example/multi-mq/mq2/main.go
  5. 1
      example/multi-mq/mq2/mq_2.yaml
  6. 16
      example/multi-mq/sf/main.go
  7. 35
      example/multi-mq/stream_process_ca/main.go
  8. 14
      mq.go
  9. 8
      router.go

2
ap.go

@ -90,7 +90,7 @@ func (s *accessPoint) Connect() error {
// WriteWithTag will write data with specified tag, default transactionID is epoch time.
func (s *accessPoint) WriteWithTag(tag uint8, data []byte) error {
f := frame.NewDataFrame()
f.SetCarriage(byte(tag), data)
f.SetCarriage(tag, data)
f.SetSourceId(s.client.ClientId())
s.client.Logger().Debugf("%sWriteWithTag: tid=%s, source_id=%s, data[%d]=%# x",
apLogPrefix, f.TransactionId(), f.SourceId(), len(data), frame.Shortly(data))

8
example/multi-mq/mq1/main.go

@ -11,11 +11,13 @@ func main() {
if err != nil {
panic(err)
}
mq.InitOptions(hpds_node.WithAuth("token", "z1"))
defer mq.Close()
mq.InitOptions(hpds_node.WithAuth("token", "06d36c6f5705507dae778fdce90d0767"))
defer func(mq hpds_node.MessageQueue) {
_ = mq.Close()
}(mq)
// add Downstream mq
mq.AddDownstreamMq(hpds_node.NewDownstreamMq(
_ = mq.AddDownstreamMq(hpds_node.NewDownstreamMq(
"mq-2",
hpds_node.WithMqAddr("localhost:27187"),
hpds_node.WithCredential("token:z2"),

3
example/multi-mq/mq1/mq_1.yaml

@ -1,5 +1,6 @@
name: mq-1
host: 0.0.0.0
port: 27188
port: 27187
functions:
- name: echo-sf
- name: capture-agent

14
example/multi-mq/mq2/main.go

@ -9,12 +9,18 @@ import (
func main() {
mq := hpds_node.NewMqWithOptions(
"mq-2",
hpds_node.WithMqAddr("localhost:27187"),
hpds_node.WithAuth("token", "z2"),
hpds_node.WithMqAddr("localhost:27188"),
hpds_node.WithAuth("token", "06d36c6f5705507dae778fdce90d0767"),
)
defer mq.Close()
defer func(mq hpds_node.MessageQueue) {
_ = mq.Close()
}(mq)
mq.ConfigWorkflow("mq_2.yaml")
err := mq.ConfigWorkflow("mq_2.yaml")
if err != nil {
log.Errorf("Server load workflow error! %s", err)
return
}
// start mq service
log.Printf("Server has started!, pid: %d", os.Getpid())

1
example/multi-mq/mq2/mq_2.yaml

@ -3,3 +3,4 @@ host: 0.0.0.0
port: 27188
functions:
- name: echo-sf
- name: capture-agent

16
example/multi-mq/sf/main.go

@ -8,18 +8,22 @@ import (
func main() {
sf := hpds_node.NewStreamFunction(
"echo-sf",
hpds_node.WithMqAddr("localhost:27187"),
"hpds-ap",
hpds_node.WithMqAddr("localhost:27188"),
hpds_node.WithObserveDataTags(0x33),
hpds_node.WithCredential("token:z2"),
)
defer sf.Close()
defer func(sf hpds_node.StreamFunction) {
_ = sf.Close()
}(sf)
// set handler
sf.SetHandler(handler)
err := sf.SetHandler(handler)
if err != nil {
log.Fatalf("[sf] handler err=%v", err)
}
// start
err := sf.Connect()
err = sf.Connect()
if err != nil {
log.Fatalf("[sf] connect err=%v", err)
os.Exit(1)

35
example/multi-mq/stream_process_ca/main.go

@ -0,0 +1,35 @@
package main
import (
"git.hpds.cc/pavement/hpds_node"
"log"
"os"
)
func main() {
sf := hpds_node.NewStreamFunction(
"capture-agent",
hpds_node.WithMqAddr("localhost:27187"),
hpds_node.WithObserveDataTags(18),
hpds_node.WithCredential("06d36c6f5705507dae778fdce90d0767"),
)
defer sf.Close()
// set handler
sf.SetHandler(handler)
// start
err := sf.Connect()
if err != nil {
log.Fatalf("[sf] connect err=%v", err)
os.Exit(1)
}
select {}
}
func handler(data []byte) (byte, []byte) {
val := string(data)
log.Printf(">> [streamFunction] got tag=0x12, data=%s", val)
return 0x0, nil
}

14
mq.go

@ -69,26 +69,26 @@ var _ MessageQueue = &messageQueue{}
func NewMqWithOptions(name string, opts ...Option) MessageQueue {
options := NewOptions(opts...)
zipper := createMessageQueueServer(name, options, nil)
zipper.ConfigMesh(options.MeshConfigURL)
_ = zipper.ConfigMesh(options.MeshConfigURL)
return zipper
}
// NewMq create a messageQueue instance from config files.
func NewMq(conf string) (MessageQueue, error) {
config, err := config.ParseWorkflowConfig(conf)
confWf, err := config.ParseWorkflowConfig(conf)
if err != nil {
log.Errorf("%s[ERR] %v", mqLogPrefix, err)
return nil, err
}
// listening address
listenAddr := fmt.Sprintf("%s:%d", config.Host, config.Port)
listenAddr := fmt.Sprintf("%s:%d", confWf.Host, confWf.Port)
options := NewOptions()
options.MqAddr = listenAddr
zipper := createMessageQueueServer(config.Name, options, config)
zipper := createMessageQueueServer(confWf.Name, options, confWf)
// messageQueue workflow
err = zipper.configWorkflow(config)
err = zipper.configWorkflow(confWf)
return zipper, err
}
@ -150,7 +150,9 @@ func (z *messageQueue) ConfigMesh(url string) error {
if err != nil {
return err
}
defer res.Body.Close()
defer func() {
_ = res.Body.Close()
}()
decoder := json.NewDecoder(res.Body)
var configs []config.MeshMessageQueue

8
router.go

@ -1,7 +1,6 @@
package hpds_node
import (
"fmt"
"git.hpds.cc/Component/network"
"git.hpds.cc/pavement/hpds_node/config"
"sync"
@ -48,7 +47,12 @@ func (r *route) Add(connId string, name string, observeDataTags []byte) (err err
}
}
if !ok {
return fmt.Errorf("SFN[%s] does not exist in config functions", name)
//return fmt.Errorf("SFN[%s] does not exist in config functions", name)
//如果不存在,自动增加
item := config.App{
Name: name,
}
r.functions = append(r.functions, item)
}
//去除只能订阅一次的问题
//LOOP:

Loading…
Cancel
Save