You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

135 lines
3.3 KiB

package network
import (
"context"
"errors"
"git.hpds.cc/Component/network/frame"
"git.hpds.cc/Component/network/log"
"sync"
)
// ErrConnectorClosed will be returned if the connector has been closed.
var ErrConnectorClosed = errors.New("hpdsMq: connector closed")
// The Connector class manages data streams and provides a centralized way to get and set streams.
type Connector struct {
// ctx and ctxCancel manage the lifescyle of Connector.
ctx context.Context
ctxCancel context.CancelFunc
streams sync.Map
}
// NewConnector returns an initial Connector.
func NewConnector(ctx context.Context) *Connector {
ctx, ctxCancel := context.WithCancel(ctx)
return &Connector{
ctx: ctx,
ctxCancel: ctxCancel,
}
}
// Add adds DataStream to Connector,
// If the streamID is the same twice, the new stream will replace the old stream.
func (c *Connector) Add(streamId string, stream DataStream) error {
select {
case <-c.ctx.Done():
return ErrConnectorClosed
default:
}
c.streams.Store(streamId, stream)
log.Debugf("%s Connector add stream, stream_id: %s", ParseFrameLogPrefix, streamId)
return nil
}
// Remove removes the DataStream with the specified streamID.
// If the Connector does not have a stream with the given streamID, no action is taken.
func (c *Connector) Remove(streamId string) error {
select {
case <-c.ctx.Done():
return ErrConnectorClosed
default:
}
c.streams.Delete(streamId)
log.Debugf("%s Connector remove stream, stream_id: %s", ParseFrameLogPrefix, streamId)
return nil
}
// Get retrieves the DataStream with the specified streamID.
// If the Connector does not have a stream with the given streamID, return nil and false.
func (c *Connector) Get(streamId string) (DataStream, bool, error) {
select {
case <-c.ctx.Done():
return nil, false, ErrConnectorClosed
default:
}
v, ok := c.streams.Load(streamId)
if !ok {
return nil, false, nil
}
stream := v.(DataStream)
return stream, true, nil
}
// GetSourceConns gets the streams with the specified source observe tag.
func (c *Connector) GetSourceConns(sourceId string, tag frame.Tag) ([]DataStream, error) {
select {
case <-c.ctx.Done():
return []DataStream{}, ErrConnectorClosed
default:
}
streams := make([]DataStream, 0)
c.streams.Range(func(key interface{}, val interface{}) bool {
stream := val.(DataStream)
for _, v := range stream.ObserveDataTags() {
if v == tag &&
stream.StreamType() == StreamTypeSource &&
stream.ID() == sourceId {
streams = append(streams, stream)
}
}
return true
})
return streams, nil
}
// GetSnapshot returnsa snapshot of all streams.
// The resulting map uses streamID as the key and stream name as the value.
// This function is typically used to monitor the status of the Connector.
func (c *Connector) GetSnapshot() map[string]string {
result := make(map[string]string)
c.streams.Range(func(key interface{}, val interface{}) bool {
var (
streamID = key.(string)
stream = val.(DataStream)
)
result[streamID] = stream.Name()
return true
})
return result
}
// Close cleans all stream of Connector and reset Connector to closed status.
// The Connector can't be use after close.
func (c *Connector) Close() {
c.ctxCancel()
c.streams.Range(func(key, value any) bool {
c.streams.Delete(key)
return true
})
}