package hpds_node import ( "context" "encoding/json" "fmt" "net/http" "git.hpds.cc/Component/network" "git.hpds.cc/Component/network/log" "git.hpds.cc/pavement/hpds_node/config" ) const ( mqLogPrefix = "\033[33m[hpds:mq]\033[0m " ) // MessageQueue is the orchestrator of hpds. There are two types of mq: // one is Upstream MessageQueue, which is used to connect to multiple downstream mq, // another one is Downstream MessageQueue (will call it as MessageQueue directly), which is used // to connected by `Message Queue`, `Access Point` and `Stream Function` type MessageQueue interface { // ConfigWorkflow will register workflows from config files to messageQueue. ConfigWorkflow(conf string) error // ConfigMesh will register edge-mesh config URL ConfigMesh(url string) error // ListenAndServe start mq as server. ListenAndServe() error // AddDownstreamMq will add downstream mq. AddDownstreamMq(downstream MessageQueue) error // Addr returns the listen address of mq. Addr() string // Stats return insight data Stats() int // Close will close the mq. Close() error // InitOptions initialize options InitOptions(opts ...Option) // ReadConfigFile(conf string) error // AddWorkflow(wf ...core.Workflow) error // ConfigDownstream(opts ...interface{}) error // Connect() error // RemoveDownstreamMq(downstream MessageQueue) error // ListenAddr() string } // messageQueue is the implementation of MessageQueue interface. type messageQueue struct { name string addr string hasDownStreams bool server *network.Server client *network.Client downstreamMqs []MessageQueue wfc *config.WorkflowConfig } var _ MessageQueue = &messageQueue{} // NewMqWithOptions create a messageQueue instance. func NewMqWithOptions(name string, opts ...Option) MessageQueue { options := NewOptions(opts...) zipper := createMessageQueueServer(name, options, nil) _ = zipper.ConfigMesh(options.MeshConfigURL) return zipper } // NewMq create a messageQueue instance from config files. func NewMq(conf string) (MessageQueue, error) { confWf, err := config.ParseWorkflowConfig(conf) if err != nil { log.Errorf("%s[ERR] %v", mqLogPrefix, err) return nil, err } // listening address listenAddr := fmt.Sprintf("%s:%d", confWf.Host, confWf.Port) options := NewOptions() options.MqAddr = listenAddr zipper := createMessageQueueServer(confWf.Name, options, confWf) // messageQueue workflow err = zipper.configWorkflow(confWf) return zipper, err } // NewDownstreamMq create a messageQueue descriptor for downstream messageQueue. func NewDownstreamMq(name string, opts ...Option) MessageQueue { options := NewOptions(opts...) client := network.NewClient(name, network.ClientTypeMessageQueue, options.ClientOptions...) return &messageQueue{ name: name, addr: options.MqAddr, client: client, } } /*************** Server ONLY ***************/ // createMessageQueueServer create a messageQueue instance as server. func createMessageQueueServer(name string, options *Options, cfg *config.WorkflowConfig) *messageQueue { // create underlying QUIC server srv := network.NewServer(name, options.ServerOptions...) z := &messageQueue{ server: srv, name: name, addr: options.MqAddr, wfc: cfg, } // initialize z.init() return z } // ConfigWorkflow will read workflows from config files and register them to messageQueue. func (z *messageQueue) ConfigWorkflow(conf string) error { cfg, err := config.ParseWorkflowConfig(conf) if err != nil { log.Errorf("%s[ERR] %v", mqLogPrefix, err) return err } log.Debugf("%sConfigWorkflow cfg=%+v", mqLogPrefix, cfg) return z.configWorkflow(cfg) } func (z *messageQueue) configWorkflow(config *config.WorkflowConfig) error { z.wfc = config z.server.ConfigMetadataBuilder(newMetadataBuilder()) z.server.ConfigRouter(newRouter(config.Functions)) return nil } func (z *messageQueue) ConfigMesh(url string) error { if url == "" { return nil } log.Printf("%sDownloading mesh config...", mqLogPrefix) // download mesh conf res, err := http.Get(url) if err != nil { return err } defer func() { _ = res.Body.Close() }() decoder := json.NewDecoder(res.Body) var configs []config.MeshMessageQueue err = decoder.Decode(&configs) if err != nil { log.Errorf("%s downloaded the Mesh config with err=%v", mqLogPrefix, err) return err } log.Printf("%s Successfully downloaded the Mesh config. ", mqLogPrefix) if len(configs) == 0 { return nil } for _, downstream := range configs { if downstream.Name == z.name { continue } addr := fmt.Sprintf("%s:%d", downstream.Host, downstream.Port) opts := []Option{WithMqAddr(addr)} if downstream.Credential != "" { opts = append(opts, WithCredential(downstream.Credential)) } _ = z.AddDownstreamMq(NewDownstreamMq(downstream.Name, opts...)) } return nil } // ListenAndServe will start messageQueue service. func (z *messageQueue) ListenAndServe() error { log.Debugf("%sCreating MessageQueue Server ...", mqLogPrefix) // check downstream zippers for _, ds := range z.downstreamMqs { if dsZipper, ok := ds.(*messageQueue); ok { go func(dsZipper *messageQueue) { _ = dsZipper.client.Connect(context.Background(), dsZipper.addr) z.server.AddDownstreamServer(dsZipper.addr, dsZipper.client) }(dsZipper) } } return z.server.ListenAndServe(context.Background(), z.addr) } // AddDownstreamMq will add downstream messageQueue. func (z *messageQueue) AddDownstreamMq(downstream MessageQueue) error { log.Debugf("%sAddDownStreamMq: %v", mqLogPrefix, downstream) z.downstreamMqs = append(z.downstreamMqs, downstream) z.hasDownStreams = true log.Debugf("%scurrent downStreams: %d", mqLogPrefix, len(z.downstreamMqs)) return nil } // RemoveDownstreamMq remove downstream messageQueue. func (z *messageQueue) RemoveDownstreamMq(downstream MessageQueue) error { index := -1 for i, v := range z.downstreamMqs { if v.Addr() == downstream.Addr() { index = i break } } // remove from slice z.downstreamMqs = append(z.downstreamMqs[:index], z.downstreamMqs[index+1:]...) return nil } // Addr returns listen address of messageQueue. func (z *messageQueue) Addr() string { return z.addr } // Close will close a connection. If messageQueue is Server, close the server. If messageQueue is Client, close the client. func (z *messageQueue) Close() error { if z.server != nil { if err := z.server.Close(); err != nil { log.Errorf("%s Close(): %v", mqLogPrefix, err) return err } } if z.client != nil { if err := z.client.Close(); err != nil { log.Errorf("%s Close(): %v", mqLogPrefix, err) return err } } return nil } // Stats inspects current server. func (z *messageQueue) Stats() int { log.Printf("[%s] all connections: %d", z.name, len(z.server.StatsFunctions())) for connID, name := range z.server.StatsFunctions() { log.Printf("[%s] -> ConnId=%s, Name=%s", z.name, connID, name) } log.Printf("[%s] all downstream mq connected: %d", z.name, len(z.server.DownStreams())) for k, v := range z.server.DownStreams() { log.Printf("[%s] |> [%s] %s", z.name, k, v.ServerAddr()) } log.Printf("[%s] total DataFrames received: %d", z.name, z.server.StatsCounter()) return len(z.server.StatsFunctions()) } func (z *messageQueue) InitOptions(opts ...Option) { options := &Options{MqAddr: z.addr} for _, o := range opts { o(options) } srv := network.NewServer(z.name, options.ServerOptions...) z.server = srv if z.wfc != nil { _ = z.configWorkflow(z.wfc) } }