// hpds_node/router.go

package hpds_node
import (
"git.hpds.cc/Component/network"
"git.hpds.cc/pavement/hpds_node/config"
"sync"
)
// router is the network.Router implementation returned by newRouter. It holds
// a single route that is handed out to every caller of Route.
type router struct {
	// r is the shared route; Clean resets it to nil.
	r *route
}
// newRouter builds a router whose single route is created from functions.
func newRouter(functions []config.App) network.Router {
	rt := new(router)
	rt.r = newRoute(functions)
	return rt
}
// Route returns the router's shared route. The metadata argument is not
// consulted: every caller receives the same route.
//
// Once the route has been released by Clean, an untyped nil is returned rather
// than a network.Route wrapping a nil *route, so that a caller's
// `route == nil` check behaves as expected.
func (r *router) Route(metadata network.Metadata) network.Route {
	if r.r == nil {
		return nil
	}
	return r.r
}
// Clean drops the router's reference to its route.
func (r *router) Clean() {
	r.r = nil
}
// route records which connections observe which data tags.
type route struct {
	// functions lists the known apps; Add matches entries by their Name field
	// and appends names it has not seen before.
	functions []config.App

	// data maps a data tag to its observers: connection ID -> function name.
	data map[byte]map[string]string

	// mu guards functions and data.
	mu sync.RWMutex
}
// newRoute returns a route that starts with the given functions and an empty,
// ready-to-use tag table.
func newRoute(functions []config.App) *route {
	rt := new(route)
	rt.functions = functions
	rt.data = make(map[byte]map[string]string)
	return rt
}
func (r *route) Add(connId string, name string, observeDataTags []byte) (err error) {
r.mu.Lock()
defer r.mu.Unlock()
ok := false
for _, v := range r.functions {
if v.Name == name {
ok = true
break
}
}
if !ok {
2023-03-24 08:49:01 +08:00
//return fmt.Errorf("SFN[%s] does not exist in config functions", name)
//如果不存在,自动增加
item := config.App{
Name: name,
}
r.functions = append(r.functions, item)
2022-10-12 11:55:36 +08:00
}
2023-03-07 17:48:26 +08:00
//去除只能订阅一次的问题
//LOOP:
//for _, connects := range r.data {
// for connKey, n := range connects {
// if n == name {
// err = hpds_err.NewDuplicateNameError(connKey, fmt.Errorf("node:Stream Function[%s] is already linked to another connection", name))
// delete(connects, connKey)
// break LOOP
// }
// }
//}
2022-10-12 11:55:36 +08:00
for _, tag := range observeDataTags {
connects := r.data[tag]
if connects == nil {
connects = make(map[string]string)
r.data[tag] = connects
}
r.data[tag][connId] = name
}
return err
}
// Remove deletes connId from the observer table of every tag. Tags themselves
// are kept, even when their table becomes empty. It always returns nil,
// including when connId was never registered.
func (r *route) Remove(connId string) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	for tag := range r.data {
		delete(r.data[tag], connId)
	}
	return nil
}
// GetForwardRoutes returns the IDs of the connections registered for tag, in
// no particular order (map iteration order). The result is nil when no
// connection is registered for the tag.
func (r *route) GetForwardRoutes(tag byte) []string {
	r.mu.RLock()
	defer r.mu.RUnlock()

	connects := r.data[tag]
	if len(connects) == 0 {
		// Covers both an unknown tag and a tag whose observers were all removed.
		return nil
	}
	// The observer count is known up front, so allocate the result once.
	keys := make([]string, 0, len(connects))
	for id := range connects {
		keys = append(keys, id)
	}
	return keys
}