diff --git a/client.go b/client.go index 39b92c0..2f6541d 100644 --- a/client.go +++ b/client.go @@ -42,7 +42,7 @@ func NewClient(appName string, connType ClientType, opts ...ClientOption) *Clien clientId := id.New() if option.credential != nil { - log.Infof("use credential, credential_name: %s;", option.credential.Name()) + log.Infof("%s use credential, credential_name: %s;", ClientLogPrefix, option.credential.Name()) } ctx, ctxCancel := context.WithCancel(context.Background()) @@ -56,7 +56,7 @@ func NewClient(appName string, connType ClientType, opts ...ClientOption) *Clien cli.streamType = connType cli.opts = option cli.errorfn = func(err error) { - cli.logger.Errorf("client err, %s", err) + cli.logger.Errorf("%s client err, %s", ClientLogPrefix, err) } cli.writeFrameChan = make(chan frame.Frame) cli.shutdownChan = make(chan error, 1) @@ -69,7 +69,7 @@ func NewClient(appName string, connType ClientType, opts ...ClientOption) *Clien func (c *Client) Connect(ctx context.Context, addr string) error { controlStream, dataStream, err := c.openStream(ctx, addr) if err != nil { - c.logger.Errorf("connect error, %s", err) + c.logger.Errorf("%s connect error, %s", ClientLogPrefix, err) return err } @@ -96,7 +96,7 @@ func (c *Client) runBackground(ctx context.Context, addr string, controlStream C var err error controlStream, dataStream, err = c.openStream(ctx, addr) if err != nil { - c.logger.Errorf("client reconnect error, %s", err) + c.logger.Errorf("%s client reconnect error, %s", ClientLogPrefix, err) time.Sleep(time.Second) goto RECONNECT } @@ -115,7 +115,7 @@ func (c *Client) cleanStream(controlStream ClientControlStream, err error) { errString := "" if err != nil { errString = err.Error() - c.logger.Errorf("client cancel with error, %s", err) + c.logger.Errorf("%s client cancel with error, %s", ClientLogPrefix, err) } // controlStream is nil represents that client is not connected. @@ -255,18 +255,18 @@ func (c *Client) handleFrame(f frame.Frame) { switch ff := f.(type) { case *frame.DataFrame: if c.processor == nil { - c.logger.Warnf("client processor has not been set") + c.logger.Warnf("%s client processor has not been set", ClientLogPrefix) } else { c.processor(ff) } case *frame.BackFlowFrame: if c.receiver == nil { - c.logger.Warnf("client receiver has not been set") + c.logger.Warnf("%s client receiver has not been set", ClientLogPrefix) } else { c.receiver(ff) } default: - c.logger.Warnf("client data stream receive unexcepted frame, frame_type: %v", f) + c.logger.Warnf("%s client data stream receive unexcepted frame, frame_type: %v", ClientLogPrefix, f) } } @@ -296,17 +296,17 @@ func (c *Client) receivingStreamClose(controlStream ControlStream, dataStream Da // SetDataFrameObserver sets the data frame handler. func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame)) { c.processor = fn - c.logger.Debugf("SetDataFrameObserver") + c.logger.Debugf("%s SetDataFrameObserver", ClientLogPrefix) } // SetBackFlowFrameObserver sets the backflow frame handler. func (c *Client) SetBackFlowFrameObserver(fn func(*frame.BackFlowFrame)) { c.receiver = fn - c.logger.Debugf("SetBackFlowFrameObserver") + c.logger.Debugf("%s SetBackFlowFrameObserver", ClientLogPrefix) } // SetObserveDataTags set the data tag list that will be observed. -// Deprecated: use yomo.WithObserveDataTags instead +// Deprecated: use network.WithObserveDataTags instead func (c *Client) SetObserveDataTags(tag ...frame.Tag) { c.opts.observeDataTags = append(c.opts.observeDataTags, tag...) } diff --git a/connector.go b/connector.go index 0834c08..e531a9d 100644 --- a/connector.go +++ b/connector.go @@ -41,7 +41,7 @@ func (c *Connector) Add(streamId string, stream DataStream) error { c.streams.Store(streamId, stream) - log.Debugf("Connector add stream, stream_id: %s", streamId) + log.Debugf("%s Connector add stream, stream_id: %s", ParseFrameLogPrefix, streamId) return nil } @@ -55,7 +55,7 @@ func (c *Connector) Remove(streamId string) error { } c.streams.Delete(streamId) - log.Debugf("Connector remove stream, stream_id: %s", streamId) + log.Debugf("%s Connector remove stream, stream_id: %s", ParseFrameLogPrefix, streamId) return nil } diff --git a/constant.go b/constant.go index 57bff5f..30f42eb 100644 --- a/constant.go +++ b/constant.go @@ -17,6 +17,13 @@ const ( ConnStateClosed ConnState = "Closed" ) +// Prefix is the prefix for logger. +const ( + ClientLogPrefix = "\033[36m[network:client]\033[0m " + ServerLogPrefix = "\033[32m[network:server]\033[0m " + ParseFrameLogPrefix = "\033[36m[network:stream_parser]\033[0m " +) + func init() { rand.Seed(time.Now().Unix()) } diff --git a/context.go b/context.go index f4b87ba..3d56d7e 100644 --- a/context.go +++ b/context.go @@ -84,7 +84,7 @@ func newContext(dataStream DataStream) (c *Context) { c = v.(*Context) } - log.Infof("stream_id: %s; stream_name: %s; stream_type: %s;", dataStream.ID(), + log.Infof("%s stream_id: %s; stream_name: %s; stream_type: %s;", ClientLogPrefix, dataStream.ID(), dataStream.Name(), dataStream.StreamType().String(), ) @@ -105,13 +105,13 @@ func (c *Context) WithFrame(f frame.Frame) { // // TODO: ycode is not be transmitted. func (c *Context) CloseWithError(hCode hpds_err.ErrorCode, errString string) { - log.Warnf("Stream Close With error", "err_code", hCode.String(), "error", errString) + log.Warnf("%s Stream Close With error, err_code: %s; error: %s", ClientLogPrefix, hCode.String(), errString) err := c.DataStream.CloseWithError(errString) if err == nil { return } - log.Errorf("Close DataStream error", err) + log.Errorf("%s Close DataStream error %v", ClientLogPrefix, err) } // Clean cleans the Context, diff --git a/log/logger.go b/log/logger.go index f9ebf06..ed7f28c 100644 --- a/log/logger.go +++ b/log/logger.go @@ -120,7 +120,7 @@ func logFormat() string { func logLevel() Level { envLevel := strings.ToLower(os.Getenv("HPDS_LOG_LEVEL")) - level := ErrorLevel + level := DebugLevel switch envLevel { case "debug": return DebugLevel diff --git a/server.go b/server.go index 4593290..080d192 100644 --- a/server.go +++ b/server.go @@ -42,6 +42,7 @@ type Server struct { afterHandlers []FrameHandler connectionCloseHandlers []ConnectionHandler listener Listener + logger log.Logger } // NewServer create a Server instance. @@ -85,21 +86,21 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { if err := s.validateRouter(); err != nil { return err } - + s.connector = NewConnector(ctx) // listen the address listener, err := newListener(conn, s.opts.tlsConfig, s.opts.quicConfig) if err != nil { - log.Errorf("listener.Listen: err=%v", err) + log.Errorf("%s listener.Listen: err=%v", ServerLogPrefix, err) return err } s.listener = listener - log.Printf("[%s][%d] Listening on: %s, QUIC: %v, AUTH: %s", s.name, os.Getpid(), listener.Addr(), listener.Versions(), s.authNames()) + log.Printf("%s [%s][%d] Listening on: %s, QUIC: %v, AUTH: %s", ServerLogPrefix, s.name, os.Getpid(), listener.Addr(), listener.Versions(), s.authNames()) for { conn, err := s.listener.Accept(ctx) if err != nil { - log.Errorf("listener accept connections error", err) + log.Errorf("%s listener accept connections error %v", ServerLogPrefix, err) return err } err = s.opts.alpnHandler(conn.ConnectionState().TLS.NegotiatedProtocol) @@ -119,10 +120,10 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { // It response to client a AuthenticationAckFrame. err = controlStream.VerifyAuthentication(s.handleAuthenticationFrame) if err != nil { - log.Warnf("Authentication Failed", "error", err) + log.Warnf("%s Authentication Failed, error: %s", ServerLogPrefix, err) continue } - log.Debugf("Authentication Success") + log.Debugf("%s Authentication Success", ServerLogPrefix) go func(qConn quic.Connection) { streamGroup := NewStreamGroup(ctx, controlStream, s.connector) @@ -134,7 +135,7 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { case <-ctx.Done(): return case err := <-s.runWithStreamGroup(streamGroup): - log.Errorf("Client Close, %v", err) + log.Errorf("%s Client Close, %v", ServerLogPrefix, err) } }(conn) } @@ -183,8 +184,8 @@ func (s *Server) handleRoute(c *Context) error { if e, ok := err.(hpds_err.DuplicateNameError); ok { existsConnId := e.ConnId() - log.Debugf("StreamFunction Duplicate Name, error: %s; sfn_name: %s, old_stream_id: %s; current_stream_id: %s", - e.Error(), c.DataStream.Name(), existsConnId, c.StreamId()) + log.Debugf("%s StreamFunction Duplicate Name, error: %s; sfn_name: %s, old_stream_id: %s; current_stream_id: %s", + ServerLogPrefix, e.Error(), c.DataStream.Name(), existsConnId, c.StreamId()) stream, ok, err := s.connector.Get(existsConnId) if err != nil { @@ -214,12 +215,11 @@ func (s *Server) handleStreamContext(c *Context) { // start frame handlers for _, handler := range s.startHandlers { if err := handler(c); err != nil { - log.Errorf("startHandlers error: %v", err) + log.Errorf("%s startHandlers error: %v", ServerLogPrefix, err) c.CloseWithError(hpds_err.ErrorCodeStartHandler, err.Error()) return } } - // check update for stream for { f, err := c.DataStream.ReadFrame() @@ -228,26 +228,26 @@ func (s *Server) handleStreamContext(c *Context) { if e, ok := err.(*quic.ApplicationError); ok { if hpds_err.Is(e.ErrorCode, hpds_err.ErrorCodeClientAbort) { // client abort - log.Infof("client close the connection") + log.Infof("%s client close the connection", ServerLogPrefix) break } he := hpds_err.New(hpds_err.Parse(e.ErrorCode), err) - log.Errorf("read frame error: %v", he) + log.Errorf("%s read frame error: %v", ServerLogPrefix, he) } else if err == io.EOF { - log.Infof("connection EOF") + log.Infof("%s connection EOF", ServerLogPrefix) break } if errors.Is(err, net.ErrClosed) { // if client close the connection, net.ErrClosed will be raise // by quic-go IdleTimeoutError after connection's KeepAlive config. - log.Warnf("connection error, error: %v", net.ErrClosed) + log.Warnf("%s connection error, error: %v", ServerLogPrefix, net.ErrClosed) c.CloseWithError(hpds_err.ErrorCodeClosed, "net.ErrClosed") break } // any error occurred, we should close the stream // after this, conn.AcceptStream() will raise the error c.CloseWithError(hpds_err.ErrorCodeUnknown, err.Error()) - log.Warnf("connection close") + log.Warnf("%s connection close", ServerLogPrefix) break } @@ -257,21 +257,21 @@ func (s *Server) handleStreamContext(c *Context) { // before frame handlers for _, handler := range s.beforeHandlers { if err := handler(c); err != nil { - log.Errorf("beforeFrameHandler error: %v", err) + log.Errorf("%s beforeFrameHandler error: %v", ServerLogPrefix, err) c.CloseWithError(hpds_err.ErrorCodeBeforeHandler, err.Error()) return } } // main handler if err := s.mainFrameHandler(c); err != nil { - log.Errorf("mainFrameHandler error: %v", err) + log.Errorf("%s mainFrameHandler error: %v", ServerLogPrefix, err) c.CloseWithError(hpds_err.ErrorCodeMainHandler, err.Error()) return } // after frame handler for _, handler := range s.afterHandlers { if err := handler(c); err != nil { - log.Errorf("afterFrameHandler error: %v", err) + log.Errorf("%s afterFrameHandler error: %v", ServerLogPrefix, err) c.CloseWithError(hpds_err.ErrorCodeAfterHandler, err.Error()) return } @@ -293,7 +293,7 @@ func (s *Server) mainFrameHandler(c *Context) error { _ = s.handleBackFlowFrame(c) } default: - log.Errorf("err=%v, frame=%v", err, frame.Shortly(c.Frame.Encode())) + log.Errorf("%s err=%v, frame=%v", ServerLogPrefix, err, frame.Shortly(c.Frame.Encode())) } return nil } @@ -302,9 +302,9 @@ func (s *Server) handleAuthenticationFrame(f auth.Object) (bool, error) { ok := auth.Authenticate(s.opts.auths, f) if ok { - log.Debugf("Successful authentication") + log.Debugf("%s Successful authentication", ServerLogPrefix) } else { - log.Warnf("Authentication failed", "credential", f.AuthName()) + log.Warnf("%s Authentication failed, credential: %s", ServerLogPrefix, f.AuthName()) } return ok, nil @@ -319,7 +319,7 @@ func (s *Server) handleDataFrame(c *Context) error { return err } if !ok { - log.Warnf("handleDataFrame connector cannot find, from_conn_id: %s", fromId) + log.Warnf("%s handleDataFrame connector cannot find, from_conn_id: %s", ServerLogPrefix, fromId) return fmt.Errorf("handleDataFrame connector cannot find %s", fromId) } @@ -340,14 +340,14 @@ func (s *Server) handleDataFrame(c *Context) error { // route route := s.router.Route(m) if route == nil { - log.Warnf("handleDataFrame route is nil") + log.Warnf("%s handleDataFrame route is nil", ServerLogPrefix) return fmt.Errorf("handleDataFrame route is nil") } // get stream function connection ids from route connIDs := route.GetForwardRoutes(f.GetDataTag()) - log.Debugf("Data Routing Status, sfn_stream_ids: %v; connector: %v", connIDs, s.connector.GetSnapshot()) + log.Debugf("%s Data Routing Status, sfn_stream_ids: %v; connector: %v", ServerLogPrefix, connIDs, s.connector.GetSnapshot()) for _, toId := range connIDs { conn, ok, err := s.connector.Get(toId) @@ -355,17 +355,17 @@ func (s *Server) handleDataFrame(c *Context) error { continue } if !ok { - log.Errorf("Can't find forward conn, error: conn is nil ;forward_conn_id: ", toId) + log.Errorf("%s Can't find forward conn, error: conn is nil ;forward_conn_id: ", ServerLogPrefix, toId) continue } to := conn.Name() - log.Infof("handleDataFrame, from_conn_name: %s; from_conn_id: %s; to_conn_name: %s; to_conn_id: %s; data_frame: %s", - from.Name(), fromId, to, toId, f.String()) + log.Infof("%s handleDataFrame, from_conn_name: %s; from_conn_id: %s; to_conn_name: %s; to_conn_id: %s; data_frame: %s", + ServerLogPrefix, from.Name(), fromId, to, toId, f.String()) // write data frame to stream if err := conn.WriteFrame(f); err != nil { - log.Errorf("handleDataFrame conn.Write, %v", err) + log.Errorf("%s handleDataFrame conn.Write, %v", ServerLogPrefix, err) } } @@ -386,9 +386,9 @@ func (s *Server) handleBackFlowFrame(c *Context) error { // logger.Printf("%s handleBackFlowFrame tag:%#v --> source:%s, result=%s", ServerLogPrefix, tag, sourceId, carriage) for _, source := range sourceConnList { if source != nil { - log.Debugf("handleBackFlowFrame tag:%#v; source_conn_id: %s, back_flow_frame: %s", tag, sourceId, f.String()) + log.Debugf("%s handleBackFlowFrame tag:%#v; source_conn_id: %s, back_flow_frame: %s", ServerLogPrefix, tag, sourceId, f.String()) if err := source.WriteFrame(bf); err != nil { - log.Errorf("handleBackFlowFrame tag:%#v --> Protocol Gateway:%s, error=%v", tag, sourceId, err) + log.Errorf("%s handleBackFlowFrame tag:%#v --> Protocol Gateway:%s, error=%v", ServerLogPrefix, tag, sourceId, err) return err } } @@ -415,7 +415,7 @@ func (s *Server) DownStreams() map[string]frame.Writer { func (s *Server) ConfigRouter(router router.Router) { s.mu.Lock() s.router = router - log.Debugf("config router is %#v", router) + log.Debugf("%s config router is %#v", ServerLogPrefix, router) s.mu.Unlock() } @@ -424,7 +424,7 @@ func (s *Server) ConfigRouter(router router.Router) { func (s *Server) ConfigMetadataBuilder(builder metadata.Builder) { s.mu.Lock() s.metadataBuilder = builder - log.Debugf("config metadataBuilder is %#v", builder) + log.Debugf("%s config metadataBuilder is %#v", ServerLogPrefix, builder) s.mu.Unlock() } @@ -432,7 +432,7 @@ func (s *Server) ConfigMetadataBuilder(builder metadata.Builder) { func (s *Server) ConfigAlpnHandler(h func(string) error) { s.mu.Lock() s.opts.alpnHandler = h - log.Debugf("config alpnHandler") + log.Debugf("%s config alpnHandler", ServerLogPrefix) s.mu.Unlock() } @@ -448,11 +448,11 @@ func (s *Server) AddDownstreamServer(addr string, c *Client) { func (s *Server) dispatchToDownStreams(c *Context) { stream, ok, err := s.connector.Get(c.StreamId()) if err != nil { - log.Errorf("Connector Get Error, %v", err) + log.Errorf("%s Connector Get Error, %v", ServerLogPrefix, err) return } if !ok { - log.Debugf("dispatchTo Down Streams failed") + log.Debugf("%s dispatchTo Down Streams failed", ServerLogPrefix) return } @@ -463,7 +463,7 @@ func (s *Server) dispatchToDownStreams(c *Context) { f.GetMetaFrame().SetMetadata(stream.Metadata()) } for addr, ds := range s.downStreams { - log.Infof("dispatching to, dispatch_addr: %s; tid: %s;", addr, "", f.TransactionId()) + log.Infof("%s dispatching to, dispatch_addr: %s; tid: %s;", ServerLogPrefix, addr, "", f.TransactionId()) _ = ds.WriteFrame(f) } } @@ -522,7 +522,7 @@ func (s *Server) authNames() []string { } func (s *Server) doConnectionCloseHandlers(qConn quic.Connection) { - log.Debugf("QUIC Connection Closed") + log.Debugf("%s QUIC Connection Closed", ServerLogPrefix) for _, h := range s.connectionCloseHandlers { h(qConn) } diff --git a/server_options.go b/server_options.go index 0fbdf29..6e4c068 100644 --- a/server_options.go +++ b/server_options.go @@ -32,7 +32,7 @@ func defaultServerOptions() *serverOptions { auths: map[string]auth.Authentication{}, } opts.alpnHandler = func(proto string) error { - log.Infof("client alpn proto", "component", "server", "proto", proto) + log.Infof("%s client alpn proto %s, %s, %s, %s", ServerLogPrefix, "component", "server", "proto", proto) return nil } return opts