You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
135 lines
3.3 KiB
135 lines
3.3 KiB
package network |
|
|
|
import ( |
|
"context" |
|
"errors" |
|
"git.hpds.cc/Component/network/frame" |
|
"git.hpds.cc/Component/network/log" |
|
"sync" |
|
) |
|
|
|
// ErrConnectorClosed will be returned if the connector has been closed. |
|
var ErrConnectorClosed = errors.New("hpdsMq: connector closed") |
|
|
|
// The Connector class manages data streams and provides a centralized way to get and set streams. |
|
type Connector struct { |
|
// ctx and ctxCancel manage the lifescyle of Connector. |
|
ctx context.Context |
|
ctxCancel context.CancelFunc |
|
|
|
streams sync.Map |
|
} |
|
|
|
// NewConnector returns an initial Connector. |
|
func NewConnector(ctx context.Context) *Connector { |
|
ctx, ctxCancel := context.WithCancel(ctx) |
|
|
|
return &Connector{ |
|
ctx: ctx, |
|
ctxCancel: ctxCancel, |
|
} |
|
} |
|
|
|
// Add adds DataStream to Connector, |
|
// If the streamID is the same twice, the new stream will replace the old stream. |
|
func (c *Connector) Add(streamId string, stream DataStream) error { |
|
select { |
|
case <-c.ctx.Done(): |
|
return ErrConnectorClosed |
|
default: |
|
} |
|
|
|
c.streams.Store(streamId, stream) |
|
|
|
log.Debugf("%s Connector add stream, stream_id: %s", ParseFrameLogPrefix, streamId) |
|
return nil |
|
} |
|
|
|
// Remove removes the DataStream with the specified streamID. |
|
// If the Connector does not have a stream with the given streamID, no action is taken. |
|
func (c *Connector) Remove(streamId string) error { |
|
select { |
|
case <-c.ctx.Done(): |
|
return ErrConnectorClosed |
|
default: |
|
} |
|
|
|
c.streams.Delete(streamId) |
|
log.Debugf("%s Connector remove stream, stream_id: %s", ParseFrameLogPrefix, streamId) |
|
|
|
return nil |
|
} |
|
|
|
// Get retrieves the DataStream with the specified streamID. |
|
// If the Connector does not have a stream with the given streamID, return nil and false. |
|
func (c *Connector) Get(streamId string) (DataStream, bool, error) { |
|
select { |
|
case <-c.ctx.Done(): |
|
return nil, false, ErrConnectorClosed |
|
default: |
|
} |
|
|
|
v, ok := c.streams.Load(streamId) |
|
if !ok { |
|
return nil, false, nil |
|
} |
|
|
|
stream := v.(DataStream) |
|
|
|
return stream, true, nil |
|
} |
|
|
|
// GetSourceConns gets the streams with the specified source observe tag. |
|
func (c *Connector) GetSourceConns(sourceId string, tag frame.Tag) ([]DataStream, error) { |
|
select { |
|
case <-c.ctx.Done(): |
|
return []DataStream{}, ErrConnectorClosed |
|
default: |
|
} |
|
|
|
streams := make([]DataStream, 0) |
|
|
|
c.streams.Range(func(key interface{}, val interface{}) bool { |
|
stream := val.(DataStream) |
|
|
|
for _, v := range stream.ObserveDataTags() { |
|
if v == tag && |
|
stream.StreamType() == StreamTypeSource && |
|
stream.ID() == sourceId { |
|
streams = append(streams, stream) |
|
} |
|
} |
|
return true |
|
}) |
|
|
|
return streams, nil |
|
} |
|
|
|
// GetSnapshot returnsa snapshot of all streams. |
|
// The resulting map uses streamID as the key and stream name as the value. |
|
// This function is typically used to monitor the status of the Connector. |
|
func (c *Connector) GetSnapshot() map[string]string { |
|
result := make(map[string]string) |
|
|
|
c.streams.Range(func(key interface{}, val interface{}) bool { |
|
var ( |
|
streamID = key.(string) |
|
stream = val.(DataStream) |
|
) |
|
result[streamID] = stream.Name() |
|
return true |
|
}) |
|
|
|
return result |
|
} |
|
|
|
// Close cleans all stream of Connector and reset Connector to closed status. |
|
// The Connector can't be use after close. |
|
func (c *Connector) Close() { |
|
c.ctxCancel() |
|
|
|
c.streams.Range(func(key, value any) bool { |
|
c.streams.Delete(key) |
|
return true |
|
}) |
|
}
|
|
|