network/connector.go

136 lines
3.2 KiB
Go
Raw Permalink Normal View History

2022-10-11 17:36:09 +08:00
package network
import (
2023-04-05 16:15:59 +08:00
"context"
"errors"
"git.hpds.cc/Component/network/frame"
2022-10-11 17:36:09 +08:00
"git.hpds.cc/Component/network/log"
"sync"
)
2023-04-05 16:15:59 +08:00
// ErrConnectorClosed is the sentinel error returned by Connector methods
// once the Connector has been closed.
var ErrConnectorClosed = errors.New("hpdsMq: connector closed")
2022-10-11 17:36:09 +08:00
2023-04-05 16:15:59 +08:00
// Connector manages data streams and provides a centralized way to add,
// look up and remove them by stream ID.
type Connector struct {
	// ctx and ctxCancel manage the lifecycle of the Connector: once ctx is
	// done, the Connector is considered closed.
	ctx       context.Context
	ctxCancel context.CancelFunc

	// streams maps a stream ID (string) to its DataStream.
	streams sync.Map
}
2023-04-05 16:15:59 +08:00
// NewConnector returns an initial Connector.
func NewConnector(ctx context.Context) *Connector {
ctx, ctxCancel := context.WithCancel(ctx)
return &Connector{
ctx: ctx,
ctxCancel: ctxCancel,
}
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
// Add adds DataStream to Connector,
// If the streamID is the same twice, the new stream will replace the old stream.
func (c *Connector) Add(streamId string, stream DataStream) error {
select {
case <-c.ctx.Done():
return ErrConnectorClosed
default:
}
c.streams.Store(streamId, stream)
log.Debugf("Connector add stream, stream_id: %s", streamId)
return nil
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
// Remove removes the DataStream with the specified streamID.
// If the Connector does not have a stream with the given streamID, no action is taken.
func (c *Connector) Remove(streamId string) error {
select {
case <-c.ctx.Done():
return ErrConnectorClosed
default:
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
c.streams.Delete(streamId)
log.Debugf("Connector remove stream, stream_id: %s", streamId)
2022-10-11 17:36:09 +08:00
return nil
}
2023-04-05 16:15:59 +08:00
// Get retrieves the DataStream with the specified streamID.
// If the Connector does not have a stream with the given streamID, return nil and false.
func (c *Connector) Get(streamId string) (DataStream, bool, error) {
select {
case <-c.ctx.Done():
return nil, false, ErrConnectorClosed
default:
}
v, ok := c.streams.Load(streamId)
if !ok {
return nil, false, nil
}
2022-10-11 17:36:09 +08:00
2023-04-05 16:15:59 +08:00
stream := v.(DataStream)
return stream, true, nil
}
// GetSourceConns gets the streams with the specified source observe tag.
func (c *Connector) GetSourceConns(sourceId string, tag frame.Tag) ([]DataStream, error) {
select {
case <-c.ctx.Done():
return []DataStream{}, ErrConnectorClosed
default:
}
streams := make([]DataStream, 0)
c.streams.Range(func(key interface{}, val interface{}) bool {
stream := val.(DataStream)
for _, v := range stream.ObserveDataTags() {
if v == tag &&
stream.StreamType() == StreamTypeSource &&
stream.ID() == sourceId {
streams = append(streams, stream)
2022-10-11 17:36:09 +08:00
}
}
return true
})
2023-04-05 16:15:59 +08:00
return streams, nil
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
// GetSnapshot returnsa snapshot of all streams.
// The resulting map uses streamID as the key and stream name as the value.
// This function is typically used to monitor the status of the Connector.
func (c *Connector) GetSnapshot() map[string]string {
2022-10-11 17:36:09 +08:00
result := make(map[string]string)
2023-04-05 16:15:59 +08:00
c.streams.Range(func(key interface{}, val interface{}) bool {
var (
streamID = key.(string)
stream = val.(DataStream)
)
result[streamID] = stream.Name()
2022-10-11 17:36:09 +08:00
return true
})
2023-04-05 16:15:59 +08:00
2022-10-11 17:36:09 +08:00
return result
}
2023-04-05 16:15:59 +08:00
// Close cleans all stream of Connector and reset Connector to closed status.
// The Connector can't be use after close.
func (c *Connector) Close() {
c.ctxCancel()
c.streams.Range(func(key, value any) bool {
c.streams.Delete(key)
return true
})
2022-10-11 17:36:09 +08:00
}