You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
283 lines
7.7 KiB
283 lines
7.7 KiB
package hpds_node |
|
|
|
import ( |
|
"context" |
|
"encoding/json" |
|
"fmt" |
|
"git.hpds.cc/Component/network/metadata" |
|
"git.hpds.cc/Component/network/router" |
|
"net/http" |
|
|
|
"git.hpds.cc/Component/network" |
|
"git.hpds.cc/Component/network/log" |
|
"git.hpds.cc/pavement/hpds_node/config" |
|
) |
|
|
|
// mqLogPrefix is prepended to the log lines emitted by the message queue.
// The ANSI escape sequences around "[hpds:mq]" colour the tag yellow on
// terminals that honour them.
const mqLogPrefix = "\033[33m[hpds:mq]\033[0m "
|
|
|
// MessageQueue is the orchestrator of hpds. There are two types of mq: |
|
// one is Upstream MessageQueue, which is used to connect to multiple downstream mq, |
|
// another one is Downstream MessageQueue (will call it as MessageQueue directly), which is used |
|
// to connected by `Message Queue`, `Access Point` and `Stream Function` |
|
type MessageQueue interface { |
|
// ConfigWorkflow will register workflows from config files to messageQueue. |
|
ConfigWorkflow(conf string) error |
|
|
|
// ConfigMesh will register edge-mesh config URL |
|
ConfigMesh(url string) error |
|
|
|
// ListenAndServe start mq as server. |
|
ListenAndServe() error |
|
|
|
// AddDownstreamMq will add downstream mq. |
|
AddDownstreamMq(downstream MessageQueue) error |
|
|
|
// Addr returns the listen address of mq. |
|
Addr() string |
|
|
|
// Stats return insight data |
|
Stats() int |
|
|
|
// Close will close the mq. |
|
Close() error |
|
|
|
// InitOptions initialize options |
|
InitOptions(opts ...Option) |
|
|
|
// ReadConfigFile(conf string) error |
|
// AddWorkflow(wf ...core.Workflow) error |
|
// ConfigDownstream(opts ...interface{}) error |
|
// Connect() error |
|
// RemoveDownstreamMq(downstream MessageQueue) error |
|
// ListenAddr() string |
|
} |
|
|
|
// messageQueue is the implementation of MessageQueue interface. |
|
type messageQueue struct { |
|
name string |
|
addr string |
|
hasDownStreams bool |
|
server *network.Server |
|
client *network.Client |
|
downstreamMqs []MessageQueue |
|
wfc *config.WorkflowConfig |
|
} |
|
|
|
var _ MessageQueue = &messageQueue{} |
|
|
|
// NewMqWithOptions create a messageQueue instance. |
|
func NewMqWithOptions(name string, opts ...Option) MessageQueue { |
|
options := NewOptions(opts...) |
|
emitter := createMessageQueueServer(name, options, nil) |
|
_ = emitter.ConfigMesh(options.MeshConfigURL) |
|
|
|
return emitter |
|
} |
|
|
|
// NewMq create a messageQueue instance from config files. |
|
func NewMq(conf string) (MessageQueue, error) { |
|
confWf, err := config.ParseWorkflowConfig(conf) |
|
if err != nil { |
|
log.Errorf("%s[ERR] %v", mqLogPrefix, err) |
|
return nil, err |
|
} |
|
// listening address |
|
listenAddr := fmt.Sprintf("%s:%d", confWf.Host, confWf.Port) |
|
|
|
options := NewOptions() |
|
options.MqAddr = listenAddr |
|
emitter := createMessageQueueServer(confWf.Name, options, confWf) |
|
// messageQueue workflow |
|
err = emitter.configWorkflow(confWf) |
|
|
|
return emitter, err |
|
} |
|
|
|
// NewDownstreamMq create a messageQueue descriptor for downstream messageQueue. |
|
func NewDownstreamMq(name string, opts ...Option) MessageQueue { |
|
options := NewOptions(opts...) |
|
client := network.NewClient(name, network.ClientTypeUpstreamEmitter, options.ClientOptions...) |
|
|
|
return &messageQueue{ |
|
name: name, |
|
addr: options.MqAddr, |
|
client: client, |
|
} |
|
} |
|
|
|
/*************** Server ONLY ***************/ |
|
// createMessageQueueServer create a messageQueue instance as server. |
|
func createMessageQueueServer(name string, options *Options, cfg *config.WorkflowConfig) *messageQueue { |
|
// create underlying QUIC server |
|
srv := network.NewServer(name, options.ServerOptions...) |
|
z := &messageQueue{ |
|
server: srv, |
|
name: name, |
|
addr: options.MqAddr, |
|
wfc: cfg, |
|
} |
|
// initialize |
|
z.init() |
|
return z |
|
} |
|
func (z *messageQueue) Logger() log.Logger { |
|
return log.Default() |
|
} |
|
|
|
// ConfigWorkflow will read workflows from config files and register them to messageQueue. |
|
func (z *messageQueue) ConfigWorkflow(conf string) error { |
|
cfg, err := config.ParseWorkflowConfig(conf) |
|
if err != nil { |
|
log.Errorf("%s[ERR] %v", mqLogPrefix, err) |
|
return err |
|
} |
|
log.Debugf("%sConfigWorkflow cfg=%+v", mqLogPrefix, cfg) |
|
return z.configWorkflow(cfg) |
|
} |
|
|
|
func (z *messageQueue) configWorkflow(config *config.WorkflowConfig) error { |
|
z.wfc = config |
|
z.server.ConfigMetadataBuilder(metadata.DefaultBuilder()) |
|
funcList := make([]string, len(config.Functions)) |
|
for k, v := range config.Functions { |
|
funcList[k] = v.Name |
|
} |
|
z.server.ConfigRouter(router.Default(funcList)) |
|
return nil |
|
} |
|
|
|
func (z *messageQueue) ConfigMesh(url string) error { |
|
if url == "" { |
|
return nil |
|
} |
|
|
|
log.Printf("%sDownloading mesh config...", mqLogPrefix) |
|
// download mesh conf |
|
res, err := http.Get(url) |
|
if err != nil { |
|
return err |
|
} |
|
defer func() { |
|
_ = res.Body.Close() |
|
}() |
|
|
|
decoder := json.NewDecoder(res.Body) |
|
var configs []config.MeshMessageQueue |
|
err = decoder.Decode(&configs) |
|
if err != nil { |
|
log.Errorf("%s downloaded the Mesh config with err=%v", mqLogPrefix, err) |
|
return err |
|
} |
|
|
|
log.Printf("%s Successfully downloaded the Mesh config. ", mqLogPrefix) |
|
|
|
if len(configs) == 0 { |
|
return nil |
|
} |
|
|
|
for _, downstream := range configs { |
|
if downstream.Name == z.name { |
|
continue |
|
} |
|
addr := fmt.Sprintf("%s:%d", downstream.Host, downstream.Port) |
|
opts := []Option{WithMqAddr(addr)} |
|
if downstream.Credential != "" { |
|
opts = append(opts, WithCredential(downstream.Credential)) |
|
} |
|
_ = z.AddDownstreamMq(NewDownstreamMq(downstream.Name, opts...)) |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// ListenAndServe will start messageQueue service. |
|
func (z *messageQueue) ListenAndServe() error { |
|
log.Debugf("%sCreating MessageQueue Server ...", mqLogPrefix) |
|
// check downstream zippers |
|
for _, ds := range z.downstreamMqs { |
|
if dsEmitter, ok := ds.(*messageQueue); ok { |
|
go func(dsEmitter *messageQueue) { |
|
_ = dsEmitter.client.Connect(context.Background(), dsEmitter.addr) |
|
z.server.AddDownstreamServer(dsEmitter.addr, dsEmitter.client) |
|
}(dsEmitter) |
|
} |
|
} |
|
return z.server.ListenAndServe(context.Background(), z.addr) |
|
} |
|
|
|
// AddDownstreamMq will add downstream messageQueue. |
|
func (z *messageQueue) AddDownstreamMq(downstream MessageQueue) error { |
|
log.Debugf("%sAddDownStreamMq: %v", mqLogPrefix, downstream) |
|
z.downstreamMqs = append(z.downstreamMqs, downstream) |
|
z.hasDownStreams = true |
|
log.Debugf("%scurrent downStreams: %d", mqLogPrefix, len(z.downstreamMqs)) |
|
return nil |
|
} |
|
|
|
// RemoveDownstreamMq remove downstream messageQueue. |
|
func (z *messageQueue) RemoveDownstreamMq(downstream MessageQueue) error { |
|
index := -1 |
|
for i, v := range z.downstreamMqs { |
|
if v.Addr() == downstream.Addr() { |
|
index = i |
|
break |
|
} |
|
} |
|
|
|
// remove from slice |
|
z.downstreamMqs = append(z.downstreamMqs[:index], z.downstreamMqs[index+1:]...) |
|
return nil |
|
} |
|
|
|
// Addr returns listen address of messageQueue. |
|
func (z *messageQueue) Addr() string { |
|
return z.addr |
|
} |
|
|
|
// Close will close a connection. If messageQueue is Server, close the server. If messageQueue is Client, close the client. |
|
func (z *messageQueue) Close() error { |
|
if z.server != nil { |
|
if err := z.server.Close(); err != nil { |
|
log.Errorf("%s Close(): %v", mqLogPrefix, err) |
|
return err |
|
} |
|
} |
|
if z.client != nil { |
|
if err := z.client.Close(); err != nil { |
|
log.Errorf("%s Close(): %v", mqLogPrefix, err) |
|
return err |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
// Stats inspects current server. |
|
func (z *messageQueue) Stats() int { |
|
log.Printf("[%s] all connections: %d", z.name, len(z.server.StatsFunctions())) |
|
for connID, name := range z.server.StatsFunctions() { |
|
log.Printf("[%s] -> ConnId=%s, Name=%s", z.name, connID, name) |
|
} |
|
|
|
log.Printf("[%s] all downstream mq connected: %d", z.name, len(z.server.DownStreams())) |
|
for k, v := range z.server.DownStreams() { |
|
log.Printf("[%s] |> [%s] %v", z.name, k, v) |
|
} |
|
|
|
log.Printf("[%s] total DataFrames received: %d", z.name, z.server.StatsCounter()) |
|
|
|
return len(z.server.StatsFunctions()) |
|
} |
|
|
|
func (z *messageQueue) InitOptions(opts ...Option) { |
|
options := &Options{MqAddr: z.addr} |
|
for _, o := range opts { |
|
o(options) |
|
} |
|
srv := network.NewServer(z.name, options.ServerOptions...) |
|
z.server = srv |
|
if z.wfc != nil { |
|
_ = z.configWorkflow(z.wfc) |
|
} |
|
}
|
|
|