You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
137 lines
3.7 KiB
137 lines
3.7 KiB
package network |
|
|
|
import ( |
|
"context" |
|
"git.hpds.cc/Component/network/hpds_err" |
|
"git.hpds.cc/Component/network/log" |
|
"sync" |
|
"time" |
|
|
|
"git.hpds.cc/Component/network/frame" |
|
) |
|
|
|
var ctxPool sync.Pool |
|
|
|
// Context for Network Server. |
|
type Context struct { |
|
// DataStream is the stream used for reading and writing frames. |
|
DataStream DataStream |
|
|
|
// Frame receives from client. |
|
Frame frame.Frame |
|
|
|
// mu is used to protect Keys from concurrent read and write operations. |
|
mu sync.RWMutex |
|
// Keys stores the key/value pairs in context, It is Lazy initialized. |
|
Keys map[string]any |
|
} |
|
|
|
// Set is used to store a new key/value pair exclusively for this context. |
|
// It also lazy initializes c.Keys if it was not used previously. |
|
func (c *Context) Set(key string, value any) { |
|
c.mu.Lock() |
|
defer c.mu.Unlock() |
|
|
|
if c.Keys == nil { |
|
c.Keys = make(map[string]any) |
|
} |
|
|
|
c.Keys[key] = value |
|
} |
|
|
|
// Get returns the value for the given key, ie: (value, true). |
|
// If the value does not exist it returns (nil, false) |
|
func (c *Context) Get(key string) (any, bool) { |
|
c.mu.RLock() |
|
defer c.mu.RUnlock() |
|
|
|
value, ok := c.Keys[key] |
|
return value, ok |
|
} |
|
|
|
var _ context.Context = &Context{} |
|
|
|
// Done returns nil (chan which will wait forever) when c.Stream.Context() has no Context. |
|
func (c *Context) Done() <-chan struct{} { return c.DataStream.Context().Done() } |
|
|
|
// Deadline returns that there is no deadline (ok==false) when c.Stream has no Context. |
|
func (c *Context) Deadline() (deadline time.Time, ok bool) { return c.DataStream.Context().Deadline() } |
|
|
|
// Err returns nil when c.Request has no Context. |
|
func (c *Context) Err() error { return c.DataStream.Context().Err() } |
|
|
|
// Value returns the value associated with this context for key, or nil |
|
// if no value is associated with key. Successive calls to Value with |
|
// the same key returns the same result. |
|
func (c *Context) Value(key any) any { |
|
if keyAsString, ok := key.(string); ok { |
|
if val, exists := c.Keys[keyAsString]; exists { |
|
return val |
|
} |
|
} |
|
// There always returns nil, because quic.Stream.Context is not be allowed modify. |
|
return c.DataStream.Context().Value(key) |
|
} |
|
|
|
// newContext returns a yomo context, |
|
// The context implements standard library `context.Context` interface, |
|
// The lifecycle of Context is equal to stream's that be passed in. |
|
func newContext(dataStream DataStream) (c *Context) { |
|
v := ctxPool.Get() |
|
if v == nil { |
|
c = new(Context) |
|
} else { |
|
c = v.(*Context) |
|
} |
|
|
|
log.Infof("%s stream_id: %s; stream_name: %s; stream_type: %s;", ClientLogPrefix, dataStream.ID(), |
|
dataStream.Name(), dataStream.StreamType().String(), |
|
) |
|
|
|
c.DataStream = dataStream |
|
return |
|
} |
|
|
|
// WithFrame sets a frame to context. |
|
// |
|
// TODO: delete frame from context due to different lifecycle between stream and stream. |
|
func (c *Context) WithFrame(f frame.Frame) { |
|
c.Frame = f |
|
} |
|
|
|
// CloseWithError close dataStream in se error, |
|
// It tells controlStream which dataStream should be closed and close dataStream with |
|
// returning error message to client side stream. |
|
// |
|
// TODO: ycode is not be transmitted. |
|
func (c *Context) CloseWithError(hCode hpds_err.ErrorCode, errString string) { |
|
log.Warnf("%s Stream Close With error, err_code: %s; error: %s", ClientLogPrefix, hCode.String(), errString) |
|
|
|
err := c.DataStream.CloseWithError(errString) |
|
if err == nil { |
|
return |
|
} |
|
log.Errorf("%s Close DataStream error %v", ClientLogPrefix, err) |
|
} |
|
|
|
// Clean cleans the Context, |
|
// Context is not available after called Clean, |
|
// |
|
// Warining: do not use any Context api after Clean, It maybe cause an error. |
|
func (c *Context) Clean() { |
|
c.reset() |
|
ctxPool.Put(c) |
|
} |
|
|
|
func (c *Context) reset() { |
|
c.DataStream = nil |
|
c.Frame = nil |
|
for k := range c.Keys { |
|
delete(c.Keys, k) |
|
} |
|
} |
|
|
|
// StreamId gets dataStream ID. |
|
func (c *Context) StreamId() string { |
|
return c.DataStream.ID() |
|
}
|
|
|