wangjian
1 year ago
47 changed files with 2408 additions and 1226 deletions
@ -1,28 +0,0 @@
|
||||
package network |
||||
|
||||
const ( |
||||
// ClientTypeNone is connection type "None".
|
||||
ClientTypeNone ClientType = 0xFF |
||||
// ClientTypeProtocolGateway is connection type "Protocol Gateway".
|
||||
ClientTypeProtocolGateway ClientType = 0x5F |
||||
// ClientTypeMessageQueue is connection type "Message Queue".
|
||||
ClientTypeMessageQueue ClientType = 0x5E |
||||
// ClientTypeStreamFunction is connection type "Stream Function".
|
||||
ClientTypeStreamFunction ClientType = 0x5D |
||||
) |
||||
|
||||
// ClientType represents the connection type.
|
||||
type ClientType byte |
||||
|
||||
func (c ClientType) String() string { |
||||
switch c { |
||||
case ClientTypeProtocolGateway: |
||||
return "Source" |
||||
case ClientTypeMessageQueue: |
||||
return "Message Queue" |
||||
case ClientTypeStreamFunction: |
||||
return "Stream Function" |
||||
default: |
||||
return "None" |
||||
} |
||||
} |
@ -1,87 +1,135 @@
|
||||
package network |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"git.hpds.cc/Component/network/frame" |
||||
"git.hpds.cc/Component/network/log" |
||||
"sync" |
||||
) |
||||
|
||||
var _ Connector = &connector{} |
||||
|
||||
// Connector is an interface to manage the connections and applications.
|
||||
type Connector interface { |
||||
// Add a connection.
|
||||
Add(connId string, conn Connection) |
||||
// Remove a connection.
|
||||
Remove(connId string) |
||||
// Get a connection by connection id.
|
||||
Get(connId string) Connection |
||||
// GetSnapshot gets the snapshot of all connections.
|
||||
GetSnapshot() map[string]string |
||||
// GetProtocolGatewayConnections gets the connections by Protocol Gateway observe tags.
|
||||
GetProtocolGatewayConnections(sourceId string, tags byte) []Connection |
||||
// Clean the connector.
|
||||
Clean() |
||||
} |
||||
// ErrConnectorClosed will be returned if the connector has been closed.
|
||||
var ErrConnectorClosed = errors.New("hpdsMq: connector closed") |
||||
|
||||
type connector struct { |
||||
conns sync.Map |
||||
} |
||||
// The Connector class manages data streams and provides a centralized way to get and set streams.
|
||||
type Connector struct { |
||||
// ctx and ctxCancel manage the lifescyle of Connector.
|
||||
ctx context.Context |
||||
ctxCancel context.CancelFunc |
||||
|
||||
func newConnector() Connector { |
||||
return &connector{conns: sync.Map{}} |
||||
streams sync.Map |
||||
} |
||||
|
||||
// Add a connection.
|
||||
func (c *connector) Add(connID string, conn Connection) { |
||||
log.Debugf("%sconnector add: connId=%s", ServerLogPrefix, connID) |
||||
c.conns.Store(connID, conn) |
||||
// NewConnector returns an initial Connector.
|
||||
func NewConnector(ctx context.Context) *Connector { |
||||
ctx, ctxCancel := context.WithCancel(ctx) |
||||
|
||||
return &Connector{ |
||||
ctx: ctx, |
||||
ctxCancel: ctxCancel, |
||||
} |
||||
} |
||||
|
||||
// Remove a connection.
|
||||
func (c *connector) Remove(connID string) { |
||||
log.Debugf("%sconnector remove: connId=%s", ServerLogPrefix, connID) |
||||
c.conns.Delete(connID) |
||||
// Add adds DataStream to Connector,
|
||||
// If the streamID is the same twice, the new stream will replace the old stream.
|
||||
func (c *Connector) Add(streamId string, stream DataStream) error { |
||||
select { |
||||
case <-c.ctx.Done(): |
||||
return ErrConnectorClosed |
||||
default: |
||||
} |
||||
|
||||
c.streams.Store(streamId, stream) |
||||
|
||||
log.Debugf("Connector add stream, stream_id: %s", streamId) |
||||
return nil |
||||
} |
||||
|
||||
// Get a connection by connection id.
|
||||
func (c *connector) Get(connID string) Connection { |
||||
log.Debugf("%sconnector get connection: connId=%s", ServerLogPrefix, connID) |
||||
if conn, ok := c.conns.Load(connID); ok { |
||||
return conn.(Connection) |
||||
// Remove removes the DataStream with the specified streamID.
|
||||
// If the Connector does not have a stream with the given streamID, no action is taken.
|
||||
func (c *Connector) Remove(streamId string) error { |
||||
select { |
||||
case <-c.ctx.Done(): |
||||
return ErrConnectorClosed |
||||
default: |
||||
} |
||||
|
||||
c.streams.Delete(streamId) |
||||
log.Debugf("Connector remove stream, stream_id: %s", streamId) |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// GetProtocolGatewayConnections gets the Protocol Gateway connection by tag.
|
||||
func (c *connector) GetProtocolGatewayConnections(sourceId string, tag byte) []Connection { |
||||
conns := make([]Connection, 0) |
||||
// Get retrieves the DataStream with the specified streamID.
|
||||
// If the Connector does not have a stream with the given streamID, return nil and false.
|
||||
func (c *Connector) Get(streamId string) (DataStream, bool, error) { |
||||
select { |
||||
case <-c.ctx.Done(): |
||||
return nil, false, ErrConnectorClosed |
||||
default: |
||||
} |
||||
|
||||
v, ok := c.streams.Load(streamId) |
||||
if !ok { |
||||
return nil, false, nil |
||||
} |
||||
|
||||
c.conns.Range(func(key interface{}, val interface{}) bool { |
||||
conn := val.(Connection) |
||||
for _, v := range conn.ObserveDataTags() { |
||||
if v == tag && conn.ClientType() == ClientTypeProtocolGateway && conn.ClientId() == sourceId { |
||||
conns = append(conns, conn) |
||||
stream := v.(DataStream) |
||||
|
||||
return stream, true, nil |
||||
} |
||||
|
||||
// GetSourceConns gets the streams with the specified source observe tag.
|
||||
func (c *Connector) GetSourceConns(sourceId string, tag frame.Tag) ([]DataStream, error) { |
||||
select { |
||||
case <-c.ctx.Done(): |
||||
return []DataStream{}, ErrConnectorClosed |
||||
default: |
||||
} |
||||
|
||||
streams := make([]DataStream, 0) |
||||
|
||||
c.streams.Range(func(key interface{}, val interface{}) bool { |
||||
stream := val.(DataStream) |
||||
|
||||
for _, v := range stream.ObserveDataTags() { |
||||
if v == tag && |
||||
stream.StreamType() == StreamTypeSource && |
||||
stream.ID() == sourceId { |
||||
streams = append(streams, stream) |
||||
} |
||||
} |
||||
return true |
||||
}) |
||||
|
||||
return conns |
||||
return streams, nil |
||||
} |
||||
|
||||
// GetSnapshot gets the snapshot of all connections.
|
||||
func (c *connector) GetSnapshot() map[string]string { |
||||
// GetSnapshot returnsa snapshot of all streams.
|
||||
// The resulting map uses streamID as the key and stream name as the value.
|
||||
// This function is typically used to monitor the status of the Connector.
|
||||
func (c *Connector) GetSnapshot() map[string]string { |
||||
result := make(map[string]string) |
||||
c.conns.Range(func(key interface{}, val interface{}) bool { |
||||
connID := key.(string) |
||||
conn := val.(Connection) |
||||
result[connID] = conn.Name() |
||||
|
||||
c.streams.Range(func(key interface{}, val interface{}) bool { |
||||
var ( |
||||
streamID = key.(string) |
||||
stream = val.(DataStream) |
||||
) |
||||
result[streamID] = stream.Name() |
||||
return true |
||||
}) |
||||
|
||||
return result |
||||
} |
||||
|
||||
// Clean the connector.
|
||||
func (c *connector) Clean() { |
||||
c.conns = sync.Map{} |
||||
// Close cleans all stream of Connector and reset Connector to closed status.
|
||||
// The Connector can't be use after close.
|
||||
func (c *Connector) Close() { |
||||
c.ctxCancel() |
||||
|
||||
c.streams.Range(func(key, value any) bool { |
||||
c.streams.Delete(key) |
||||
return true |
||||
}) |
||||
} |
||||
|
@ -1,191 +1,137 @@
|
||||
package network |
||||
|
||||
import ( |
||||
"context" |
||||
"git.hpds.cc/Component/network/hpds_err" |
||||
"git.hpds.cc/Component/network/log" |
||||
"io" |
||||
"sync" |
||||
"time" |
||||
|
||||
"git.hpds.cc/Component/network/frame" |
||||
"github.com/lucas-clemente/quic-go" |
||||
) |
||||
|
||||
var ctxPool sync.Pool |
||||
|
||||
// Context for Network Server.
|
||||
type Context struct { |
||||
// Conn is the connection of client.
|
||||
Conn quic.Connection |
||||
connId string |
||||
// Stream is the long-lived connection between client and server.
|
||||
Stream io.ReadWriteCloser |
||||
// DataStream is the stream used for reading and writing frames.
|
||||
DataStream DataStream |
||||
|
||||
// Frame receives from client.
|
||||
Frame frame.Frame |
||||
// Keys store the key/value pairs in context.
|
||||
Keys map[string]interface{} |
||||
|
||||
// mu is used to protect Keys from concurrent read and write operations.
|
||||
mu sync.RWMutex |
||||
// Keys stores the key/value pairs in context, It is Lazy initialized.
|
||||
Keys map[string]any |
||||
} |
||||
|
||||
func newContext(conn quic.Connection, stream quic.Stream) *Context { |
||||
return &Context{ |
||||
Conn: conn, |
||||
connId: conn.RemoteAddr().String(), |
||||
Stream: stream, |
||||
// keys: make(map[string]interface{}),
|
||||
} |
||||
} |
||||
|
||||
// WithFrame sets a frame to context.
|
||||
func (c *Context) WithFrame(f frame.Frame) *Context { |
||||
c.Frame = f |
||||
return c |
||||
} |
||||
|
||||
// Clean the context.
|
||||
func (c *Context) Clean() { |
||||
log.Debugf("%sconn[%s] context clean", ServerLogPrefix, c.connId) |
||||
c.Stream = nil |
||||
c.Frame = nil |
||||
c.Keys = nil |
||||
c.Conn = nil |
||||
} |
||||
|
||||
// CloseWithError closes the stream and cleans the context.
|
||||
func (c *Context) CloseWithError(code hpds_err.ErrorCode, msg string) { |
||||
log.Debugf("%sconn[%s] context close, errCode=%#x, msg=%s", ServerLogPrefix, c.connId, code, msg) |
||||
if c.Stream != nil { |
||||
_ = c.Stream.Close() |
||||
} |
||||
if c.Conn != nil { |
||||
_ = c.Conn.CloseWithError(quic.ApplicationErrorCode(code), msg) |
||||
} |
||||
c.Clean() |
||||
} |
||||
|
||||
// ConnId get quic connection id
|
||||
func (c *Context) ConnId() string { |
||||
return c.connId |
||||
} |
||||
|
||||
// Set a key/value pair to context.
|
||||
func (c *Context) Set(key string, value interface{}) { |
||||
// Set is used to store a new key/value pair exclusively for this context.
|
||||
// It also lazy initializes c.Keys if it was not used previously.
|
||||
func (c *Context) Set(key string, value any) { |
||||
c.mu.Lock() |
||||
defer c.mu.Unlock() |
||||
|
||||
if c.Keys == nil { |
||||
c.Keys = make(map[string]interface{}) |
||||
c.Keys = make(map[string]any) |
||||
} |
||||
|
||||
c.Keys[key] = value |
||||
c.mu.Unlock() |
||||
} |
||||
|
||||
// Get the value by a specified key.
|
||||
func (c *Context) Get(key string) (value interface{}, exists bool) { |
||||
// Get returns the value for the given key, ie: (value, true).
|
||||
// If the value does not exist it returns (nil, false)
|
||||
func (c *Context) Get(key string) (any, bool) { |
||||
c.mu.RLock() |
||||
value, exists = c.Keys[key] |
||||
c.mu.RUnlock() |
||||
return |
||||
} |
||||
defer c.mu.RUnlock() |
||||
|
||||
// GetString gets a string value by a specified key.
|
||||
func (c *Context) GetString(key string) (s string) { |
||||
if val, ok := c.Get(key); ok && val != nil { |
||||
s, _ = val.(string) |
||||
} |
||||
return |
||||
value, ok := c.Keys[key] |
||||
return value, ok |
||||
} |
||||
|
||||
// GetBool gets a bool value by a specified key.
|
||||
func (c *Context) GetBool(key string) (b bool) { |
||||
if val, ok := c.Get(key); ok && val != nil { |
||||
b, _ = val.(bool) |
||||
} |
||||
return |
||||
} |
||||
var _ context.Context = &Context{} |
||||
|
||||
// GetInt gets an int value by a specified key.
|
||||
func (c *Context) GetInt(key string) (i int) { |
||||
if val, ok := c.Get(key); ok && val != nil { |
||||
i, _ = val.(int) |
||||
} |
||||
return |
||||
} |
||||
// Done returns nil (chan which will wait forever) when c.Stream.Context() has no Context.
|
||||
func (c *Context) Done() <-chan struct{} { return c.DataStream.Context().Done() } |
||||
|
||||
// GetInt64 gets an int64 value by a specified key.
|
||||
func (c *Context) GetInt64(key string) (i64 int64) { |
||||
if val, ok := c.Get(key); ok && val != nil { |
||||
i64, _ = val.(int64) |
||||
} |
||||
return |
||||
} |
||||
// Deadline returns that there is no deadline (ok==false) when c.Stream has no Context.
|
||||
func (c *Context) Deadline() (deadline time.Time, ok bool) { return c.DataStream.Context().Deadline() } |
||||
|
||||
// GetUint gets an uint value by a specified key.
|
||||
func (c *Context) GetUint(key string) (ui uint) { |
||||
if val, ok := c.Get(key); ok && val != nil { |
||||
ui, _ = val.(uint) |
||||
} |
||||
return |
||||
} |
||||
// Err returns nil when c.Request has no Context.
|
||||
func (c *Context) Err() error { return c.DataStream.Context().Err() } |
||||
|
||||
// GetUint64 gets an uint64 value by a specified key.
|
||||
func (c *Context) GetUint64(key string) (ui64 uint64) { |
||||
if val, ok := c.Get(key); ok && val != nil { |
||||
ui64, _ = val.(uint64) |
||||
// Value returns the value associated with this context for key, or nil
|
||||
// if no value is associated with key. Successive calls to Value with
|
||||
// the same key returns the same result.
|
||||
func (c *Context) Value(key any) any { |
||||
if keyAsString, ok := key.(string); ok { |
||||
if val, exists := c.Keys[keyAsString]; exists { |
||||
return val |
||||
} |
||||
} |
||||
return |
||||
// There always returns nil, because quic.Stream.Context is not be allowed modify.
|
||||
return c.DataStream.Context().Value(key) |
||||
} |
||||
|
||||
// GetFloat64 gets a float64 value by a specified key.
|
||||
func (c *Context) GetFloat64(key string) (f64 float64) { |
||||
if val, ok := c.Get(key); ok && val != nil { |
||||
f64, _ = val.(float64) |
||||
// newContext returns a yomo context,
|
||||
// The context implements standard library `context.Context` interface,
|
||||
// The lifecycle of Context is equal to stream's that be passed in.
|
||||
func newContext(dataStream DataStream) (c *Context) { |
||||
v := ctxPool.Get() |
||||
if v == nil { |
||||
c = new(Context) |
||||
} else { |
||||
c = v.(*Context) |
||||
} |
||||
return |
||||
} |
||||
|
||||
// GetTime gets a time.Time value by a specified key.
|
||||
func (c *Context) GetTime(key string) (t time.Time) { |
||||
if val, ok := c.Get(key); ok && val != nil { |
||||
t, _ = val.(time.Time) |
||||
} |
||||
log.Infof("stream_id: %s; stream_name: %s; stream_type: %s;", dataStream.ID(), |
||||
dataStream.Name(), dataStream.StreamType().String(), |
||||
) |
||||
|
||||
c.DataStream = dataStream |
||||
return |
||||
} |
||||
|
||||
// GetDuration gets a time.Duration value by a specified key.
|
||||
func (c *Context) GetDuration(key string) (d time.Duration) { |
||||
if val, ok := c.Get(key); ok && val != nil { |
||||
d, _ = val.(time.Duration) |
||||
} |
||||
return |
||||
// WithFrame sets a frame to context.
|
||||
//
|
||||
// TODO: delete frame from context due to different lifecycle between stream and stream.
|
||||
func (c *Context) WithFrame(f frame.Frame) { |
||||
c.Frame = f |
||||
} |
||||
|
||||
// GetStringSlice gets a []string value by a specified key.
|
||||
func (c *Context) GetStringSlice(key string) (ss []string) { |
||||
if val, ok := c.Get(key); ok && val != nil { |
||||
ss, _ = val.([]string) |
||||
// CloseWithError close dataStream in se error,
|
||||
// It tells controlStream which dataStream should be closed and close dataStream with
|
||||
// returning error message to client side stream.
|
||||
//
|
||||
// TODO: ycode is not be transmitted.
|
||||
func (c *Context) CloseWithError(hCode hpds_err.ErrorCode, errString string) { |
||||
log.Warnf("Stream Close With error", "err_code", hCode.String(), "error", errString) |
||||
|
||||
err := c.DataStream.CloseWithError(errString) |
||||
if err == nil { |
||||
return |
||||
} |
||||
return |
||||
log.Errorf("Close DataStream error", err) |
||||
} |
||||
|
||||
// GetStringMap gets a map[string]interface{} value by a specified key.
|
||||
func (c *Context) GetStringMap(key string) (sm map[string]interface{}) { |
||||
if val, ok := c.Get(key); ok && val != nil { |
||||
sm, _ = val.(map[string]interface{}) |
||||
} |
||||
return |
||||
// Clean cleans the Context,
|
||||
// Context is not available after called Clean,
|
||||
//
|
||||
// Warining: do not use any Context api after Clean, It maybe cause an error.
|
||||
func (c *Context) Clean() { |
||||
c.reset() |
||||
ctxPool.Put(c) |
||||
} |
||||
|
||||
// GetStringMapString gets a map[string]string value by a specified key.
|
||||
func (c *Context) GetStringMapString(key string) (sms map[string]string) { |
||||
if val, ok := c.Get(key); ok && val != nil { |
||||
sms, _ = val.(map[string]string) |
||||
func (c *Context) reset() { |
||||
c.DataStream = nil |
||||
c.Frame = nil |
||||
for k := range c.Keys { |
||||
delete(c.Keys, k) |
||||
} |
||||
return |
||||
} |
||||
|
||||
// GetStringMapStringSlice gets a map[string][]string value by a specified key.
|
||||
func (c *Context) GetStringMapStringSlice(key string) (smss map[string][]string) { |
||||
if val, ok := c.Get(key); ok && val != nil { |
||||
smss, _ = val.(map[string][]string) |
||||
} |
||||
return |
||||
// StreamId gets dataStream ID.
|
||||
func (c *Context) StreamId() string { |
||||
return c.DataStream.ID() |
||||
} |
||||
|
@ -0,0 +1,267 @@
|
||||
package network |
||||
|
||||
import ( |
||||
"context" |
||||
"crypto/tls" |
||||
"errors" |
||||
"fmt" |
||||
"git.hpds.cc/Component/network/auth" |
||||
"git.hpds.cc/Component/network/frame" |
||||
"github.com/quic-go/quic-go" |
||||
) |
||||
|
||||
// ControlStream defines the interface for controlling a stream.
|
||||
type ControlStream interface { |
||||
// CloseStream notifies the peer's control stream to close the data stream with the given streamID and error message.
|
||||
CloseStream(streamId string, errString string) error |
||||
// ReceiveStreamClose is received from the peer's control stream to close the data stream according to streamID and error message.
|
||||
ReceiveStreamClose() (streamId string, errString string, err error) |
||||
// CloseWithError closes the control stream.
|
||||
CloseWithError(code uint64, errString string) error |
||||
} |
||||
|
||||
// ServerControlStream defines the interface of server side control stream.
|
||||
type ServerControlStream interface { |
||||
ControlStream |
||||
|
||||
// VerifyAuthentication verify the Authentication from client side.
|
||||
VerifyAuthentication(verifyFunc func(auth.Object) (bool, error)) error |
||||
// AcceptStream accepts data stream from the request of client.
|
||||
AcceptStream(context.Context) (DataStream, error) |
||||
} |
||||
|
||||
// ClientControlStream defines the interface of client side control stream.
|
||||
type ClientControlStream interface { |
||||
ControlStream |
||||
|
||||
// Authenticate with credential, the credential will be sent to ServerControlStream to authenticate the client.
|
||||
Authenticate(*auth.Credential) error |
||||
// OpenStream request a ServerControlStream to create a new data stream.
|
||||
OpenStream(context.Context, *frame.HandshakeFrame) (DataStream, error) |
||||
} |
||||
|
||||
var _ ServerControlStream = &serverControlStream{} |
||||
|
||||
type serverControlStream struct { |
||||
conn quic.Connection |
||||
stream frame.ReadWriter |
||||
} |
||||
|
||||
// NewServerControlStream returns ServerControlStream from quic Connection and the first stream of this Connection.
|
||||
func NewServerControlStream(qConn quic.Connection, stream frame.ReadWriter) ServerControlStream { |
||||
return &serverControlStream{ |
||||
conn: qConn, |
||||
stream: stream, |
||||
} |
||||
} |
||||
|
||||
func (ss *serverControlStream) ReceiveStreamClose() (streamId string, errReason string, err error) { |
||||
return receiveStreamClose(ss.stream) |
||||
} |
||||
|
||||
func (ss *serverControlStream) CloseStream(streamId string, errString string) error { |
||||
return closeStream(ss.stream, streamId, errString) |
||||
} |
||||
|
||||
func (ss *serverControlStream) AcceptStream(context.Context) (DataStream, error) { |
||||
f, err := ss.stream.ReadFrame() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
switch ff := f.(type) { |
||||
case *frame.HandshakeFrame: |
||||
stream, err := ss.conn.OpenStreamSync(context.Background()) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
_, err = stream.Write(frame.NewHandshakeAckFrame(ff.ID()).Encode()) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
dataStream := newDataStream( |
||||
ff.Name(), |
||||
ff.ID(), |
||||
StreamType(ff.StreamType()), |
||||
ff.Metadata(), |
||||
stream, |
||||
ff.ObserveDataTags(), |
||||
ss, |
||||
) |
||||
return dataStream, nil |
||||
default: |
||||
return nil, fmt.Errorf("yomo: control stream read unexpected frame %s", f.Type()) |
||||
} |
||||
} |
||||
|
||||
func (ss *serverControlStream) CloseWithError(code uint64, errString string) error { |
||||
return closeWithError(ss.conn, code, errString) |
||||
} |
||||
|
||||
func (ss *serverControlStream) VerifyAuthentication(verifyFunc func(auth.Object) (bool, error)) error { |
||||
first, err := ss.stream.ReadFrame() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
received, ok := first.(*frame.AuthenticationFrame) |
||||
if !ok { |
||||
return fmt.Errorf("yomo: read unexpected frame while waiting for authentication, frame read: %s", received.Type().String()) |
||||
} |
||||
ok, err = verifyFunc(received) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if !ok { |
||||
return ss.stream.WriteFrame( |
||||
frame.NewAuthenticationRespFrame( |
||||
false, |
||||
fmt.Sprintf("yomo: authentication failed, client credential name is %s", received.AuthName()), |
||||
), |
||||
) |
||||
} |
||||
return ss.stream.WriteFrame(frame.NewAuthenticationRespFrame(true, "")) |
||||
} |
||||
|
||||
var _ ClientControlStream = &clientControlStream{} |
||||
|
||||
type clientControlStream struct { |
||||
conn quic.Connection |
||||
stream frame.ReadWriter |
||||
} |
||||
|
||||
// OpenClientControlStream opens ClientControlStream from addr.
|
||||
func OpenClientControlStream( |
||||
ctx context.Context, addr string, |
||||
tlsConfig *tls.Config, quicConfig *quic.Config, |
||||
) (ClientControlStream, error) { |
||||
conn, err := quic.DialAddrContext(ctx, addr, tlsConfig, quicConfig) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
stream, err := conn.OpenStream() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return NewClientControlStream(conn, NewFrameStream(stream)), nil |
||||
} |
||||
|
||||
// NewClientControlStream returns ClientControlStream from quic Connection and the first stream form the Connection.
|
||||
func NewClientControlStream(qConn quic.Connection, stream frame.ReadWriter) ClientControlStream { |
||||
return &clientControlStream{ |
||||
conn: qConn, |
||||
stream: stream, |
||||
} |
||||
} |
||||
|
||||
func (cs *clientControlStream) ReceiveStreamClose() (streamId string, errReason string, err error) { |
||||
return receiveStreamClose(cs.stream) |
||||
} |
||||
|
||||
func (cs *clientControlStream) CloseStream(streamId string, errString string) error { |
||||
return closeStream(cs.stream, streamId, errString) |
||||
} |
||||
|
||||
func (cs *clientControlStream) Authenticate(cred *auth.Credential) error { |
||||
if err := cs.stream.WriteFrame( |
||||
frame.NewAuthenticationFrame(cred.Name(), cred.Payload())); err != nil { |
||||
return err |
||||
} |
||||
received, err := cs.stream.ReadFrame() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
resp, ok := received.(*frame.AuthenticationRespFrame) |
||||
if !ok { |
||||
return fmt.Errorf( |
||||
"yomo: read unexcept frame during waiting authentication resp, frame readed: %s", |
||||
received.Type().String(), |
||||
) |
||||
} |
||||
if !resp.OK() { |
||||
return errors.New(resp.Reason()) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// dataStreamAcked drain HandshakeAckFrame from stream.
|
||||
func dataStreamAcked(stream DataStream) error { |
||||
first, err := stream.ReadFrame() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
f, ok := first.(*frame.HandshakeAckFrame) |
||||
if !ok { |
||||
return fmt.Errorf("yomo: data stream read first frame should be HandshakeAckFrame, but got %s", first.Type().String()) |
||||
} |
||||
|
||||
if f.StreamId() != stream.ID() { |
||||
return fmt.Errorf("yomo: data stream ack exception, stream id did not match") |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (cs *clientControlStream) OpenStream(ctx context.Context, hf *frame.HandshakeFrame) (DataStream, error) { |
||||
err := cs.stream.WriteFrame(frame.NewHandshakeFrame( |
||||
hf.Name(), |
||||
hf.ID(), |
||||
hf.StreamType(), |
||||
hf.ObserveDataTags(), |
||||
hf.Metadata(), |
||||
)) |
||||
|
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
quicStream, err := cs.conn.AcceptStream(ctx) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
dataStream := newDataStream( |
||||
hf.Name(), |
||||
hf.ID(), |
||||
StreamType(hf.StreamType()), |
||||
hf.Metadata(), |
||||
quicStream, |
||||
hf.ObserveDataTags(), |
||||
cs, |
||||
) |
||||
|
||||
if err := dataStreamAcked(dataStream); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return dataStream, nil |
||||
} |
||||
|
||||
func (cs *clientControlStream) CloseWithError(code uint64, errString string) error { |
||||
return closeWithError(cs.conn, code, errString) |
||||
} |
||||
|
||||
func closeStream(controlStream frame.Writer, streamID string, errString string) error { |
||||
f := frame.NewCloseStreamFrame(streamID, errString) |
||||
return controlStream.WriteFrame(f) |
||||
} |
||||
|
||||
func receiveStreamClose(controlStream frame.Reader) (streamID string, errString string, err error) { |
||||
f, err := controlStream.ReadFrame() |
||||
if err != nil { |
||||
return "", "", err |
||||
} |
||||
ff, ok := f.(*frame.CloseStreamFrame) |
||||
if !ok { |
||||
return "", "", errors.New("yomo: control stream only transmit close stream frame") |
||||
} |
||||
return ff.StreamID(), ff.Reason(), nil |
||||
} |
||||
|
||||
func closeWithError(qConn quic.Connection, code uint64, errString string) error { |
||||
return qConn.CloseWithError( |
||||
quic.ApplicationErrorCode(code), |
||||
errString, |
||||
) |
||||
} |
@ -0,0 +1,163 @@
|
||||
package network |
||||
|
||||
import ( |
||||
"context" |
||||
"github.com/quic-go/quic-go" |
||||
"io" |
||||
"sync" |
||||
"sync/atomic" |
||||
|
||||
"git.hpds.cc/Component/network/frame" |
||||
) |
||||
|
||||
// DataStream wraps the specific io streams (typically quic.Stream) to transfer frames.
|
||||
// DataStream be used to read and write frames, and be managed by Connector.
|
||||
type DataStream interface { |
||||
// Context returns context.Context to manages DataStream lifecycle.
|
||||
Context() context.Context |
||||
// Name returns the name of the stream, which is set by clients.
|
||||
Name() string |
||||
// ID represents the dataStream ID, the ID is an unique string.
|
||||
ID() string |
||||
// StreamType represents dataStream type (Source | SFN | UpstreamEmitter).
|
||||
StreamType() StreamType |
||||
// Metadata returns the extra info of the application
|
||||
Metadata() []byte |
||||
// Close real close DataStream,
|
||||
// The controlStream calls this function, If you want close a dataStream, to use
|
||||
// the CloseWithError api.
|
||||
io.Closer |
||||
// CloseWithError close DataStream with an error string,
|
||||
// This function do not real close the underlying stream, It notices controlStream to
|
||||
// close itself, The controlStream must close underlying stream after receive CloseStreamFrame.
|
||||
CloseWithError(string) error |
||||
// ReadWriter writes or reads frame to underlying stream.
|
||||
// Writing and Reading are both goroutine-safely handle frames to peer side.
|
||||
// ReadWriter returns stream closed error if stream is closed.
|
||||
frame.ReadWriter |
||||
// ObserveDataTags observed data tags.
|
||||
// TODO: There maybe a sorted list, we can find tag quickly.
|
||||
ObserveDataTags() []frame.Tag |
||||
} |
||||
|
||||
// TODO: dataStream sync.Pool wrap.
|
||||
type dataStream struct { |
||||
name string |
||||
id string |
||||
streamType StreamType |
||||
metadata []byte |
||||
observed []frame.Tag |
||||
|
||||
closed atomic.Bool |
||||
// mu protected stream write and close
|
||||
// because of quic stream write and close is not goroutinue-safely.
|
||||
mu sync.Mutex |
||||
stream quic.Stream |
||||
controlStream ControlStream |
||||
} |
||||
|
||||
// newDataStream constructures dataStream.
|
||||
func newDataStream( |
||||
name string, |
||||
id string, |
||||
streamType StreamType, |
||||
metadata []byte, |
||||
stream quic.Stream, |
||||
observed []frame.Tag, |
||||
controlStream ControlStream, |
||||
) DataStream { |
||||
return &dataStream{ |
||||
name: name, |
||||
id: id, |
||||
streamType: streamType, |
||||
metadata: metadata, |
||||
stream: stream, |
||||
observed: observed, |
||||
controlStream: controlStream, |
||||
} |
||||
} |
||||
|
||||
// DataStream implements.
|
||||
func (s *dataStream) Context() context.Context { return s.stream.Context() } |
||||
func (s *dataStream) ID() string { return s.id } |
||||
func (s *dataStream) Name() string { return s.name } |
||||
func (s *dataStream) Metadata() []byte { return s.metadata } |
||||
func (s *dataStream) StreamType() StreamType { return s.streamType } |
||||
func (s *dataStream) ObserveDataTags() []frame.Tag { return s.observed } |
||||
|
||||
func (s *dataStream) WriteFrame(frm frame.Frame) error { |
||||
if s.closed.Load() { |
||||
return io.EOF |
||||
} |
||||
|
||||
s.mu.Lock() |
||||
defer s.mu.Unlock() |
||||
_, err := s.stream.Write(frm.Encode()) |
||||
return err |
||||
} |
||||
|
||||
func (s *dataStream) ReadFrame() (frame.Frame, error) { |
||||
if s.closed.Load() { |
||||
return nil, io.EOF |
||||
} |
||||
return ParseFrame(s.stream) |
||||
} |
||||
|
||||
func (s *dataStream) Close() error { |
||||
s.mu.Lock() |
||||
defer s.mu.Unlock() |
||||
|
||||
// Close the stream truly,
|
||||
// This function should be called after controlStream receive a closeStreamFrame.
|
||||
return s.stream.Close() |
||||
} |
||||
|
||||
func (s *dataStream) CloseWithError(errString string) error { |
||||
if s.closed.Load() { |
||||
return nil |
||||
} |
||||
s.closed.Store(true) |
||||
|
||||
s.mu.Lock() |
||||
defer s.mu.Unlock() |
||||
|
||||
// Only notice client-side controlStream the stream has been closed.
|
||||
// The controlStream reads closeStreamFrame and to close dataStream.
|
||||
return s.controlStream.CloseStream(s.id, errString) |
||||
} |
||||
|
||||
const ( |
||||
// StreamTypeNone is stream type "None".
|
||||
// "None" stream is not supposed to be in the yomo system.
|
||||
StreamTypeNone StreamType = 0xFF |
||||
|
||||
// StreamTypeSource is stream type "Source".
|
||||
// "Source" type stream sends data to "Stream Function" stream generally.
|
||||
StreamTypeSource StreamType = 0x5F |
||||
|
||||
// StreamTypeUpstreamEmitter is connection type "Upstream Emitter".
|
||||
// "Upstream Emitter" type stream sends data from "Source" to other Emitter node.
|
||||
// With "Upstream Emitter", the yomo can run in mesh mode.
|
||||
StreamTypeUpstreamEmitter StreamType = 0x5E |
||||
|
||||
// StreamTypeStreamFunction is stream type "Stream Function".
|
||||
// "Stream Function" handles data from source.
|
||||
StreamTypeStreamFunction StreamType = 0x5D |
||||
) |
||||
|
||||
// StreamType represents the stream type.
|
||||
type StreamType byte |
||||
|
||||
// String returns string for StreamType.
|
||||
func (c StreamType) String() string { |
||||
switch c { |
||||
case StreamTypeSource: |
||||
return "Source" |
||||
case StreamTypeUpstreamEmitter: |
||||
return "Upstream Emitter" |
||||
case StreamTypeStreamFunction: |
||||
return "Stream Function" |
||||
default: |
||||
return "None" |
||||
} |
||||
} |
@ -0,0 +1,84 @@
|
||||
package frame |
||||
|
||||
import ( |
||||
coder "git.hpds.cc/Component/mq_coder" |
||||
) |
||||
|
||||
// AuthenticationFrame is used to authenticate the client,
|
||||
// Once the connection is established, the client immediately, sends information
|
||||
// to the server, server gets the way to authenticate according to authName and
|
||||
// use authPayload to do a authentication.
|
||||
//
|
||||
// AuthenticationFrame is a coder encoded.
|
||||
type AuthenticationFrame struct { |
||||
authName string |
||||
authPayload string |
||||
} |
||||
|
||||
// NewAuthenticationFrame creates a new AuthenticationFrame.
|
||||
func NewAuthenticationFrame(authName string, authPayload string) *AuthenticationFrame { |
||||
return &AuthenticationFrame{ |
||||
authName: authName, |
||||
authPayload: authPayload, |
||||
} |
||||
} |
||||
|
||||
// Type returns the type of AuthenticationFrame.
|
||||
func (h *AuthenticationFrame) Type() Type { |
||||
return TagOfAuthenticationFrame |
||||
} |
||||
|
||||
// Encode encodes AuthenticationFrame to bytes in coder codec.
|
||||
func (h *AuthenticationFrame) Encode() []byte { |
||||
// auth
|
||||
authNameBlock := coder.NewPrimitivePacketEncoder(byte(TagOfAuthenticationName)) |
||||
authNameBlock.SetStringValue(h.authName) |
||||
authPayloadBlock := coder.NewPrimitivePacketEncoder(byte(TagOfAuthenticationPayload)) |
||||
authPayloadBlock.SetStringValue(h.authPayload) |
||||
// authentication frame
|
||||
authentication := coder.NewNodePacketEncoder(byte(h.Type())) |
||||
authentication.AddPrimitivePacket(authNameBlock) |
||||
authentication.AddPrimitivePacket(authPayloadBlock) |
||||
|
||||
return authentication.Encode() |
||||
} |
||||
|
||||
// DecodeToAuthenticationFrame decodes coder encoded bytes to AuthenticationFrame.
|
||||
func DecodeToAuthenticationFrame(buf []byte) (*AuthenticationFrame, error) { |
||||
node := coder.NodePacket{} |
||||
_, err := coder.DecodeToNodePacket(buf, &node) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
authentication := &AuthenticationFrame{} |
||||
|
||||
// auth
|
||||
if authNameBlock, ok := node.PrimitivePackets[byte(TagOfAuthenticationName)]; ok { |
||||
authName, err := authNameBlock.ToUTF8String() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
authentication.authName = authName |
||||
} |
||||
if authPayloadBlock, ok := node.PrimitivePackets[byte(TagOfAuthenticationPayload)]; ok { |
||||
authPayload, err := authPayloadBlock.ToUTF8String() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
authentication.authPayload = authPayload |
||||
} |
||||
|
||||
return authentication, nil |
||||
} |
||||
|
||||
// AuthPayload returns authentication payload.
|
||||
func (h *AuthenticationFrame) AuthPayload() string { |
||||
return h.authPayload |
||||
} |
||||
|
||||
// AuthName returns authentication name,
|
||||
// server finds the mode of authentication in AuthName.
|
||||
func (h *AuthenticationFrame) AuthName() string { |
||||
return h.authName |
||||
} |
@ -0,0 +1,23 @@
|
||||
package frame |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
) |
||||
|
||||
func TestAuthenticationFrame(t *testing.T) { |
||||
m := NewAuthenticationFrame("token", "a") |
||||
assert.Equal(t, []byte{ |
||||
0x80 | byte(TagOfAuthenticationFrame), 0xa, |
||||
byte(TagOfAuthenticationName), 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, |
||||
byte(TagOfAuthenticationPayload), 0x01, 0x61, |
||||
}, |
||||
m.Encode(), |
||||
) |
||||
|
||||
authenticate, err := DecodeToAuthenticationFrame(m.Encode()) |
||||
assert.NoError(t, err) |
||||
assert.EqualValues(t, "token", authenticate.AuthName()) |
||||
assert.EqualValues(t, "a", authenticate.AuthPayload()) |
||||
} |
@ -0,0 +1,77 @@
|
||||
package frame |
||||
|
||||
import ( |
||||
coder "git.hpds.cc/Component/mq_coder" |
||||
) |
||||
|
||||
// AuthenticationRespFrame is the response of Authentication.
|
||||
// AuthenticationRespFrame is a coder encoded bytes.
|
||||
type AuthenticationRespFrame struct { |
||||
ok bool |
||||
reason string |
||||
} |
||||
|
||||
// OK returns if Authentication is success.
|
||||
func (f *AuthenticationRespFrame) OK() bool { return f.ok } |
||||
|
||||
// Reason returns the failed reason of Authentication.
|
||||
func (f *AuthenticationRespFrame) Reason() string { return f.reason } |
||||
|
||||
// NewAuthenticationRespFrame returns a AuthenticationRespFrame.
|
||||
func NewAuthenticationRespFrame(ok bool, reason string) *AuthenticationRespFrame { |
||||
return &AuthenticationRespFrame{ |
||||
ok: ok, |
||||
reason: reason, |
||||
} |
||||
} |
||||
|
||||
// Type gets the type of the AuthenticationRespFrame.
|
||||
func (f *AuthenticationRespFrame) Type() Type { |
||||
return TagOfAuthenticationAckFrame |
||||
} |
||||
|
||||
// Encode encodes AuthenticationRespFrame to coder encoded bytes.
|
||||
func (f *AuthenticationRespFrame) Encode() []byte { |
||||
// ok
|
||||
okBlock := coder.NewPrimitivePacketEncoder(byte(TagOfAuthenticationAckOk)) |
||||
okBlock.SetBoolValue(f.ok) |
||||
// reason
|
||||
reasonBlock := coder.NewPrimitivePacketEncoder(byte(TagOfAuthenticationAckReason)) |
||||
reasonBlock.SetStringValue(f.reason) |
||||
// frame
|
||||
ack := coder.NewNodePacketEncoder(byte(f.Type())) |
||||
ack.AddPrimitivePacket(okBlock) |
||||
ack.AddPrimitivePacket(reasonBlock) |
||||
|
||||
return ack.Encode() |
||||
} |
||||
|
||||
// DecodeToAuthenticationRespFrame decodes coder encoded bytes to AuthenticationRespFrame.
|
||||
func DecodeToAuthenticationRespFrame(buf []byte) (*AuthenticationRespFrame, error) { |
||||
node := coder.NodePacket{} |
||||
_, err := coder.DecodeToNodePacket(buf, &node) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
f := &AuthenticationRespFrame{} |
||||
|
||||
// ok
|
||||
if okBlock, ok := node.PrimitivePackets[byte(TagOfAuthenticationAckOk)]; ok { |
||||
ok, err := okBlock.ToBool() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
f.ok = ok |
||||
} |
||||
// reason
|
||||
if reasonBlock, ok := node.PrimitivePackets[byte(TagOfAuthenticationAckReason)]; ok { |
||||
reason, err := reasonBlock.ToUTF8String() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
f.reason = reason |
||||
} |
||||
|
||||
return f, nil |
||||
} |
@ -0,0 +1,20 @@
|
||||
package frame |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
) |
||||
|
||||
func TestAuthenticationAckFrame(t *testing.T) { |
||||
f := NewAuthenticationRespFrame(false, "aabbcc") |
||||
|
||||
bytes := f.Encode() |
||||
assert.Equal(t, []byte{0x91, 0xb, 0x12, 0x1, 0x0, 0x13, 0x6, 0x61, 0x61, 0x62, 0x62, 0x63, 0x63}, bytes) |
||||
|
||||
got, err := DecodeToAuthenticationRespFrame(bytes) |
||||
assert.Equal(t, f, got) |
||||
assert.NoError(t, err) |
||||
assert.EqualValues(t, false, f.OK()) |
||||
assert.EqualValues(t, "aabbcc", f.Reason()) |
||||
} |
@ -0,0 +1,33 @@
|
||||
package frame |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
) |
||||
|
||||
func TestBackFlowFrameEncode(t *testing.T) { |
||||
var ( |
||||
tag = Tag(22) |
||||
carriage = []byte("hello backflow") |
||||
) |
||||
f := NewBackFlowFrame(tag, []byte{}) |
||||
|
||||
f.SetCarriage(carriage) |
||||
|
||||
assert.Equal(t, TagOfBackFlowFrame, f.Type()) |
||||
assert.Equal(t, f.GetCarriage(), carriage) |
||||
assert.Equal(t, f.GetDataTag(), tag) |
||||
assert.Equal(t, []byte{0xad, 0x13, 0x1, 0x1, 0x16, 0x2, 0xe, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x62, 0x61, 0x63, 0x6b, 0x66, 0x6c, 0x6f, 0x77}, f.Encode()) |
||||
} |
||||
|
||||
func TestBackflowFrameDecode(t *testing.T) { |
||||
f := NewBackFlowFrame(Tag(22), []byte("hello backflow")) |
||||
|
||||
buf := f.Encode() |
||||
|
||||
df, err := DecodeToBackFlowFrame(buf) |
||||
|
||||
assert.NoError(t, err) |
||||
assert.Equal(t, df, f) |
||||
} |
@ -0,0 +1,78 @@
|
||||
package frame |
||||
|
||||
import ( |
||||
coder "git.hpds.cc/Component/mq_coder" |
||||
) |
||||
|
||||
// CloseStreamFrame is used to close a dataStream, controlStream
|
||||
// receives CloseStreamFrame and closes dataStream according to the Frame.
|
||||
// CloseStreamFrame is a coder encoded bytes.
|
||||
type CloseStreamFrame struct { |
||||
streamID string |
||||
reason string |
||||
} |
||||
|
||||
// StreamID returns the ID of the stream to be closed.
|
||||
func (f *CloseStreamFrame) StreamID() string { return f.streamID } |
||||
|
||||
// Reason returns the close reason.
|
||||
func (f *CloseStreamFrame) Reason() string { return f.reason } |
||||
|
||||
// NewCloseStreamFrame returns a CloseStreamFrame.
|
||||
func NewCloseStreamFrame(streamID, reason string) *CloseStreamFrame { |
||||
return &CloseStreamFrame{ |
||||
streamID: streamID, |
||||
reason: reason, |
||||
} |
||||
} |
||||
|
||||
// Type gets the type of the CloseStreamFrame.
|
||||
func (f *CloseStreamFrame) Type() Type { |
||||
return TagOfCloseStreamFrame |
||||
} |
||||
|
||||
// Encode encodes CloseStreamFrame to coder encoded bytes.
|
||||
func (f *CloseStreamFrame) Encode() []byte { |
||||
// id
|
||||
idBlock := coder.NewPrimitivePacketEncoder(byte(TagOfCloseStreamID)) |
||||
idBlock.SetStringValue(f.streamID) |
||||
// reason
|
||||
reasonBlock := coder.NewPrimitivePacketEncoder(byte(TagOfCloseStreamReason)) |
||||
reasonBlock.SetStringValue(f.reason) |
||||
// frame
|
||||
ack := coder.NewNodePacketEncoder(byte(f.Type())) |
||||
ack.AddPrimitivePacket(idBlock) |
||||
ack.AddPrimitivePacket(reasonBlock) |
||||
|
||||
return ack.Encode() |
||||
} |
||||
|
||||
// DecodeToCloseStreamFrame decodes coder encoded bytes to CloseStreamFrame.
|
||||
func DecodeToCloseStreamFrame(buf []byte) (*CloseStreamFrame, error) { |
||||
node := coder.NodePacket{} |
||||
_, err := coder.DecodeToNodePacket(buf, &node) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
f := &CloseStreamFrame{} |
||||
|
||||
// id
|
||||
if idBlock, ok := node.PrimitivePackets[byte(TagOfCloseStreamID)]; ok { |
||||
id, err := idBlock.ToUTF8String() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
f.streamID = id |
||||
} |
||||
// reason
|
||||
if reasonBlock, ok := node.PrimitivePackets[byte(TagOfCloseStreamReason)]; ok { |
||||
reason, err := reasonBlock.ToUTF8String() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
f.reason = reason |
||||
} |
||||
|
||||
return f, nil |
||||
} |
@ -0,0 +1,20 @@
|
||||
package frame |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
) |
||||
|
||||
func TestCloseStreamFrame(t *testing.T) { |
||||
f := NewCloseStreamFrame("eeffgg", "aabbcc") |
||||
|
||||
bytes := f.Encode() |
||||
assert.Equal(t, []byte{0x94, 0x10, 0x15, 0x6, 0x65, 0x65, 0x66, 0x66, 0x67, 0x67, 0x16, 0x6, 0x61, 0x61, 0x62, 0x62, 0x63, 0x63}, bytes) |
||||
|
||||
got, err := DecodeToCloseStreamFrame(bytes) |
||||
assert.Equal(t, f, got) |
||||
assert.NoError(t, err) |
||||
assert.EqualValues(t, "eeffgg", f.StreamID()) |
||||
assert.EqualValues(t, "aabbcc", f.Reason()) |
||||
} |
@ -0,0 +1,20 @@
|
||||
package frame |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
) |
||||
|
||||
func TestGoawayFrameEncode(t *testing.T) { |
||||
f := NewGoawayFrame("goaway") |
||||
assert.Equal(t, "goaway", f.Message()) |
||||
assert.Equal(t, []byte{0x80 | byte(TagOfGoawayFrame), 0x8, 0x2, 0x6, 0x67, 0x6f, 0x61, 0x77, 0x61, 0x79}, f.Encode()) |
||||
} |
||||
|
||||
func TestGoawayFrameDecode(t *testing.T) { |
||||
buf := []byte{0x80 | byte(TagOfGoawayFrame), 0x8, 0x2, 0x6, 0x67, 0x6f, 0x61, 0x77, 0x61, 0x79} |
||||
f, err := DecodeToGoawayFrame(buf) |
||||
assert.NoError(t, err) |
||||
assert.Equal(t, []byte{0x80 | byte(TagOfGoawayFrame), 0x8, 0x2, 0x6, 0x67, 0x6f, 0x61, 0x77, 0x61, 0x79}, f.Encode()) |
||||
} |
@ -0,0 +1,59 @@
|
||||
package frame |
||||
|
||||
import ( |
||||
coder "git.hpds.cc/Component/mq_coder" |
||||
) |
||||
|
||||
// HandshakeAckFrame is used to ack handshake, It is always that the first frame
|
||||
// is HandshakeAckFrame after client acquire a new stream.
|
||||
// HandshakeAckFrame is a coder encoded bytes.
|
||||
type HandshakeAckFrame struct { |
||||
streamId string |
||||
} |
||||
|
||||
// NewHandshakeAckFrame returns a HandshakeAckFrame.
|
||||
func NewHandshakeAckFrame(streamId string) *HandshakeAckFrame { |
||||
return &HandshakeAckFrame{streamId} |
||||
} |
||||
|
||||
// Type gets the type of the HandshakeAckFrame.
|
||||
func (f *HandshakeAckFrame) Type() Type { |
||||
return TagOfHandshakeAckFrame |
||||
} |
||||
|
||||
// StreamId returns the id of stream be acked.
|
||||
func (f *HandshakeAckFrame) StreamId() string { |
||||
return f.streamId |
||||
} |
||||
|
||||
// Encode encodes HandshakeAckFrame to coder encoded bytes.
|
||||
func (f *HandshakeAckFrame) Encode() []byte { |
||||
ack := coder.NewNodePacketEncoder(byte(f.Type())) |
||||
// streamId
|
||||
streamIDBlock := coder.NewPrimitivePacketEncoder(byte(TagOfHandshakeAckStreamId)) |
||||
streamIDBlock.SetStringValue(f.streamId) |
||||
|
||||
ack.AddPrimitivePacket(streamIDBlock) |
||||
|
||||
return ack.Encode() |
||||
} |
||||
|
||||
// DecodeToHandshakeAckFrame decodes coder encoded bytes to HandshakeAckFrame
|
||||
func DecodeToHandshakeAckFrame(buf []byte) (*HandshakeAckFrame, error) { |
||||
node := coder.NodePacket{} |
||||
_, err := coder.DecodeToNodePacket(buf, &node) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
ack := &HandshakeAckFrame{} |
||||
// streamID
|
||||
if streamIDBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeAckStreamId)]; ok { |
||||
streamId, err := streamIDBlock.ToUTF8String() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
ack.streamId = streamId |
||||
} |
||||
return ack, nil |
||||
} |
@ -0,0 +1,25 @@
|
||||
package frame |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
) |
||||
|
||||
var handShakeAckTestBuf = []byte{0x80 | byte(TagOfHandshakeAckFrame), 0x8, 0x28, 0x6, 0x74, 0x68, 0x65, 0x2d, 0x69, 0x64} |
||||
|
||||
var testStreamID = "the-id" |
||||
|
||||
func TestHandshakeAckFrameEncode(t *testing.T) { |
||||
f := NewHandshakeAckFrame(testStreamID) |
||||
assert.Equal(t, TagOfHandshakeAckFrame, f.Type()) |
||||
assert.Equal(t, handShakeAckTestBuf, f.Encode()) |
||||
} |
||||
|
||||
func TestHandshakeAckFrameDecode(t *testing.T) { |
||||
f, err := DecodeToHandshakeAckFrame(handShakeAckTestBuf) |
||||
assert.NoError(t, err) |
||||
assert.Equal(t, TagOfHandshakeAckFrame, f.Type()) |
||||
assert.Equal(t, testStreamID, f.StreamId()) |
||||
assert.Equal(t, handShakeAckTestBuf, f.Encode()) |
||||
} |
@ -1,17 +0,0 @@
|
||||
package network |
||||
|
||||
import "git.hpds.cc/Component/network/frame" |
||||
|
||||
// Metadata is used for storing extra info of the application
|
||||
type Metadata interface { |
||||
// Encode is the serialize method
|
||||
Encode() []byte |
||||
} |
||||
|
||||
// MetadataBuilder is the builder of Metadata
|
||||
type MetadataBuilder interface { |
||||
// Build will return a Metadata instance according to the handshake frame passed in
|
||||
Build(f *frame.HandshakeFrame) (Metadata, error) |
||||
// Decode is the deserialize method
|
||||
Decode(buf []byte) (Metadata, error) |
||||
} |
@ -0,0 +1,38 @@
|
||||
// Package metadata provides a default implements of `Metadata`.
|
||||
package metadata |
||||
|
||||
import ( |
||||
"git.hpds.cc/Component/network/frame" |
||||
) |
||||
|
||||
var _ Metadata = &Default{} |
||||
|
||||
// Default returns an implement of `Metadata`,
|
||||
// the default `Metadata` do not store anything.
|
||||
type Default struct{} |
||||
|
||||
// Encode returns nil, It indicates the application do not have metadata.
|
||||
func (m *Default) Encode() []byte { |
||||
return nil |
||||
} |
||||
|
||||
type defaultBuilder struct { |
||||
m *Default |
||||
} |
||||
|
||||
// DefaultBuilder returns an implement of `Builder`,
|
||||
// the default builder only return default `Metadata`, the default `Metadata`
|
||||
// do not store anything.
|
||||
func DefaultBuilder() Builder { |
||||
return &defaultBuilder{ |
||||
m: &Default{}, |
||||
} |
||||
} |
||||
|
||||
func (builder *defaultBuilder) Build(f *frame.HandshakeFrame) (Metadata, error) { |
||||
return builder.m, nil |
||||
} |
||||
|
||||
func (builder *defaultBuilder) Decode(buf []byte) (Metadata, error) { |
||||
return builder.m, nil |
||||
} |
@ -0,0 +1,21 @@
|
||||
package metadata |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
) |
||||
|
||||
func TestMetadata(t *testing.T) { |
||||
builder := DefaultBuilder() |
||||
|
||||
m, err := builder.Build(nil) |
||||
|
||||
assert.NoError(t, err) |
||||
assert.Equal(t, []uint8([]byte(nil)), m.Encode()) |
||||
|
||||
de, err := builder.Decode([]byte{}) |
||||
|
||||
assert.NoError(t, err) |
||||
assert.Equal(t, m, de) |
||||
} |
@ -0,0 +1,21 @@
|
||||
// Package metadata defines `Metadata` and the `Builder`.
|
||||
package metadata |
||||
|
||||
import "git.hpds.cc/Component/network/frame" |
||||
|
||||
// Metadata is used for storing extra info of the application.
|
||||
type Metadata interface { |
||||
// Encode is the serialize method,
|
||||
// That represents the Metadata can be transmitted.
|
||||
Encode() []byte |
||||
} |
||||
|
||||
// Builder is the builder of Metadata.
|
||||
// the metadata usually be built from `HandshakeFrame`,
|
||||
// and It can be decode as byte array for io transmission.
|
||||
type Builder interface { |
||||
// Build returns a Metadata instance according to the handshake frame passed in.
|
||||
Build(f *frame.HandshakeFrame) (Metadata, error) |
||||
// Decode is the deserialize method
|
||||
Decode(buf []byte) (Metadata, error) |
||||
} |
@ -0,0 +1,112 @@
|
||||
// Package router providers a default implement of `router` and `Route`.
|
||||
package router |
||||
|
||||
import ( |
||||
"fmt" |
||||
"sync" |
||||
|
||||
"git.hpds.cc/Component/network/frame" |
||||
herr "git.hpds.cc/Component/network/hpds_err" |
||||
"git.hpds.cc/Component/network/metadata" |
||||
) |
||||
|
||||
// DefaultRouter providers a default implement of `router`,
|
||||
// It routes the data according to obverse tag or connId.
|
||||
type DefaultRouter struct { |
||||
r *defaultRoute |
||||
} |
||||
|
||||
// Default return the DefaultRouter.
|
||||
func Default(functions []string) Router { |
||||
return &DefaultRouter{r: newRoute(functions)} |
||||
} |
||||
|
||||
// Route get route from metadata.
|
||||
func (r *DefaultRouter) Route(metadata metadata.Metadata) Route { |
||||
return r.r |
||||
} |
||||
|
||||
// Clean router.
|
||||
func (r *DefaultRouter) Clean() { |
||||
r.r.mu.Lock() |
||||
defer r.r.mu.Unlock() |
||||
|
||||
for key := range r.r.data { |
||||
delete(r.r.data, key) |
||||
} |
||||
} |
||||
|
||||
type defaultRoute struct { |
||||
functions []string |
||||
data map[frame.Tag]map[string]string |
||||
mu sync.RWMutex |
||||
} |
||||
|
||||
func newRoute(functions []string) *defaultRoute { |
||||
return &defaultRoute{ |
||||
functions: functions, |
||||
data: make(map[frame.Tag]map[string]string), |
||||
} |
||||
} |
||||
|
||||
func (r *defaultRoute) Add(connId string, name string, observeDataTags []frame.Tag) (err error) { |
||||
r.mu.Lock() |
||||
defer r.mu.Unlock() |
||||
|
||||
ok := false |
||||
for _, v := range r.functions { |
||||
if v == name { |
||||
ok = true |
||||
break |
||||
} |
||||
} |
||||
if !ok { |
||||
return fmt.Errorf("SFN[%s] does not exist in config functions", name) |
||||
} |
||||
|
||||
LOOP: |
||||
for _, conn := range r.data { |
||||
for connId, n := range conn { |
||||
if n == name { |
||||
err = herr.NewDuplicateNameError(connId, fmt.Errorf("SFN[%s] is already linked to another connection", name)) |
||||
delete(conn, connId) |
||||
break LOOP |
||||
} |
||||
} |
||||
} |
||||
|
||||
for _, tag := range observeDataTags { |
||||
conn := r.data[tag] |
||||
if conn == nil { |
||||
conn = make(map[string]string) |
||||
r.data[tag] = conn |
||||
} |
||||
r.data[tag][connId] = name |
||||
} |
||||
|
||||
return err |
||||
} |
||||
|
||||
func (r *defaultRoute) Remove(connId string) error { |
||||
r.mu.Lock() |
||||
defer r.mu.Unlock() |
||||
|
||||
for _, conn := range r.data { |
||||
delete(conn, connId) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (r *defaultRoute) GetForwardRoutes(tag frame.Tag) []string { |
||||
r.mu.RLock() |
||||
defer r.mu.RUnlock() |
||||
|
||||
var keys []string |
||||
if conn := r.data[tag]; conn != nil { |
||||
for k := range conn { |
||||
keys = append(keys, k) |
||||
} |
||||
} |
||||
return keys |
||||
} |
@ -0,0 +1,40 @@
|
||||
package router |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"git.hpds.cc/Component/network/frame" |
||||
"git.hpds.cc/Component/network/metadata" |
||||
"github.com/stretchr/testify/assert" |
||||
) |
||||
|
||||
func TestRouter(t *testing.T) { |
||||
router := Default([]string{"sfn-1"}) |
||||
|
||||
m := &metadata.Default{} |
||||
|
||||
route := router.Route(m) |
||||
|
||||
err := route.Add("conn-1", "sfn-1", []frame.Tag{frame.Tag(1)}) |
||||
assert.NoError(t, err) |
||||
|
||||
ids := route.GetForwardRoutes(frame.Tag(1)) |
||||
assert.Equal(t, []string{"conn-1"}, ids) |
||||
|
||||
err = route.Add("conn-2", "sfn-2", []frame.Tag{frame.Tag(2)}) |
||||
assert.EqualError(t, err, "SFN[sfn-2] does not exist in config functions") |
||||
|
||||
err = route.Add("conn-3", "sfn-1", []frame.Tag{frame.Tag(1)}) |
||||
assert.EqualError(t, err, "SFN[sfn-1] is already linked to another connection") |
||||
|
||||
err = route.Remove("conn-1") |
||||
assert.NoError(t, err) |
||||
|
||||
ids = route.GetForwardRoutes(frame.Tag(1)) |
||||
assert.Equal(t, []string{"conn-3"}, ids) |
||||
|
||||
router.Clean() |
||||
|
||||
ids = route.GetForwardRoutes(frame.Tag(1)) |
||||
assert.Equal(t, []string(nil), ids) |
||||
} |
@ -0,0 +1,57 @@
|
||||
package network |
||||
|
||||
import ( |
||||
"context" |
||||
"sync" |
||||
) |
||||
|
||||
// StreamGroup is the group of stream includes ControlStream amd DataStream.
|
||||
// One Connection has many DataStream and only one ControlStream, ControlStream authenticates
|
||||
// Connection and recevies HandshakeFrame and CloseStreamFrame to create DataStream or close
|
||||
// stream. the ControlStream always the first stream established between server and client.
|
||||
type StreamGroup struct { |
||||
ctx context.Context |
||||
controlStream ServerControlStream |
||||
connector *Connector |
||||
group sync.WaitGroup |
||||
} |
||||
|
||||
// NewStreamGroup returns StreamGroup.
|
||||
func NewStreamGroup(ctx context.Context, controlStream ServerControlStream, connector *Connector) *StreamGroup { |
||||
group := &StreamGroup{ |
||||
ctx: ctx, |
||||
controlStream: controlStream, |
||||
connector: connector, |
||||
} |
||||
return group |
||||
} |
||||
|
||||
// Run run contextFunc with connector.
|
||||
// Run continus Accepts DataStream and create a Context to run with contextFunc.
|
||||
// TODO: run in aop model, like setMetadata -> handleRoute -> before -> handle -> after.
|
||||
func (g *StreamGroup) Run(contextFunc func(c *Context)) error { |
||||
for { |
||||
dataStream, err := g.controlStream.AcceptStream(g.ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
g.group.Add(1) |
||||
_ = g.connector.Add(dataStream.ID(), dataStream) |
||||
|
||||
go func() { |
||||
defer func() { |
||||
g.group.Done() |
||||
_ = g.connector.Remove(dataStream.ID()) |
||||
}() |
||||
|
||||
c := newContext(dataStream) |
||||
defer c.Clean() |
||||
|
||||
contextFunc(c) |
||||
}() |
||||
} |
||||
} |
||||
|
||||
// Wait waits all dataStream down.
|
||||
func (g *StreamGroup) Wait() { g.group.Wait() } |
Loading…
Reference in new issue