hpds_node/stream_func.go

187 lines
6.1 KiB
Go

package hpds_node
import (
"context"
"git.hpds.cc/Component/network"
"git.hpds.cc/Component/network/frame"
)
const (
// streamFunctionLogPrefix is the tag placed at the start of the stream function's
// log messages. The "[hpds:stream_function]" text is wrapped in the ANSI escape
// sequences \033[31m (red foreground) and \033[0m (reset), so terminals that
// honor them render the tag in red.
streamFunctionLogPrefix = "\033[31m[hpds:stream_function]\033[0m "
)
// StreamFunction defines serverless streaming functions.
type StreamFunction interface {
// SetObserveDataTags sets the list of data tags that will be observed.
//
// Deprecated: use hpds.WithObserveDataTags instead.
SetObserveDataTags(tag ...frame.Tag)
// SetHandler sets the handler function, which accepts the raw bytes of the
// data and returns the tag and the response.
SetHandler(fn network.AsyncHandler) error
// SetErrorHandler sets the function that is called when a server error occurs.
SetErrorHandler(fn func(err error))
// SetPipeHandler sets the pipe handler function.
SetPipeHandler(fn network.PipeHandler) error
// Connect creates a connection to the message queue.
Connect() error
// Close closes the connection.
Close() error
// Write sends carriage, labelled with tag, to the message queue.
Write(tag frame.Tag, carriage []byte) error
}
// NewStreamFunction builds a StreamFunction called name.
//
// The opts are resolved with NewOptions: the resulting ClientOptions are handed
// to the underlying network client (created with the stream-function client
// type), and MqAddr is stored as the endpoint that Connect dials.
func NewStreamFunction(name string, opts ...Option) StreamFunction {
	cfg := NewOptions(opts...)
	return &streamFunction{
		name:            name,
		emitterEndpoint: cfg.MqAddr,
		client:          network.NewClient(name, network.ClientTypeStreamFunction, cfg.ClientOptions...),
		observeDataTags: []frame.Tag{},
	}
}
// Compile-time assertion that *streamFunction satisfies StreamFunction.
var _ StreamFunction = &streamFunction{}
// streamFunction implements the StreamFunction interface on top of a network client.
type streamFunction struct {
// name is the name given to NewStreamFunction; it is also passed to the client and used in logs.
name string
// emitterEndpoint is the mq address (Options.MqAddr) that Connect dials.
emitterEndpoint string
// client is the underlying network client that all operations are delegated to.
client *network.Client
observeDataTags []frame.Tag // tag list that will be observed
fn network.AsyncHandler // user's function, invoked when observed data arrives
// pfn is the user's pipe handler; Connect runs it in its own goroutine with pIn and pOut.
pfn network.PipeHandler
// pIn carries the carriage of received data frames to the pipe handler.
// It is only created by Connect when pfn is set.
pIn chan []byte
// pOut carries the pipe handler's outputs, which Connect forwards to the mq.
// It is only created by Connect when pfn is set.
pOut chan *frame.PayloadFrame
}
// SetObserveDataTags sets the list of data tags that will be observed.
//
// The tags are forwarded to the underlying client and a copy is recorded on the
// stream function, so the debug log reports the tags that were actually set
// (previously it always printed the empty list created by NewStreamFunction).
//
// Deprecated: use hpds.WithObserveDataTags instead.
func (s *streamFunction) SetObserveDataTags(tag ...frame.Tag) {
	// Copy so later changes to the caller's slice cannot alter the recorded tags.
	s.observeDataTags = append(make([]frame.Tag, 0, len(tag)), tag...)
	s.client.SetObserveDataTags(tag...)
	s.client.Logger().Debugf("%sSetObserveDataTag(%v)", streamFunctionLogPrefix, s.observeDataTags)
}
// SetHandler registers fn as the function that receives the raw bytes of each
// observed data frame and returns the tag and response to send back.
// It always returns nil.
func (s *streamFunction) SetHandler(fn network.AsyncHandler) error {
	s.fn = fn

	logger := s.client.Logger()
	logger.Debugf("%sSetHandler(%v)", streamFunctionLogPrefix, fn)

	return nil
}
// SetPipeHandler sets the pipe handler function. Connect starts it in its own
// goroutine, so it must be set before Connect is called to take effect.
// It always returns nil.
func (s *streamFunction) SetPipeHandler(fn network.PipeHandler) error {
	s.pfn = fn
	// Log the pipe handler that was just set (not the async handler s.fn).
	s.client.Logger().Debugf("%sSetPipeHandler(%v)", streamFunctionLogPrefix, s.pfn)
	return nil
}
// Connect creates a connection to the mq at the endpoint configured in
// NewStreamFunction.
//
// Before dialing it registers a data frame observer on the client, so every
// observed DataFrame is passed to onDataFrame. If a pipe handler was set with
// SetPipeHandler, Connect also creates the pipe channels, starts the pipe
// handler in its own goroutine, and starts a second goroutine that forwards the
// pipe handler's outputs to the mq. The forwarding goroutine exits once the
// output channel is closed (which Close does).
//
// It returns the error from the client's Connect call, which is also logged.
func (s *streamFunction) Connect() error {
	s.client.Logger().Debugf("%s Connect()", streamFunctionLogPrefix)

	// notify underlying network operations, when data with tag we observed arrived, invoke the func
	s.client.SetDataFrameObserver(func(data *frame.DataFrame) {
		s.client.Logger().Debugf("%sreceive DataFrame, tag=%# x, carriage=%# x", streamFunctionLogPrefix, data.Tag(), data.GetCarriage())
		s.onDataFrame(data.GetCarriage(), data.GetMetaFrame())
	})

	if s.pfn != nil {
		s.pIn = make(chan []byte)
		s.pOut = make(chan *frame.PayloadFrame)
		// Hand the goroutines their own references to the channels.
		in, out := s.pIn, s.pOut

		// handle user's pipe function
		go func() {
			s.pfn(in, out)
		}()

		// send user's pipe function outputs to mq
		go func() {
			// Ranging (rather than a bare receive in an endless loop) lets this
			// goroutine stop when the channel is closed instead of spinning on
			// the nil values a closed channel yields.
			for data := range out {
				if data == nil {
					continue
				}
				s.client.Logger().Debugf("%spipe fn send: tag=%#x, data=%# x", streamFunctionLogPrefix, data.Tag, data.Carriage)
				frm := frame.NewDataFrame()
				// todo: frm.SetTransactionId
				frm.SetCarriage(data.Tag, data.Carriage)
				if err := s.client.WriteFrame(frm); err != nil {
					// There is no caller to return the error to; log it and keep forwarding.
					s.client.Logger().Errorf("%spipe fn WriteFrame() error: %v", streamFunctionLogPrefix, err)
				}
			}
		}()
	}

	err := s.client.Connect(context.Background(), s.emitterEndpoint)
	if err != nil {
		s.client.Logger().Errorf("%sConnect() error: %s", streamFunctionLogPrefix, err)
	}
	return err
}
// Close closes the pipe channels (if Connect created them) and then closes the
// underlying client connection.
//
// It returns the error from the client's Close, which is also logged, or nil.
//
// NOTE: the pipe channels are closed unconditionally, so calling Close a second
// time after Connect has set up a pipe handler panics on the already-closed
// channel.
func (s *streamFunction) Close() error {
	if s.pIn != nil {
		close(s.pIn)
	}
	if s.pOut != nil {
		close(s.pOut)
	}

	if s.client == nil {
		return nil
	}
	if err := s.client.Close(); err != nil {
		// The format has two verbs: the log prefix and the error.
		s.client.Logger().Errorf("%sClose(): %v", streamFunctionLogPrefix, err)
		return err
	}
	return nil
}
// onDataFrame hands the carriage of an observed DataFrame to the user's code.
//
//   - If a handler was set with SetHandler, it is invoked in a new goroutine.
//     A non-empty response is written back to the mq in a new DataFrame that
//     reuses the transaction id and source id taken from metaFrame; a failed
//     write is logged.
//   - Otherwise, if a pipe handler was set, data is sent on the pipe input
//     channel. The channel is unbuffered, so this blocks until the pipe
//     handler receives the data.
//   - If neither is set, a warning is logged and the data is dropped.
func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) {
	s.client.Logger().Infof("%sonDataFrame ->[%s]", streamFunctionLogPrefix, s.name)

	switch {
	case s.fn != nil:
		go func() {
			s.client.Logger().Debugf("%sexecute-start fn: data[%d]=%# x", streamFunctionLogPrefix, len(data), frame.Shortly(data))
			// invoke serverless
			tag, resp := s.fn(data)
			s.client.Logger().Debugf("%sexecute-done fn: tag=%#x, resp[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp))

			// An empty response means the user's function has nothing to send to the mq.
			if len(resp) == 0 {
				return
			}

			s.client.Logger().Debugf("%sstart WriteFrame(): tag=%#x, data[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp))
			// build a DataFrame
			// TODO: seems we should implement a DeepCopy() of MetaFrame in the future
			frm := frame.NewDataFrame()
			// reuse transactionId
			frm.SetTransactionId(metaFrame.TransactionId())
			// reuse sourceId
			frm.SetSourceId(metaFrame.SourceId())
			frm.SetCarriage(tag, resp)
			if err := s.client.WriteFrame(frm); err != nil {
				// This runs in a detached goroutine with no caller to report to,
				// so surface the failure in the log instead of dropping it.
				s.client.Logger().Errorf("%sWriteFrame() error: %v", streamFunctionLogPrefix, err)
			}
		}()
	case s.pfn != nil:
		s.client.Logger().Debugf("%spipe fn receive: data[%d]=%# x", streamFunctionLogPrefix, len(data), data)
		s.pIn <- data
	default:
		s.client.Logger().Warnf("%sStreamFunction is nil", streamFunctionLogPrefix)
	}
}
// Write puts carriage, labelled with tag, into a freshly created DataFrame and
// writes that frame to the mq through the client. It returns the error from the
// client's WriteFrame.
func (s *streamFunction) Write(tag frame.Tag, carriage []byte) error {
	df := frame.NewDataFrame()
	df.SetCarriage(tag, carriage)

	return s.client.WriteFrame(df)
}
// SetErrorHandler sets the function that is called when a server error occurs.
// fn is passed straight through to the underlying client's SetErrorHandler.
func (s *streamFunction) SetErrorHandler(fn func(err error)) {
s.client.SetErrorHandler(fn)
}