136 lines
3.3 KiB
Go
136 lines
3.3 KiB
Go
package network
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"git.hpds.cc/Component/network/frame"
|
|
"git.hpds.cc/Component/network/log"
|
|
"sync"
|
|
)
|
|
|
|
// ErrConnectorClosed will be returned if the connector has been closed.
|
|
var ErrConnectorClosed = errors.New("hpdsMq: connector closed")
|
|
|
|
// The Connector class manages data streams and provides a centralized way to get and set streams.
|
|
type Connector struct {
|
|
// ctx and ctxCancel manage the lifescyle of Connector.
|
|
ctx context.Context
|
|
ctxCancel context.CancelFunc
|
|
|
|
streams sync.Map
|
|
}
|
|
|
|
// NewConnector returns an initial Connector.
|
|
func NewConnector(ctx context.Context) *Connector {
|
|
ctx, ctxCancel := context.WithCancel(ctx)
|
|
|
|
return &Connector{
|
|
ctx: ctx,
|
|
ctxCancel: ctxCancel,
|
|
}
|
|
}
|
|
|
|
// Add adds DataStream to Connector,
|
|
// If the streamID is the same twice, the new stream will replace the old stream.
|
|
func (c *Connector) Add(streamId string, stream DataStream) error {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return ErrConnectorClosed
|
|
default:
|
|
}
|
|
|
|
c.streams.Store(streamId, stream)
|
|
|
|
log.Debugf("%s Connector add stream, stream_id: %s", ParseFrameLogPrefix, streamId)
|
|
return nil
|
|
}
|
|
|
|
// Remove removes the DataStream with the specified streamID.
|
|
// If the Connector does not have a stream with the given streamID, no action is taken.
|
|
func (c *Connector) Remove(streamId string) error {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return ErrConnectorClosed
|
|
default:
|
|
}
|
|
|
|
c.streams.Delete(streamId)
|
|
log.Debugf("%s Connector remove stream, stream_id: %s", ParseFrameLogPrefix, streamId)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Get retrieves the DataStream with the specified streamID.
|
|
// If the Connector does not have a stream with the given streamID, return nil and false.
|
|
func (c *Connector) Get(streamId string) (DataStream, bool, error) {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return nil, false, ErrConnectorClosed
|
|
default:
|
|
}
|
|
|
|
v, ok := c.streams.Load(streamId)
|
|
if !ok {
|
|
return nil, false, nil
|
|
}
|
|
|
|
stream := v.(DataStream)
|
|
|
|
return stream, true, nil
|
|
}
|
|
|
|
// GetSourceConns gets the streams with the specified source observe tag.
|
|
func (c *Connector) GetSourceConns(sourceId string, tag frame.Tag) ([]DataStream, error) {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return []DataStream{}, ErrConnectorClosed
|
|
default:
|
|
}
|
|
|
|
streams := make([]DataStream, 0)
|
|
|
|
c.streams.Range(func(key interface{}, val interface{}) bool {
|
|
stream := val.(DataStream)
|
|
|
|
for _, v := range stream.ObserveDataTags() {
|
|
if v == tag &&
|
|
stream.StreamType() == StreamTypeSource &&
|
|
stream.ID() == sourceId {
|
|
streams = append(streams, stream)
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
|
|
return streams, nil
|
|
}
|
|
|
|
// GetSnapshot returnsa snapshot of all streams.
|
|
// The resulting map uses streamID as the key and stream name as the value.
|
|
// This function is typically used to monitor the status of the Connector.
|
|
func (c *Connector) GetSnapshot() map[string]string {
|
|
result := make(map[string]string)
|
|
|
|
c.streams.Range(func(key interface{}, val interface{}) bool {
|
|
var (
|
|
streamID = key.(string)
|
|
stream = val.(DataStream)
|
|
)
|
|
result[streamID] = stream.Name()
|
|
return true
|
|
})
|
|
|
|
return result
|
|
}
|
|
|
|
// Close cleans all stream of Connector and reset Connector to closed status.
|
|
// The Connector can't be use after close.
|
|
func (c *Connector) Close() {
|
|
c.ctxCancel()
|
|
|
|
c.streams.Range(func(key, value any) bool {
|
|
c.streams.Delete(key)
|
|
return true
|
|
})
|
|
}
|