1、修改bug,如果连接报错,就会拒绝服务的问题
This commit is contained in:
parent
44824a7e9f
commit
ff095c2312
103
router.go
103
router.go
|
@ -1,103 +0,0 @@
|
||||||
package hpds_node
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.hpds.cc/Component/network"
|
|
||||||
"git.hpds.cc/pavement/hpds_node/config"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
type router struct {
|
|
||||||
r *route
|
|
||||||
}
|
|
||||||
|
|
||||||
func newRouter(functions []config.App) network.Router {
|
|
||||||
return &router{r: newRoute(functions)}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *router) Route(metadata network.Metadata) network.Route {
|
|
||||||
return r.r
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *router) Clean() {
|
|
||||||
r.r = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type route struct {
|
|
||||||
functions []config.App
|
|
||||||
data map[byte]map[string]string
|
|
||||||
mu sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func newRoute(functions []config.App) *route {
|
|
||||||
return &route{
|
|
||||||
functions: functions,
|
|
||||||
data: make(map[byte]map[string]string),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *route) Add(connId string, name string, observeDataTags []byte) (err error) {
|
|
||||||
r.mu.Lock()
|
|
||||||
defer r.mu.Unlock()
|
|
||||||
|
|
||||||
ok := false
|
|
||||||
for _, v := range r.functions {
|
|
||||||
if v.Name == name {
|
|
||||||
ok = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !ok {
|
|
||||||
//return fmt.Errorf("SFN[%s] does not exist in config functions", name)
|
|
||||||
//如果不存在,自动增加
|
|
||||||
item := config.App{
|
|
||||||
Name: name,
|
|
||||||
}
|
|
||||||
r.functions = append(r.functions, item)
|
|
||||||
}
|
|
||||||
//去除只能订阅一次的问题
|
|
||||||
//LOOP:
|
|
||||||
//for _, connects := range r.data {
|
|
||||||
// for connKey, n := range connects {
|
|
||||||
// if n == name {
|
|
||||||
// err = hpds_err.NewDuplicateNameError(connKey, fmt.Errorf("node:Stream Function[%s] is already linked to another connection", name))
|
|
||||||
// delete(connects, connKey)
|
|
||||||
// break LOOP
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
|
||||||
for _, tag := range observeDataTags {
|
|
||||||
connects := r.data[tag]
|
|
||||||
if connects == nil {
|
|
||||||
connects = make(map[string]string)
|
|
||||||
r.data[tag] = connects
|
|
||||||
}
|
|
||||||
r.data[tag][connId] = name
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *route) Remove(connId string) error {
|
|
||||||
r.mu.Lock()
|
|
||||||
defer r.mu.Unlock()
|
|
||||||
|
|
||||||
for _, connects := range r.data {
|
|
||||||
delete(connects, connId)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *route) GetForwardRoutes(tag byte) []string {
|
|
||||||
r.mu.RLock()
|
|
||||||
defer r.mu.RUnlock()
|
|
||||||
|
|
||||||
var keys []string
|
|
||||||
if connects := r.data[tag]; connects != nil {
|
|
||||||
for k := range connects {
|
|
||||||
keys = append(keys, k)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return keys
|
|
||||||
}
|
|
Loading…
Reference in New Issue