协议网关、消息队列和流处理函数
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

126 lines
3.5 KiB

package hpds_node
import (
"context"
"git.hpds.cc/Component/network"
"git.hpds.cc/Component/network/frame"
"git.hpds.cc/Component/network/log"
)
// apLogPrefix is prepended to every log line emitted by the access point.
// The escape sequences wrap the label in ANSI green (32) and reset (0).
const apLogPrefix = "\033[32m[hpds:access_point]\033[0m "
// AccessPoint is the data-source side of hpds: it connects to HPDS-Mq and
// pushes tagged payloads to it.
type AccessPoint interface {
	// Connect establishes the connection to HPDS-Mq.
	Connect() error
	// Close shuts down the connection to HPDS-Mq.
	Close() error

	// SetDataTag sets the tag that Write() attaches to the data it sends.
	SetDataTag(tag frame.Tag)
	// Write sends p downstream and reports how many bytes were accepted.
	Write(p []byte) (n int, err error)
	// WriteWithTag sends data downstream using the given tag instead of the
	// one configured through SetDataTag.
	// NOTE(review): the transaction ID is said to default to epoch time;
	// that is decided by the frame package — confirm there.
	WriteWithTag(tag frame.Tag, data []byte) error
	// Broadcast sends data to all downstream receivers.
	Broadcast(data []byte) error

	// SetErrorHandler registers the function called when a server error occurs.
	SetErrorHandler(fn func(err error))
	// SetReceiveHandler [Experimental] registers the function that observes
	// data sent back to this access point.
	SetReceiveHandler(fn func(tag frame.Tag, data []byte))
}
// accessPoint is the package's AccessPoint implementation; it builds data
// frames and hands them to a network.Client.
type accessPoint struct {
// name is the access point name supplied to NewAccessPoint.
name string
// mqEndpoint is the address handed to the client when connecting.
mqEndpoint string
// client is the underlying network client used to connect and write frames.
client *network.Client
// tag is the data tag used by Write and Broadcast; set through SetDataTag.
tag frame.Tag
// fn is the optional receive handler invoked for back-flow frames.
fn func(frame.Tag, []byte)
}
// Compile-time check that *accessPoint satisfies AccessPoint.
var _ AccessPoint = (*accessPoint)(nil)
// NewAccessPoint creates an hpds AccessPoint called name.
//
// opts are resolved through NewOptions: the resulting MqAddr becomes the
// endpoint used by Connect, and ClientOptions are forwarded to the underlying
// network client, which is created with type network.ClientTypeSource.
func NewAccessPoint(name string, opts ...Option) AccessPoint {
	cfg := NewOptions(opts...)
	ap := &accessPoint{
		name:       name,
		mqEndpoint: cfg.MqAddr,
	}
	ap.client = network.NewClient(name, network.ClientTypeSource, cfg.ClientOptions...)
	return ap
}
// Write sends data downstream using the tag configured via SetDataTag.
// It returns len(data) on success, or 0 and the error from WriteWithTag.
func (s *accessPoint) Write(data []byte) (int, error) {
	if err := s.WriteWithTag(s.tag, data); err != nil {
		return 0, err
	}
	return len(data), nil
}
// SetDataTag stores tag as the data tag that later Write and Broadcast calls
// attach to the frames they send.
func (s *accessPoint) SetDataTag(tag frame.Tag) {
s.tag = tag
}
// Close closes the underlying client connection.
// A failure is logged at error level and returned to the caller; a
// successful close is logged at debug level.
func (s *accessPoint) Close() error {
	err := s.client.Close()
	if err == nil {
		log.Debugf("%s is closed", apLogPrefix)
		return nil
	}
	log.Errorf("%s Close(): %v", apLogPrefix, err)
	return err
}
// Connect registers the back-flow observer and then connects the underlying
// client to the configured message-queue endpoint.
// A connection failure is logged and returned.
func (s *accessPoint) Connect() error {
	// Relay back-flow frames to the receive handler. s.fn is read on every
	// frame, so a handler installed after Connect is still honoured.
	onBackFlow := func(frm *frame.BackFlowFrame) {
		if s.fn == nil {
			return
		}
		s.fn(frm.GetDataTag(), frm.GetCarriage())
	}
	s.client.SetBackFlowFrameObserver(onBackFlow)

	if err := s.client.Connect(context.Background(), s.mqEndpoint); err != nil {
		log.Errorf("%s Connect() error: %s", apLogPrefix, err)
		return err
	}
	return nil
}
// WriteWithTag wraps data in a new data frame carrying tag, stamps the frame
// with this client's ID as its source, and writes it through the client.
// NOTE(review): the transaction ID is whatever frame.NewDataFrame assigns
// (reportedly epoch time) — confirm in the frame package.
func (s *accessPoint) WriteWithTag(tag frame.Tag, data []byte) error {
	df := frame.NewDataFrame()
	df.SetCarriage(tag, data)
	df.SetSourceId(s.client.ClientId())

	tid, sourceID := df.TransactionId(), df.SourceId()
	log.Debugf("%s WriteWithTag: tid=%s, source_id=%s, data[%d]", apLogPrefix, tid, sourceID, len(data))

	return s.client.WriteFrame(df)
}
// SetErrorHandler registers fn as the error handler on the underlying client.
// NOTE(review): described as firing when a server error occurs — confirm
// against network.Client.SetErrorHandler.
func (s *accessPoint) SetErrorHandler(fn func(err error)) {
s.client.SetErrorHandler(fn)
}
// SetReceiveHandler [Experimental] installs fn as the handler that receives
// the tag and payload of each back-flow frame observed after Connect.
// Passing nil disables the handler.
func (s *accessPoint) SetReceiveHandler(fn func(frame.Tag, []byte)) {
	s.fn = fn
	// fn and s.fn are the same value at this point; log what was installed.
	log.Debugf("%s SetReceiveHandler(%v)", apLogPrefix, fn)
}
// Broadcast sends data to all downstream receivers.
//
// It builds a data frame carrying data under the tag configured via
// SetDataTag, stamps it with this client's ID as the source, marks it as a
// broadcast frame, and writes it through the underlying client. The error
// from the client's WriteFrame is returned unchanged.
func (s *accessPoint) Broadcast(data []byte) error {
	f := frame.NewDataFrame()
	f.SetCarriage(s.tag, data)
	f.SetSourceId(s.client.ClientId())
	f.SetBroadcast(true)
	// Debugf is printf-style: the previous key/value arguments had no matching
	// verbs and were rendered as "%!(EXTRA ...)" noise in the log.
	log.Debugf("%s Broadcast: data_frame=%s", apLogPrefix, f.String())
	return s.client.WriteFrame(f)
}