Compare commits
3 Commits
Author | SHA1 | Date |
---|---|---|
wangjian | bf7300c92a | |
wangjian | a4ea724bab | |
wangjian | abb93c020f |
22
client.go
22
client.go
|
@ -42,7 +42,7 @@ func NewClient(appName string, connType ClientType, opts ...ClientOption) *Clien
|
||||||
clientId := id.New()
|
clientId := id.New()
|
||||||
|
|
||||||
if option.credential != nil {
|
if option.credential != nil {
|
||||||
log.Infof("use credential, credential_name: %s;", option.credential.Name())
|
log.Infof("%s use credential, credential_name: %s;", ClientLogPrefix, option.credential.Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, ctxCancel := context.WithCancel(context.Background())
|
ctx, ctxCancel := context.WithCancel(context.Background())
|
||||||
|
@ -56,7 +56,7 @@ func NewClient(appName string, connType ClientType, opts ...ClientOption) *Clien
|
||||||
cli.streamType = connType
|
cli.streamType = connType
|
||||||
cli.opts = option
|
cli.opts = option
|
||||||
cli.errorfn = func(err error) {
|
cli.errorfn = func(err error) {
|
||||||
cli.logger.Errorf("client err, %s", err)
|
cli.logger.Errorf("%s client err, %s", ClientLogPrefix, err)
|
||||||
}
|
}
|
||||||
cli.writeFrameChan = make(chan frame.Frame)
|
cli.writeFrameChan = make(chan frame.Frame)
|
||||||
cli.shutdownChan = make(chan error, 1)
|
cli.shutdownChan = make(chan error, 1)
|
||||||
|
@ -69,7 +69,7 @@ func NewClient(appName string, connType ClientType, opts ...ClientOption) *Clien
|
||||||
func (c *Client) Connect(ctx context.Context, addr string) error {
|
func (c *Client) Connect(ctx context.Context, addr string) error {
|
||||||
controlStream, dataStream, err := c.openStream(ctx, addr)
|
controlStream, dataStream, err := c.openStream(ctx, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("connect error, %s", err)
|
c.logger.Errorf("%s connect error, %s", ClientLogPrefix, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,7 +96,7 @@ func (c *Client) runBackground(ctx context.Context, addr string, controlStream C
|
||||||
var err error
|
var err error
|
||||||
controlStream, dataStream, err = c.openStream(ctx, addr)
|
controlStream, dataStream, err = c.openStream(ctx, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("client reconnect error, %s", err)
|
c.logger.Errorf("%s client reconnect error, %s", ClientLogPrefix, err)
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
goto RECONNECT
|
goto RECONNECT
|
||||||
}
|
}
|
||||||
|
@ -115,7 +115,7 @@ func (c *Client) cleanStream(controlStream ClientControlStream, err error) {
|
||||||
errString := ""
|
errString := ""
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errString = err.Error()
|
errString = err.Error()
|
||||||
c.logger.Errorf("client cancel with error, %s", err)
|
c.logger.Errorf("%s client cancel with error, %s", ClientLogPrefix, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// controlStream is nil represents that client is not connected.
|
// controlStream is nil represents that client is not connected.
|
||||||
|
@ -255,18 +255,18 @@ func (c *Client) handleFrame(f frame.Frame) {
|
||||||
switch ff := f.(type) {
|
switch ff := f.(type) {
|
||||||
case *frame.DataFrame:
|
case *frame.DataFrame:
|
||||||
if c.processor == nil {
|
if c.processor == nil {
|
||||||
c.logger.Warnf("client processor has not been set")
|
c.logger.Warnf("%s client processor has not been set", ClientLogPrefix)
|
||||||
} else {
|
} else {
|
||||||
c.processor(ff)
|
c.processor(ff)
|
||||||
}
|
}
|
||||||
case *frame.BackFlowFrame:
|
case *frame.BackFlowFrame:
|
||||||
if c.receiver == nil {
|
if c.receiver == nil {
|
||||||
c.logger.Warnf("client receiver has not been set")
|
c.logger.Warnf("%s client receiver has not been set", ClientLogPrefix)
|
||||||
} else {
|
} else {
|
||||||
c.receiver(ff)
|
c.receiver(ff)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
c.logger.Warnf("client data stream receive unexcepted frame, frame_type: %v", f)
|
c.logger.Warnf("%s client data stream receive unexcepted frame, frame_type: %v", ClientLogPrefix, f)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -296,17 +296,17 @@ func (c *Client) receivingStreamClose(controlStream ControlStream, dataStream Da
|
||||||
// SetDataFrameObserver sets the data frame handler.
|
// SetDataFrameObserver sets the data frame handler.
|
||||||
func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame)) {
|
func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame)) {
|
||||||
c.processor = fn
|
c.processor = fn
|
||||||
c.logger.Debugf("SetDataFrameObserver")
|
c.logger.Debugf("%s SetDataFrameObserver", ClientLogPrefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetBackFlowFrameObserver sets the backflow frame handler.
|
// SetBackFlowFrameObserver sets the backflow frame handler.
|
||||||
func (c *Client) SetBackFlowFrameObserver(fn func(*frame.BackFlowFrame)) {
|
func (c *Client) SetBackFlowFrameObserver(fn func(*frame.BackFlowFrame)) {
|
||||||
c.receiver = fn
|
c.receiver = fn
|
||||||
c.logger.Debugf("SetBackFlowFrameObserver")
|
c.logger.Debugf("%s SetBackFlowFrameObserver", ClientLogPrefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetObserveDataTags set the data tag list that will be observed.
|
// SetObserveDataTags set the data tag list that will be observed.
|
||||||
// Deprecated: use yomo.WithObserveDataTags instead
|
// Deprecated: use network.WithObserveDataTags instead
|
||||||
func (c *Client) SetObserveDataTags(tag ...frame.Tag) {
|
func (c *Client) SetObserveDataTags(tag ...frame.Tag) {
|
||||||
c.opts.observeDataTags = append(c.opts.observeDataTags, tag...)
|
c.opts.observeDataTags = append(c.opts.observeDataTags, tag...)
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,7 @@ func (c *Connector) Add(streamId string, stream DataStream) error {
|
||||||
|
|
||||||
c.streams.Store(streamId, stream)
|
c.streams.Store(streamId, stream)
|
||||||
|
|
||||||
log.Debugf("Connector add stream, stream_id: %s", streamId)
|
log.Debugf("%s Connector add stream, stream_id: %s", ParseFrameLogPrefix, streamId)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,7 +55,7 @@ func (c *Connector) Remove(streamId string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
c.streams.Delete(streamId)
|
c.streams.Delete(streamId)
|
||||||
log.Debugf("Connector remove stream, stream_id: %s", streamId)
|
log.Debugf("%s Connector remove stream, stream_id: %s", ParseFrameLogPrefix, streamId)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,13 @@ const (
|
||||||
ConnStateClosed ConnState = "Closed"
|
ConnStateClosed ConnState = "Closed"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Prefix is the prefix for logger.
|
||||||
|
const (
|
||||||
|
ClientLogPrefix = "\033[36m[network:client]\033[0m "
|
||||||
|
ServerLogPrefix = "\033[32m[network:server]\033[0m "
|
||||||
|
ParseFrameLogPrefix = "\033[36m[network:stream_parser]\033[0m "
|
||||||
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
rand.Seed(time.Now().Unix())
|
rand.Seed(time.Now().Unix())
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,7 +84,7 @@ func newContext(dataStream DataStream) (c *Context) {
|
||||||
c = v.(*Context)
|
c = v.(*Context)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("stream_id: %s; stream_name: %s; stream_type: %s;", dataStream.ID(),
|
log.Infof("%s stream_id: %s; stream_name: %s; stream_type: %s;", ClientLogPrefix, dataStream.ID(),
|
||||||
dataStream.Name(), dataStream.StreamType().String(),
|
dataStream.Name(), dataStream.StreamType().String(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -105,13 +105,13 @@ func (c *Context) WithFrame(f frame.Frame) {
|
||||||
//
|
//
|
||||||
// TODO: ycode is not be transmitted.
|
// TODO: ycode is not be transmitted.
|
||||||
func (c *Context) CloseWithError(hCode hpds_err.ErrorCode, errString string) {
|
func (c *Context) CloseWithError(hCode hpds_err.ErrorCode, errString string) {
|
||||||
log.Warnf("Stream Close With error", "err_code", hCode.String(), "error", errString)
|
log.Warnf("%s Stream Close With error, err_code: %s; error: %s", ClientLogPrefix, hCode.String(), errString)
|
||||||
|
|
||||||
err := c.DataStream.CloseWithError(errString)
|
err := c.DataStream.CloseWithError(errString)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Errorf("Close DataStream error", err)
|
log.Errorf("%s Close DataStream error %v", ClientLogPrefix, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean cleans the Context,
|
// Clean cleans the Context,
|
||||||
|
|
|
@ -120,7 +120,7 @@ func logFormat() string {
|
||||||
|
|
||||||
func logLevel() Level {
|
func logLevel() Level {
|
||||||
envLevel := strings.ToLower(os.Getenv("HPDS_LOG_LEVEL"))
|
envLevel := strings.ToLower(os.Getenv("HPDS_LOG_LEVEL"))
|
||||||
level := ErrorLevel
|
level := DebugLevel
|
||||||
switch envLevel {
|
switch envLevel {
|
||||||
case "debug":
|
case "debug":
|
||||||
return DebugLevel
|
return DebugLevel
|
||||||
|
|
|
@ -2,11 +2,9 @@
|
||||||
package router
|
package router
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.hpds.cc/Component/network/frame"
|
"git.hpds.cc/Component/network/frame"
|
||||||
herr "git.hpds.cc/Component/network/hpds_err"
|
|
||||||
"git.hpds.cc/Component/network/metadata"
|
"git.hpds.cc/Component/network/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -61,19 +59,23 @@ func (r *defaultRoute) Add(connId string, name string, observeDataTags []frame.T
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("SFN[%s] does not exist in config functions", name)
|
//return fmt.Errorf("SFN[%s] does not exist in config functions", name)
|
||||||
|
//item := config.App{
|
||||||
|
// Name: name,
|
||||||
|
//}
|
||||||
|
r.functions = append(r.functions, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
LOOP:
|
//LOOP:
|
||||||
for _, conn := range r.data {
|
//for _, conn := range r.data {
|
||||||
for connId, n := range conn {
|
// for connId, n := range conn {
|
||||||
if n == name {
|
// if n == name {
|
||||||
err = herr.NewDuplicateNameError(connId, fmt.Errorf("SFN[%s] is already linked to another connection", name))
|
// err = herr.NewDuplicateNameError(connId, fmt.Errorf("SFN[%s] is already linked to another connection", name))
|
||||||
delete(conn, connId)
|
// delete(conn, connId)
|
||||||
break LOOP
|
// break LOOP
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
//}
|
||||||
|
|
||||||
for _, tag := range observeDataTags {
|
for _, tag := range observeDataTags {
|
||||||
conn := r.data[tag]
|
conn := r.data[tag]
|
||||||
|
|
76
server.go
76
server.go
|
@ -42,6 +42,7 @@ type Server struct {
|
||||||
afterHandlers []FrameHandler
|
afterHandlers []FrameHandler
|
||||||
connectionCloseHandlers []ConnectionHandler
|
connectionCloseHandlers []ConnectionHandler
|
||||||
listener Listener
|
listener Listener
|
||||||
|
logger log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer create a Server instance.
|
// NewServer create a Server instance.
|
||||||
|
@ -85,21 +86,21 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error {
|
||||||
if err := s.validateRouter(); err != nil {
|
if err := s.validateRouter(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
s.connector = NewConnector(ctx)
|
||||||
// listen the address
|
// listen the address
|
||||||
listener, err := newListener(conn, s.opts.tlsConfig, s.opts.quicConfig)
|
listener, err := newListener(conn, s.opts.tlsConfig, s.opts.quicConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("listener.Listen: err=%v", err)
|
log.Errorf("%s listener.Listen: err=%v", ServerLogPrefix, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.listener = listener
|
s.listener = listener
|
||||||
|
|
||||||
log.Printf("[%s][%d] Listening on: %s, QUIC: %v, AUTH: %s", s.name, os.Getpid(), listener.Addr(), listener.Versions(), s.authNames())
|
log.Printf("%s [%s][%d] Listening on: %s, QUIC: %v, AUTH: %s", ServerLogPrefix, s.name, os.Getpid(), listener.Addr(), listener.Versions(), s.authNames())
|
||||||
|
|
||||||
for {
|
for {
|
||||||
conn, err := s.listener.Accept(ctx)
|
conn, err := s.listener.Accept(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("listener accept connections error", err)
|
log.Errorf("%s listener accept connections error %v", ServerLogPrefix, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = s.opts.alpnHandler(conn.ConnectionState().TLS.NegotiatedProtocol)
|
err = s.opts.alpnHandler(conn.ConnectionState().TLS.NegotiatedProtocol)
|
||||||
|
@ -119,10 +120,10 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error {
|
||||||
// It response to client a AuthenticationAckFrame.
|
// It response to client a AuthenticationAckFrame.
|
||||||
err = controlStream.VerifyAuthentication(s.handleAuthenticationFrame)
|
err = controlStream.VerifyAuthentication(s.handleAuthenticationFrame)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("Authentication Failed", "error", err)
|
log.Warnf("%s Authentication Failed, error: %s", ServerLogPrefix, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Debugf("Authentication Success")
|
log.Debugf("%s Authentication Success", ServerLogPrefix)
|
||||||
|
|
||||||
go func(qConn quic.Connection) {
|
go func(qConn quic.Connection) {
|
||||||
streamGroup := NewStreamGroup(ctx, controlStream, s.connector)
|
streamGroup := NewStreamGroup(ctx, controlStream, s.connector)
|
||||||
|
@ -134,7 +135,7 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case err := <-s.runWithStreamGroup(streamGroup):
|
case err := <-s.runWithStreamGroup(streamGroup):
|
||||||
log.Errorf("Client Close, %v", err)
|
log.Errorf("%s Client Close, %v", ServerLogPrefix, err)
|
||||||
}
|
}
|
||||||
}(conn)
|
}(conn)
|
||||||
}
|
}
|
||||||
|
@ -183,8 +184,8 @@ func (s *Server) handleRoute(c *Context) error {
|
||||||
if e, ok := err.(hpds_err.DuplicateNameError); ok {
|
if e, ok := err.(hpds_err.DuplicateNameError); ok {
|
||||||
existsConnId := e.ConnId()
|
existsConnId := e.ConnId()
|
||||||
|
|
||||||
log.Debugf("StreamFunction Duplicate Name, error: %s; sfn_name: %s, old_stream_id: %s; current_stream_id: %s",
|
log.Debugf("%s StreamFunction Duplicate Name, error: %s; sfn_name: %s, old_stream_id: %s; current_stream_id: %s",
|
||||||
e.Error(), c.DataStream.Name(), existsConnId, c.StreamId())
|
ServerLogPrefix, e.Error(), c.DataStream.Name(), existsConnId, c.StreamId())
|
||||||
|
|
||||||
stream, ok, err := s.connector.Get(existsConnId)
|
stream, ok, err := s.connector.Get(existsConnId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -214,12 +215,11 @@ func (s *Server) handleStreamContext(c *Context) {
|
||||||
// start frame handlers
|
// start frame handlers
|
||||||
for _, handler := range s.startHandlers {
|
for _, handler := range s.startHandlers {
|
||||||
if err := handler(c); err != nil {
|
if err := handler(c); err != nil {
|
||||||
log.Errorf("startHandlers error: %v", err)
|
log.Errorf("%s startHandlers error: %v", ServerLogPrefix, err)
|
||||||
c.CloseWithError(hpds_err.ErrorCodeStartHandler, err.Error())
|
c.CloseWithError(hpds_err.ErrorCodeStartHandler, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// check update for stream
|
// check update for stream
|
||||||
for {
|
for {
|
||||||
f, err := c.DataStream.ReadFrame()
|
f, err := c.DataStream.ReadFrame()
|
||||||
|
@ -228,26 +228,26 @@ func (s *Server) handleStreamContext(c *Context) {
|
||||||
if e, ok := err.(*quic.ApplicationError); ok {
|
if e, ok := err.(*quic.ApplicationError); ok {
|
||||||
if hpds_err.Is(e.ErrorCode, hpds_err.ErrorCodeClientAbort) {
|
if hpds_err.Is(e.ErrorCode, hpds_err.ErrorCodeClientAbort) {
|
||||||
// client abort
|
// client abort
|
||||||
log.Infof("client close the connection")
|
log.Infof("%s client close the connection", ServerLogPrefix)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
he := hpds_err.New(hpds_err.Parse(e.ErrorCode), err)
|
he := hpds_err.New(hpds_err.Parse(e.ErrorCode), err)
|
||||||
log.Errorf("read frame error: %v", he)
|
log.Errorf("%s read frame error: %v", ServerLogPrefix, he)
|
||||||
} else if err == io.EOF {
|
} else if err == io.EOF {
|
||||||
log.Infof("connection EOF")
|
log.Infof("%s connection EOF", ServerLogPrefix)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if errors.Is(err, net.ErrClosed) {
|
if errors.Is(err, net.ErrClosed) {
|
||||||
// if client close the connection, net.ErrClosed will be raise
|
// if client close the connection, net.ErrClosed will be raise
|
||||||
// by quic-go IdleTimeoutError after connection's KeepAlive config.
|
// by quic-go IdleTimeoutError after connection's KeepAlive config.
|
||||||
log.Warnf("connection error, error: %v", net.ErrClosed)
|
log.Warnf("%s connection error, error: %v", ServerLogPrefix, net.ErrClosed)
|
||||||
c.CloseWithError(hpds_err.ErrorCodeClosed, "net.ErrClosed")
|
c.CloseWithError(hpds_err.ErrorCodeClosed, "net.ErrClosed")
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// any error occurred, we should close the stream
|
// any error occurred, we should close the stream
|
||||||
// after this, conn.AcceptStream() will raise the error
|
// after this, conn.AcceptStream() will raise the error
|
||||||
c.CloseWithError(hpds_err.ErrorCodeUnknown, err.Error())
|
c.CloseWithError(hpds_err.ErrorCodeUnknown, err.Error())
|
||||||
log.Warnf("connection close")
|
log.Warnf("%s connection close", ServerLogPrefix)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -257,21 +257,21 @@ func (s *Server) handleStreamContext(c *Context) {
|
||||||
// before frame handlers
|
// before frame handlers
|
||||||
for _, handler := range s.beforeHandlers {
|
for _, handler := range s.beforeHandlers {
|
||||||
if err := handler(c); err != nil {
|
if err := handler(c); err != nil {
|
||||||
log.Errorf("beforeFrameHandler error: %v", err)
|
log.Errorf("%s beforeFrameHandler error: %v", ServerLogPrefix, err)
|
||||||
c.CloseWithError(hpds_err.ErrorCodeBeforeHandler, err.Error())
|
c.CloseWithError(hpds_err.ErrorCodeBeforeHandler, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// main handler
|
// main handler
|
||||||
if err := s.mainFrameHandler(c); err != nil {
|
if err := s.mainFrameHandler(c); err != nil {
|
||||||
log.Errorf("mainFrameHandler error: %v", err)
|
log.Errorf("%s mainFrameHandler error: %v", ServerLogPrefix, err)
|
||||||
c.CloseWithError(hpds_err.ErrorCodeMainHandler, err.Error())
|
c.CloseWithError(hpds_err.ErrorCodeMainHandler, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// after frame handler
|
// after frame handler
|
||||||
for _, handler := range s.afterHandlers {
|
for _, handler := range s.afterHandlers {
|
||||||
if err := handler(c); err != nil {
|
if err := handler(c); err != nil {
|
||||||
log.Errorf("afterFrameHandler error: %v", err)
|
log.Errorf("%s afterFrameHandler error: %v", ServerLogPrefix, err)
|
||||||
c.CloseWithError(hpds_err.ErrorCodeAfterHandler, err.Error())
|
c.CloseWithError(hpds_err.ErrorCodeAfterHandler, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -293,7 +293,7 @@ func (s *Server) mainFrameHandler(c *Context) error {
|
||||||
_ = s.handleBackFlowFrame(c)
|
_ = s.handleBackFlowFrame(c)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
log.Errorf("err=%v, frame=%v", err, frame.Shortly(c.Frame.Encode()))
|
log.Errorf("%s err=%v, frame=%v", ServerLogPrefix, err, frame.Shortly(c.Frame.Encode()))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -302,9 +302,9 @@ func (s *Server) handleAuthenticationFrame(f auth.Object) (bool, error) {
|
||||||
ok := auth.Authenticate(s.opts.auths, f)
|
ok := auth.Authenticate(s.opts.auths, f)
|
||||||
|
|
||||||
if ok {
|
if ok {
|
||||||
log.Debugf("Successful authentication")
|
log.Debugf("%s Successful authentication", ServerLogPrefix)
|
||||||
} else {
|
} else {
|
||||||
log.Warnf("Authentication failed", "credential", f.AuthName())
|
log.Warnf("%s Authentication failed, credential: %s", ServerLogPrefix, f.AuthName())
|
||||||
}
|
}
|
||||||
|
|
||||||
return ok, nil
|
return ok, nil
|
||||||
|
@ -319,7 +319,7 @@ func (s *Server) handleDataFrame(c *Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Warnf("handleDataFrame connector cannot find, from_conn_id: %s", fromId)
|
log.Warnf("%s handleDataFrame connector cannot find, from_conn_id: %s", ServerLogPrefix, fromId)
|
||||||
return fmt.Errorf("handleDataFrame connector cannot find %s", fromId)
|
return fmt.Errorf("handleDataFrame connector cannot find %s", fromId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -340,14 +340,14 @@ func (s *Server) handleDataFrame(c *Context) error {
|
||||||
// route
|
// route
|
||||||
route := s.router.Route(m)
|
route := s.router.Route(m)
|
||||||
if route == nil {
|
if route == nil {
|
||||||
log.Warnf("handleDataFrame route is nil")
|
log.Warnf("%s handleDataFrame route is nil", ServerLogPrefix)
|
||||||
return fmt.Errorf("handleDataFrame route is nil")
|
return fmt.Errorf("handleDataFrame route is nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
// get stream function connection ids from route
|
// get stream function connection ids from route
|
||||||
connIDs := route.GetForwardRoutes(f.GetDataTag())
|
connIDs := route.GetForwardRoutes(f.GetDataTag())
|
||||||
|
|
||||||
log.Debugf("Data Routing Status, sfn_stream_ids: %v; connector: %v", connIDs, s.connector.GetSnapshot())
|
log.Debugf("%s Data Routing Status, sfn_stream_ids: %v; connector: %v", ServerLogPrefix, connIDs, s.connector.GetSnapshot())
|
||||||
|
|
||||||
for _, toId := range connIDs {
|
for _, toId := range connIDs {
|
||||||
conn, ok, err := s.connector.Get(toId)
|
conn, ok, err := s.connector.Get(toId)
|
||||||
|
@ -355,17 +355,17 @@ func (s *Server) handleDataFrame(c *Context) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Errorf("Can't find forward conn, error: conn is nil ;forward_conn_id: ", toId)
|
log.Errorf("%s Can't find forward conn, error: conn is nil ;forward_conn_id: ", ServerLogPrefix, toId)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
to := conn.Name()
|
to := conn.Name()
|
||||||
log.Infof("handleDataFrame, from_conn_name: %s; from_conn_id: %s; to_conn_name: %s; to_conn_id: %s; data_frame: %s",
|
log.Infof("%s handleDataFrame, from_conn_name: %s; from_conn_id: %s; to_conn_name: %s; to_conn_id: %s; data_frame: %s",
|
||||||
from.Name(), fromId, to, toId, f.String())
|
ServerLogPrefix, from.Name(), fromId, to, toId, f.String())
|
||||||
|
|
||||||
// write data frame to stream
|
// write data frame to stream
|
||||||
if err := conn.WriteFrame(f); err != nil {
|
if err := conn.WriteFrame(f); err != nil {
|
||||||
log.Errorf("handleDataFrame conn.Write, %v", err)
|
log.Errorf("%s handleDataFrame conn.Write, %v", ServerLogPrefix, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -386,9 +386,9 @@ func (s *Server) handleBackFlowFrame(c *Context) error {
|
||||||
// logger.Printf("%s handleBackFlowFrame tag:%#v --> source:%s, result=%s", ServerLogPrefix, tag, sourceId, carriage)
|
// logger.Printf("%s handleBackFlowFrame tag:%#v --> source:%s, result=%s", ServerLogPrefix, tag, sourceId, carriage)
|
||||||
for _, source := range sourceConnList {
|
for _, source := range sourceConnList {
|
||||||
if source != nil {
|
if source != nil {
|
||||||
log.Debugf("handleBackFlowFrame tag:%#v; source_conn_id: %s, back_flow_frame: %s", tag, sourceId, f.String())
|
log.Debugf("%s handleBackFlowFrame tag:%#v; source_conn_id: %s, back_flow_frame: %s", ServerLogPrefix, tag, sourceId, f.String())
|
||||||
if err := source.WriteFrame(bf); err != nil {
|
if err := source.WriteFrame(bf); err != nil {
|
||||||
log.Errorf("handleBackFlowFrame tag:%#v --> Protocol Gateway:%s, error=%v", tag, sourceId, err)
|
log.Errorf("%s handleBackFlowFrame tag:%#v --> Protocol Gateway:%s, error=%v", ServerLogPrefix, tag, sourceId, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -415,7 +415,7 @@ func (s *Server) DownStreams() map[string]frame.Writer {
|
||||||
func (s *Server) ConfigRouter(router router.Router) {
|
func (s *Server) ConfigRouter(router router.Router) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
s.router = router
|
s.router = router
|
||||||
log.Debugf("config router is %#v", router)
|
log.Debugf("%s config router is %#v", ServerLogPrefix, router)
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -424,7 +424,7 @@ func (s *Server) ConfigRouter(router router.Router) {
|
||||||
func (s *Server) ConfigMetadataBuilder(builder metadata.Builder) {
|
func (s *Server) ConfigMetadataBuilder(builder metadata.Builder) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
s.metadataBuilder = builder
|
s.metadataBuilder = builder
|
||||||
log.Debugf("config metadataBuilder is %#v", builder)
|
log.Debugf("%s config metadataBuilder is %#v", ServerLogPrefix, builder)
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -432,7 +432,7 @@ func (s *Server) ConfigMetadataBuilder(builder metadata.Builder) {
|
||||||
func (s *Server) ConfigAlpnHandler(h func(string) error) {
|
func (s *Server) ConfigAlpnHandler(h func(string) error) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
s.opts.alpnHandler = h
|
s.opts.alpnHandler = h
|
||||||
log.Debugf("config alpnHandler")
|
log.Debugf("%s config alpnHandler", ServerLogPrefix)
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -448,11 +448,11 @@ func (s *Server) AddDownstreamServer(addr string, c *Client) {
|
||||||
func (s *Server) dispatchToDownStreams(c *Context) {
|
func (s *Server) dispatchToDownStreams(c *Context) {
|
||||||
stream, ok, err := s.connector.Get(c.StreamId())
|
stream, ok, err := s.connector.Get(c.StreamId())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Connector Get Error, %v", err)
|
log.Errorf("%s Connector Get Error, %v", ServerLogPrefix, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Debugf("dispatchTo Down Streams failed")
|
log.Debugf("%s dispatchTo Down Streams failed", ServerLogPrefix)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -463,7 +463,7 @@ func (s *Server) dispatchToDownStreams(c *Context) {
|
||||||
f.GetMetaFrame().SetMetadata(stream.Metadata())
|
f.GetMetaFrame().SetMetadata(stream.Metadata())
|
||||||
}
|
}
|
||||||
for addr, ds := range s.downStreams {
|
for addr, ds := range s.downStreams {
|
||||||
log.Infof("dispatching to, dispatch_addr: %s; tid: %s;", addr, "", f.TransactionId())
|
log.Infof("%s dispatching to, dispatch_addr: %s; tid: %s;", ServerLogPrefix, addr, "", f.TransactionId())
|
||||||
_ = ds.WriteFrame(f)
|
_ = ds.WriteFrame(f)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -522,7 +522,7 @@ func (s *Server) authNames() []string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) doConnectionCloseHandlers(qConn quic.Connection) {
|
func (s *Server) doConnectionCloseHandlers(qConn quic.Connection) {
|
||||||
log.Debugf("QUIC Connection Closed")
|
log.Debugf("%s QUIC Connection Closed", ServerLogPrefix)
|
||||||
for _, h := range s.connectionCloseHandlers {
|
for _, h := range s.connectionCloseHandlers {
|
||||||
h(qConn)
|
h(qConn)
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,7 @@ func defaultServerOptions() *serverOptions {
|
||||||
auths: map[string]auth.Authentication{},
|
auths: map[string]auth.Authentication{},
|
||||||
}
|
}
|
||||||
opts.alpnHandler = func(proto string) error {
|
opts.alpnHandler = func(proto string) error {
|
||||||
log.Infof("client alpn proto", "component", "server", "proto", proto)
|
log.Infof("%s client alpn proto %s, %s, %s, %s", ServerLogPrefix, "component", "server", "proto", proto)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return opts
|
return opts
|
||||||
|
|
Loading…
Reference in New Issue