You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
163 lines
4.7 KiB
163 lines
4.7 KiB
package network |
|
|
|
import ( |
|
"context" |
|
"github.com/quic-go/quic-go" |
|
"io" |
|
"sync" |
|
"sync/atomic" |
|
|
|
"git.hpds.cc/Component/network/frame" |
|
) |
|
|
|
// DataStream wraps the specific io streams (typically quic.Stream) to transfer frames. |
|
// DataStream be used to read and write frames, and be managed by Connector. |
|
type DataStream interface { |
|
// Context returns context.Context to manages DataStream lifecycle. |
|
Context() context.Context |
|
// Name returns the name of the stream, which is set by clients. |
|
Name() string |
|
// ID represents the dataStream ID, the ID is an unique string. |
|
ID() string |
|
// StreamType represents dataStream type (Source | SFN | UpstreamEmitter). |
|
StreamType() StreamType |
|
// Metadata returns the extra info of the application |
|
Metadata() []byte |
|
// Close real close DataStream, |
|
// The controlStream calls this function, If you want close a dataStream, to use |
|
// the CloseWithError api. |
|
io.Closer |
|
// CloseWithError close DataStream with an error string, |
|
// This function do not real close the underlying stream, It notices controlStream to |
|
// close itself, The controlStream must close underlying stream after receive CloseStreamFrame. |
|
CloseWithError(string) error |
|
// ReadWriter writes or reads frame to underlying stream. |
|
// Writing and Reading are both goroutine-safely handle frames to peer side. |
|
// ReadWriter returns stream closed error if stream is closed. |
|
frame.ReadWriter |
|
// ObserveDataTags observed data tags. |
|
// TODO: There maybe a sorted list, we can find tag quickly. |
|
ObserveDataTags() []frame.Tag |
|
} |
|
|
|
// TODO: dataStream sync.Pool wrap. |
|
type dataStream struct { |
|
name string |
|
id string |
|
streamType StreamType |
|
metadata []byte |
|
observed []frame.Tag |
|
|
|
closed atomic.Bool |
|
// mu protected stream write and close |
|
// because of quic stream write and close is not goroutinue-safely. |
|
mu sync.Mutex |
|
stream quic.Stream |
|
controlStream ControlStream |
|
} |
|
|
|
// newDataStream constructures dataStream. |
|
func newDataStream( |
|
name string, |
|
id string, |
|
streamType StreamType, |
|
metadata []byte, |
|
stream quic.Stream, |
|
observed []frame.Tag, |
|
controlStream ControlStream, |
|
) DataStream { |
|
return &dataStream{ |
|
name: name, |
|
id: id, |
|
streamType: streamType, |
|
metadata: metadata, |
|
stream: stream, |
|
observed: observed, |
|
controlStream: controlStream, |
|
} |
|
} |
|
|
|
// The methods below implement the DataStream interface on *dataStream.
|
func (s *dataStream) Context() context.Context { return s.stream.Context() } |
|
func (s *dataStream) ID() string { return s.id } |
|
func (s *dataStream) Name() string { return s.name } |
|
func (s *dataStream) Metadata() []byte { return s.metadata } |
|
func (s *dataStream) StreamType() StreamType { return s.streamType } |
|
func (s *dataStream) ObserveDataTags() []frame.Tag { return s.observed } |
|
|
|
func (s *dataStream) WriteFrame(frm frame.Frame) error { |
|
if s.closed.Load() { |
|
return io.EOF |
|
} |
|
|
|
s.mu.Lock() |
|
defer s.mu.Unlock() |
|
_, err := s.stream.Write(frm.Encode()) |
|
return err |
|
} |
|
|
|
func (s *dataStream) ReadFrame() (frame.Frame, error) { |
|
if s.closed.Load() { |
|
return nil, io.EOF |
|
} |
|
return ParseFrame(s.stream) |
|
} |
|
|
|
func (s *dataStream) Close() error { |
|
s.mu.Lock() |
|
defer s.mu.Unlock() |
|
|
|
// Close the stream truly, |
|
// This function should be called after controlStream receive a closeStreamFrame. |
|
return s.stream.Close() |
|
} |
|
|
|
func (s *dataStream) CloseWithError(errString string) error { |
|
if s.closed.Load() { |
|
return nil |
|
} |
|
s.closed.Store(true) |
|
|
|
s.mu.Lock() |
|
defer s.mu.Unlock() |
|
|
|
// Only notice client-side controlStream the stream has been closed. |
|
// The controlStream reads closeStreamFrame and to close dataStream. |
|
return s.controlStream.CloseStream(s.id, errString) |
|
} |
|
|
|
// StreamType identifies the kind of a stream. It is encoded as a single byte.
type StreamType byte

// Known stream types.
const (
	// StreamTypeStreamFunction is the "Stream Function" stream type.
	// A "Stream Function" stream handles data coming from a source.
	StreamTypeStreamFunction StreamType = 0x5D

	// StreamTypeUpstreamEmitter is the "Upstream Emitter" stream type.
	// An "Upstream Emitter" stream sends data from a "Source" to another
	// Emitter node; with it, yomo can run in mesh mode.
	StreamTypeUpstreamEmitter StreamType = 0x5E

	// StreamTypeSource is the "Source" stream type.
	// A "Source" stream generally sends data to a "Stream Function" stream.
	StreamTypeSource StreamType = 0x5F

	// StreamTypeNone is the "None" stream type.
	// A "None" stream is not supposed to exist in the yomo system.
	StreamTypeNone StreamType = 0xFF
)

// String returns the human-readable name of the StreamType. Every value
// other than Source, Upstream Emitter and Stream Function is reported as
// "None".
func (c StreamType) String() string {
	switch c {
	case StreamTypeStreamFunction:
		return "Stream Function"
	case StreamTypeUpstreamEmitter:
		return "Upstream Emitter"
	case StreamTypeSource:
		return "Source"
	}
	return "None"
}
|
|
|