You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
183 lines
5.9 KiB
183 lines
5.9 KiB
package hpds_node |
|
|
|
import ( |
|
"context" |
|
"git.hpds.cc/Component/network" |
|
"git.hpds.cc/Component/network/frame" |
|
) |
|
|
|
// streamFunctionLogPrefix is placed at the start of every log message written
// by the stream function. The label is wrapped in ANSI escape sequences
// (\033[31m ... \033[0m).
const streamFunctionLogPrefix = "\033[31m[hpds:stream_function]\033[0m "
|
|
|
// StreamFunction defines serverless streaming functions. |
|
type StreamFunction interface { |
|
// SetObserveDataTags set the data tag list that will be observed |
|
// Deprecated: use hpds.WithObserveDataTags instead |
|
SetObserveDataTags(tag ...frame.Tag) |
|
// SetHandler set the handler function, which accept the raw bytes data and return the tag & response |
|
SetHandler(fn network.AsyncHandler) error |
|
// SetErrorHandler set the error handler function when server error occurs |
|
SetErrorHandler(fn func(err error)) |
|
// SetPipeHandler set the pipe handler function |
|
SetPipeHandler(fn network.PipeHandler) error |
|
// Connect create a connection to the messageQueue |
|
Connect() error |
|
// Close will close the connection |
|
Close() error |
|
// Write Send a data to mq. |
|
Write(tag frame.Tag, carriage []byte) error |
|
} |
|
|
|
// NewStreamFunction create a stream function. |
|
func NewStreamFunction(name string, opts ...Option) StreamFunction { |
|
options := NewOptions(opts...) |
|
client := network.NewClient(name, network.ClientTypeStreamFunction, options.ClientOptions...) |
|
sf := &streamFunction{ |
|
name: name, |
|
emitterEndpoint: options.MqAddr, |
|
client: client, |
|
observeDataTags: make([]frame.Tag, 0), |
|
} |
|
|
|
return sf |
|
} |
|
|
|
var _ StreamFunction = &streamFunction{} |
|
|
|
// streamFunction implements StreamFunction interface. |
|
type streamFunction struct { |
|
name string |
|
emitterEndpoint string |
|
client *network.Client |
|
observeDataTags []frame.Tag // tag list that will be observed |
|
fn network.AsyncHandler // user's function which will be invoked when data arrived |
|
pfn network.PipeHandler |
|
pIn chan []byte |
|
pOut chan *frame.PayloadFrame |
|
} |
|
|
|
// SetObserveDataTags set the data tag list that will be observed. |
|
// Deprecated: use hpds.WithObserveDataTags instead |
|
func (s *streamFunction) SetObserveDataTags(tag ...frame.Tag) { |
|
s.client.SetObserveDataTags(tag...) |
|
s.client.Logger().Debugf("%s SetObserveDataTag(%v)", streamFunctionLogPrefix, s.observeDataTags) |
|
} |
|
|
|
// SetHandler set the handler function, which accept the raw bytes data and return the tag & response. |
|
func (s *streamFunction) SetHandler(fn network.AsyncHandler) error { |
|
s.fn = fn |
|
s.client.Logger().Debugf("%s SetHandler(%v)", streamFunctionLogPrefix, s.fn) |
|
return nil |
|
} |
|
|
|
func (s *streamFunction) SetPipeHandler(fn network.PipeHandler) error { |
|
s.pfn = fn |
|
s.client.Logger().Debugf("%s SetHandler(%v)", streamFunctionLogPrefix, s.fn) |
|
return nil |
|
} |
|
|
|
// Connect create a connection to the mq, when data arrived, the data will be passed to the |
|
// handler which set by SetHandler method. |
|
func (s *streamFunction) Connect() error { |
|
s.client.Logger().Debugf("%s Connect()", streamFunctionLogPrefix) |
|
// notify underlying network operations, when data with tag we observed arrived, invoke the func |
|
s.client.SetDataFrameObserver(func(data *frame.DataFrame) { |
|
s.client.Logger().Debugf("%s receive DataFrame, tag=%# x, carriage length=%d", streamFunctionLogPrefix, data.Tag(), len(data.GetCarriage())) |
|
s.onDataFrame(data.GetCarriage(), data.GetMetaFrame()) |
|
}) |
|
|
|
if s.pfn != nil { |
|
s.pIn = make(chan []byte) |
|
s.pOut = make(chan *frame.PayloadFrame) |
|
|
|
// handle user's pipe function |
|
go func() { |
|
s.pfn(s.pIn, s.pOut) |
|
}() |
|
|
|
// send user's pipe function outputs to mq |
|
go func() { |
|
for { |
|
data := <-s.pOut |
|
if data != nil { |
|
s.client.Logger().Debugf("%s pipe fn send: tag=%#x, data=%# x", streamFunctionLogPrefix, data.Tag, data.Carriage) |
|
frm := frame.NewDataFrame() |
|
// todo: frm.SetTransactionId |
|
frm.SetCarriage(data.Tag, data.Carriage) |
|
_ = s.client.WriteFrame(frm) |
|
} |
|
} |
|
}() |
|
} |
|
|
|
err := s.client.Connect(context.Background(), s.emitterEndpoint) |
|
if err != nil { |
|
s.client.Logger().Errorf("%s Connect() error: %s", streamFunctionLogPrefix, err) |
|
} |
|
return err |
|
} |
|
|
|
// Close will close the connection. |
|
func (s *streamFunction) Close() error { |
|
if s.pIn != nil { |
|
close(s.pIn) |
|
} |
|
|
|
if s.pOut != nil { |
|
close(s.pOut) |
|
} |
|
|
|
if s.client != nil { |
|
if err := s.client.Close(); err != nil { |
|
s.client.Logger().Errorf("%s Close(): %v", err) |
|
return err |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// when DataFrame we observed arrived, invoke the user's function |
|
func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) { |
|
s.client.Logger().Infof("%s onDataFrame ->[%s]", streamFunctionLogPrefix, s.name) |
|
|
|
if s.fn != nil { |
|
go func() { |
|
s.client.Logger().Debugf("%s execute-start fn: data[%d]", streamFunctionLogPrefix, len(data)) //, frame.Shortly(data) |
|
// invoke serverless |
|
tag, resp := s.fn(data) |
|
s.client.Logger().Debugf("%s execute-done fn: tag=%#x, resp[%d]", streamFunctionLogPrefix, tag, len(resp)) |
|
// if resp is not nil, means the user's function has returned something, we should send it to the mq |
|
if len(resp) != 0 { |
|
s.client.Logger().Debugf("%s start WriteFrame(): tag=%#x, data[%d]", streamFunctionLogPrefix, tag, len(resp)) |
|
// build a DataFrame |
|
// TODO: seems we should implement a DeepCopy() of MetaFrame in the future |
|
frm := frame.NewDataFrame() |
|
// reuse transactionId |
|
frm.SetTransactionId(metaFrame.TransactionId()) |
|
// reuse sourceId |
|
frm.SetSourceId(metaFrame.SourceId()) |
|
frm.SetCarriage(tag, resp) |
|
_ = s.client.WriteFrame(frm) |
|
} |
|
}() |
|
} else if s.pfn != nil { |
|
s.client.Logger().Debugf("%s pipe fn receive: data[%d]", streamFunctionLogPrefix, len(data)) |
|
s.pIn <- data |
|
} else { |
|
s.client.Logger().Warnf("%s StreamFunction is nil", streamFunctionLogPrefix) |
|
} |
|
} |
|
|
|
// Send a DataFrame to mq. |
|
func (s *streamFunction) Write(tag frame.Tag, carriage []byte) error { |
|
fm := frame.NewDataFrame() |
|
fm.SetCarriage(tag, carriage) |
|
return s.client.WriteFrame(fm) |
|
} |
|
|
|
// SetErrorHandler set the error handler function when server error occurs |
|
func (s *streamFunction) SetErrorHandler(fn func(err error)) { |
|
s.client.SetErrorHandler(fn) |
|
}
|
|
|