|
|
|
@ -61,22 +61,19 @@ type streamFunction struct {
|
|
|
|
|
// Deprecated: use hpds.WithObserveDataTags instead
|
|
|
|
|
func (s *streamFunction) SetObserveDataTags(tag ...frame.Tag) { |
|
|
|
|
s.client.SetObserveDataTags(tag...) |
|
|
|
|
//co := network.WithObserveDataTags(tag...)
|
|
|
|
|
//s.client.
|
|
|
|
|
//s.client.SetObserveDataTags(tag...)
|
|
|
|
|
s.client.Logger().Debugf("%sSetObserveDataTag(%v)", streamFunctionLogPrefix, s.observeDataTags) |
|
|
|
|
s.client.Logger().Debugf("%s SetObserveDataTag(%v)", streamFunctionLogPrefix, s.observeDataTags) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// SetHandler set the handler function, which accept the raw bytes data and return the tag & response.
|
|
|
|
|
func (s *streamFunction) SetHandler(fn network.AsyncHandler) error { |
|
|
|
|
s.fn = fn |
|
|
|
|
s.client.Logger().Debugf("%sSetHandler(%v)", streamFunctionLogPrefix, s.fn) |
|
|
|
|
s.client.Logger().Debugf("%s SetHandler(%v)", streamFunctionLogPrefix, s.fn) |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (s *streamFunction) SetPipeHandler(fn network.PipeHandler) error { |
|
|
|
|
s.pfn = fn |
|
|
|
|
s.client.Logger().Debugf("%sSetHandler(%v)", streamFunctionLogPrefix, s.fn) |
|
|
|
|
s.client.Logger().Debugf("%s SetHandler(%v)", streamFunctionLogPrefix, s.fn) |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -86,7 +83,7 @@ func (s *streamFunction) Connect() error {
|
|
|
|
|
s.client.Logger().Debugf("%s Connect()", streamFunctionLogPrefix) |
|
|
|
|
// notify underlying network operations, when data with tag we observed arrived, invoke the func
|
|
|
|
|
s.client.SetDataFrameObserver(func(data *frame.DataFrame) { |
|
|
|
|
s.client.Logger().Debugf("%sreceive DataFrame, tag=%# x, carriage=%# x", streamFunctionLogPrefix, data.Tag(), data.GetCarriage()) |
|
|
|
|
s.client.Logger().Debugf("%s receive DataFrame, tag=%# x, carriage=%# x", streamFunctionLogPrefix, data.Tag(), data.GetCarriage()) |
|
|
|
|
s.onDataFrame(data.GetCarriage(), data.GetMetaFrame()) |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
@ -104,7 +101,7 @@ func (s *streamFunction) Connect() error {
|
|
|
|
|
for { |
|
|
|
|
data := <-s.pOut |
|
|
|
|
if data != nil { |
|
|
|
|
s.client.Logger().Debugf("%spipe fn send: tag=%#x, data=%# x", streamFunctionLogPrefix, data.Tag, data.Carriage) |
|
|
|
|
s.client.Logger().Debugf("%s pipe fn send: tag=%#x, data=%# x", streamFunctionLogPrefix, data.Tag, data.Carriage) |
|
|
|
|
frm := frame.NewDataFrame() |
|
|
|
|
// todo: frm.SetTransactionId
|
|
|
|
|
frm.SetCarriage(data.Tag, data.Carriage) |
|
|
|
@ -116,7 +113,7 @@ func (s *streamFunction) Connect() error {
|
|
|
|
|
|
|
|
|
|
err := s.client.Connect(context.Background(), s.emitterEndpoint) |
|
|
|
|
if err != nil { |
|
|
|
|
s.client.Logger().Errorf("%sConnect() error: %s", streamFunctionLogPrefix, err) |
|
|
|
|
s.client.Logger().Errorf("%s Connect() error: %s", streamFunctionLogPrefix, err) |
|
|
|
|
} |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
@ -133,7 +130,7 @@ func (s *streamFunction) Close() error {
|
|
|
|
|
|
|
|
|
|
if s.client != nil { |
|
|
|
|
if err := s.client.Close(); err != nil { |
|
|
|
|
s.client.Logger().Errorf("%sClose(): %v", err) |
|
|
|
|
s.client.Logger().Errorf("%s Close(): %v", err) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -143,17 +140,17 @@ func (s *streamFunction) Close() error {
|
|
|
|
|
|
|
|
|
|
// when DataFrame we observed arrived, invoke the user's function
|
|
|
|
|
func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) { |
|
|
|
|
s.client.Logger().Infof("%sonDataFrame ->[%s]", streamFunctionLogPrefix, s.name) |
|
|
|
|
s.client.Logger().Infof("%s onDataFrame ->[%s]", streamFunctionLogPrefix, s.name) |
|
|
|
|
|
|
|
|
|
if s.fn != nil { |
|
|
|
|
go func() { |
|
|
|
|
s.client.Logger().Debugf("%sexecute-start fn: data[%d]=%# x", streamFunctionLogPrefix, len(data), frame.Shortly(data)) |
|
|
|
|
s.client.Logger().Debugf("%s execute-start fn: data[%d]=%# x", streamFunctionLogPrefix, len(data), frame.Shortly(data)) |
|
|
|
|
// invoke serverless
|
|
|
|
|
tag, resp := s.fn(data) |
|
|
|
|
s.client.Logger().Debugf("%sexecute-done fn: tag=%#x, resp[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp)) |
|
|
|
|
s.client.Logger().Debugf("%s execute-done fn: tag=%#x, resp[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp)) |
|
|
|
|
// if resp is not nil, means the user's function has returned something, we should send it to the mq
|
|
|
|
|
if len(resp) != 0 { |
|
|
|
|
s.client.Logger().Debugf("%sstart WriteFrame(): tag=%#x, data[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp)) |
|
|
|
|
s.client.Logger().Debugf("%s start WriteFrame(): tag=%#x, data[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp)) |
|
|
|
|
// build a DataFrame
|
|
|
|
|
// TODO: seems we should implement a DeepCopy() of MetaFrame in the future
|
|
|
|
|
frm := frame.NewDataFrame() |
|
|
|
@ -166,10 +163,10 @@ func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) {
|
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
} else if s.pfn != nil { |
|
|
|
|
s.client.Logger().Debugf("%spipe fn receive: data[%d]=%# x", streamFunctionLogPrefix, len(data), data) |
|
|
|
|
s.client.Logger().Debugf("%s pipe fn receive: data[%d]=%# x", streamFunctionLogPrefix, len(data), data) |
|
|
|
|
s.pIn <- data |
|
|
|
|
} else { |
|
|
|
|
s.client.Logger().Warnf("%sStreamFunction is nil", streamFunctionLogPrefix) |
|
|
|
|
s.client.Logger().Warnf("%s StreamFunction is nil", streamFunctionLogPrefix) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|