network/data_stream.go

164 lines
4.7 KiB
Go
Raw Permalink Normal View History

2023-04-05 16:15:59 +08:00
package network
import (
"context"
"github.com/quic-go/quic-go"
"io"
"sync"
"sync/atomic"
"git.hpds.cc/Component/network/frame"
)
// DataStream wraps a specific io stream (typically a quic.Stream) in order to
// transfer frames. A DataStream is used to read and write frames and is
// managed by the Connector.
type DataStream interface {
	// ID returns the DataStream ID, which is a unique string.
	ID() string
	// Name returns the name of the stream, which is set by the client.
	Name() string
	// StreamType returns the DataStream type (Source | SFN | UpstreamEmitter).
	StreamType() StreamType
	// Metadata returns the extra information supplied by the application.
	Metadata() []byte
	// ObserveDataTags returns the data tags observed by the stream.
	// TODO: this could be a sorted list so that a tag can be found quickly.
	ObserveDataTags() []frame.Tag
	// Context returns the context.Context that manages the DataStream lifecycle.
	Context() context.Context

	// ReadWriter reads frames from and writes frames to the underlying stream.
	// Reading and writing are both goroutine-safe ways to exchange frames with
	// the peer side. ReadWriter returns a stream-closed error if the stream is
	// closed.
	frame.ReadWriter

	// Close really closes the DataStream.
	// It is the function the controlStream calls; to close a DataStream from
	// anywhere else, use the CloseWithError API instead.
	io.Closer
	// CloseWithError closes the DataStream with an error string.
	// It does not actually close the underlying stream: it notifies the
	// controlStream, which must close the underlying stream after it receives
	// the CloseStreamFrame.
	CloseWithError(string) error
}
// TODO: wrap dataStream in a sync.Pool.

// dataStream is the DataStream implementation backed by a quic.Stream.
type dataStream struct {
	// Descriptive values handed to newDataStream and exposed through the
	// accessor methods.
	id         string
	name       string
	streamType StreamType
	metadata   []byte
	observed   []frame.Tag

	// stream is the underlying quic stream that frames are read from and
	// written to.
	stream quic.Stream
	// controlStream is notified by CloseWithError that this stream is closing.
	controlStream ControlStream

	// mu protects writing to and closing the stream, because writing to and
	// closing a quic stream are not goroutine-safe.
	mu sync.Mutex
	// closed is set by CloseWithError and checked by ReadFrame and WriteFrame.
	closed atomic.Bool
}
// newDataStream constructs a dataStream from the given values and returns it
// as a DataStream. The metadata and observed slices are stored as passed in,
// without being copied.
func newDataStream(
	name string,
	id string,
	streamType StreamType,
	metadata []byte,
	stream quic.Stream,
	observed []frame.Tag,
	controlStream ControlStream,
) DataStream {
	ds := new(dataStream)

	ds.id, ds.name = id, name
	ds.streamType = streamType
	ds.metadata = metadata
	ds.observed = observed

	ds.stream = stream
	ds.controlStream = controlStream

	return ds
}
// The accessors below implement the read-only part of the DataStream interface.

// Context returns the context of the underlying stream.
func (s *dataStream) Context() context.Context {
	return s.stream.Context()
}

// ID returns the stream ID given at construction.
func (s *dataStream) ID() string {
	return s.id
}

// Name returns the stream name given at construction.
func (s *dataStream) Name() string {
	return s.name
}

// Metadata returns the stored metadata slice itself, not a copy.
func (s *dataStream) Metadata() []byte {
	return s.metadata
}

// StreamType returns the stream type given at construction.
func (s *dataStream) StreamType() StreamType {
	return s.streamType
}

// ObserveDataTags returns the stored slice of observed tags itself, not a copy.
func (s *dataStream) ObserveDataTags() []frame.Tag {
	return s.observed
}
// WriteFrame encodes frm and writes it to the underlying stream.
//
// It returns io.EOF once CloseWithError has marked the stream as closed;
// otherwise it returns the error reported by the underlying stream's Write.
// Writes are serialized by s.mu.
func (s *dataStream) WriteFrame(frm frame.Frame) error {
	// Fast path: do not contend on the mutex when already marked closed.
	if s.closed.Load() {
		return io.EOF
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// Re-check under the lock. CloseWithError sets the flag before it takes
	// s.mu, so a writer that passed the first check and then waited for the
	// lock while the stream was being closed must not write afterwards.
	if s.closed.Load() {
		return io.EOF
	}

	_, err := s.stream.Write(frm.Encode())
	return err
}
// ReadFrame parses the next frame from the underlying stream with ParseFrame.
// If the stream has been marked closed it returns (nil, io.EOF) without
// touching the underlying stream.
func (s *dataStream) ReadFrame() (frame.Frame, error) {
	if !s.closed.Load() {
		return ParseFrame(s.stream)
	}
	return nil, io.EOF
}
// Close truly closes the underlying stream and returns the stream's error.
//
// It should be called after the controlStream has received a
// closeStreamFrame. s.mu is held for the duration so that the close does not
// run concurrently with a write.
//
// NOTE(review): Close does not set the closed flag, so later ReadFrame and
// WriteFrame calls still reach the underlying stream — confirm this is
// intended.
func (s *dataStream) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	closeErr := s.stream.Close()
	return closeErr
}
// CloseWithError marks the stream as closed and asks the controlStream to
// close it, passing errString along as the reason.
//
// It does not close the underlying stream itself: it only notifies the
// client-side controlStream, which reads the closeStreamFrame and then closes
// the dataStream. Only the first call sends the notification; every later or
// concurrent call returns nil without doing anything.
func (s *dataStream) CloseWithError(errString string) error {
	// Flip the flag in a single atomic step. A separate Load followed by
	// Store would let two concurrent callers both pass the check and both
	// notify the controlStream.
	if !s.closed.CompareAndSwap(false, true) {
		return nil
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	return s.controlStream.CloseStream(s.id, errString)
}
// StreamType represents the stream type.
type StreamType byte

const (
	// StreamTypeStreamFunction is stream type "Stream Function".
	// A "Stream Function" stream handles data from a source.
	StreamTypeStreamFunction StreamType = 0x5D

	// StreamTypeUpstreamEmitter is stream type "Upstream Emitter".
	// An "Upstream Emitter" stream sends data from a "Source" to another
	// Emitter node. With "Upstream Emitter", yomo can run in mesh mode.
	StreamTypeUpstreamEmitter StreamType = 0x5E

	// StreamTypeSource is stream type "Source".
	// A "Source" stream generally sends data to a "Stream Function" stream.
	StreamTypeSource StreamType = 0x5F

	// StreamTypeNone is stream type "None".
	// A "None" stream is not supposed to exist in the yomo system.
	StreamTypeNone StreamType = 0xFF
)

// String returns the human-readable name of the StreamType. Any value that is
// not Source, Upstream Emitter or Stream Function is reported as "None".
func (c StreamType) String() string {
	label := "None"

	switch c {
	case StreamTypeStreamFunction:
		label = "Stream Function"
	case StreamTypeUpstreamEmitter:
		label = "Upstream Emitter"
	case StreamTypeSource:
		label = "Source"
	}

	return label
}