|
|
|
@ -83,7 +83,7 @@ func (s *streamFunction) Connect() error {
|
|
|
|
|
s.client.Logger().Debugf("%s Connect()", streamFunctionLogPrefix) |
|
|
|
|
// notify underlying network operations, when data with tag we observed arrived, invoke the func
|
|
|
|
|
s.client.SetDataFrameObserver(func(data *frame.DataFrame) { |
|
|
|
|
s.client.Logger().Debugf("%s receive DataFrame, tag=%# x, carriage=%# x", streamFunctionLogPrefix, data.Tag(), data.GetCarriage()) |
|
|
|
|
s.client.Logger().Debugf("%s receive DataFrame, tag=%# x, carriage length=%d", streamFunctionLogPrefix, data.Tag(), len(data.GetCarriage())) |
|
|
|
|
s.onDataFrame(data.GetCarriage(), data.GetMetaFrame()) |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
@ -144,13 +144,13 @@ func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) {
|
|
|
|
|
|
|
|
|
|
if s.fn != nil { |
|
|
|
|
go func() { |
|
|
|
|
s.client.Logger().Debugf("%s execute-start fn: data[%d]=%# x", streamFunctionLogPrefix, len(data), frame.Shortly(data)) |
|
|
|
|
s.client.Logger().Debugf("%s execute-start fn: data[%d]", streamFunctionLogPrefix, len(data)) //, frame.Shortly(data)
|
|
|
|
|
// invoke serverless
|
|
|
|
|
tag, resp := s.fn(data) |
|
|
|
|
s.client.Logger().Debugf("%s execute-done fn: tag=%#x, resp[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp)) |
|
|
|
|
s.client.Logger().Debugf("%s execute-done fn: tag=%#x, resp[%d]", streamFunctionLogPrefix, tag, len(resp)) |
|
|
|
|
// if resp is not nil, means the user's function has returned something, we should send it to the mq
|
|
|
|
|
if len(resp) != 0 { |
|
|
|
|
s.client.Logger().Debugf("%s start WriteFrame(): tag=%#x, data[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp)) |
|
|
|
|
s.client.Logger().Debugf("%s start WriteFrame(): tag=%#x, data[%d]", streamFunctionLogPrefix, tag, len(resp)) |
|
|
|
|
// build a DataFrame
|
|
|
|
|
// TODO: seems we should implement a DeepCopy() of MetaFrame in the future
|
|
|
|
|
frm := frame.NewDataFrame() |
|
|
|
@ -163,7 +163,7 @@ func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) {
|
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
} else if s.pfn != nil { |
|
|
|
|
s.client.Logger().Debugf("%s pipe fn receive: data[%d]=%# x", streamFunctionLogPrefix, len(data), data) |
|
|
|
|
s.client.Logger().Debugf("%s pipe fn receive: data[%d]", streamFunctionLogPrefix, len(data)) |
|
|
|
|
s.pIn <- data |
|
|
|
|
} else { |
|
|
|
|
s.client.Logger().Warnf("%s StreamFunction is nil", streamFunctionLogPrefix) |
|
|
|
|