package hpds_node import ( "fmt" "git.hpds.cc/Component/network" "git.hpds.cc/pavement/hpds_node/config" "sync" ) type router struct { r *route } func newRouter(functions []config.App) network.Router { return &router{r: newRoute(functions)} } func (r *router) Route(metadata network.Metadata) network.Route { return r.r } func (r *router) Clean() { r.r = nil } type route struct { functions []config.App data map[byte]map[string]string mu sync.RWMutex } func newRoute(functions []config.App) *route { return &route{ functions: functions, data: make(map[byte]map[string]string), } } func (r *route) Add(connId string, name string, observeDataTags []byte) (err error) { r.mu.Lock() defer r.mu.Unlock() ok := false for _, v := range r.functions { if v.Name == name { ok = true break } } if !ok { return fmt.Errorf("SFN[%s] does not exist in config functions", name) } //去除只能订阅一次的问题 //LOOP: //for _, connects := range r.data { // for connKey, n := range connects { // if n == name { // err = hpds_err.NewDuplicateNameError(connKey, fmt.Errorf("node:Stream Function[%s] is already linked to another connection", name)) // delete(connects, connKey) // break LOOP // } // } //} for _, tag := range observeDataTags { connects := r.data[tag] if connects == nil { connects = make(map[string]string) r.data[tag] = connects } r.data[tag][connId] = name } return err } func (r *route) Remove(connId string) error { r.mu.Lock() defer r.mu.Unlock() for _, connects := range r.data { delete(connects, connId) } return nil } func (r *route) GetForwardRoutes(tag byte) []string { r.mu.RLock() defer r.mu.RUnlock() var keys []string if connects := r.data[tag]; connects != nil { for k := range connects { keys = append(keys, k) } } return keys }