package hpds_node import ( "context" "git.hpds.cc/Component/network" "git.hpds.cc/Component/network/frame" ) const ( streamFunctionLogPrefix = "\033[31m[hpds:stream_function]\033[0m " ) // StreamFunction defines serverless streaming functions. type StreamFunction interface { // SetObserveDataTags set the data tag list that will be observed // Deprecated: use hpds.WithObserveDataTags instead SetObserveDataTags(tag ...frame.Tag) // SetHandler set the handler function, which accept the raw bytes data and return the tag & response SetHandler(fn network.AsyncHandler) error // SetErrorHandler set the error handler function when server error occurs SetErrorHandler(fn func(err error)) // SetPipeHandler set the pipe handler function SetPipeHandler(fn network.PipeHandler) error // Connect create a connection to the messageQueue Connect() error // Close will close the connection Close() error // Write Send a data to mq. Write(tag frame.Tag, carriage []byte) error } // NewStreamFunction create a stream function. func NewStreamFunction(name string, opts ...Option) StreamFunction { options := NewOptions(opts...) client := network.NewClient(name, network.ClientTypeStreamFunction, options.ClientOptions...) sf := &streamFunction{ name: name, emitterEndpoint: options.MqAddr, client: client, observeDataTags: make([]frame.Tag, 0), } return sf } var _ StreamFunction = &streamFunction{} // streamFunction implements StreamFunction interface. type streamFunction struct { name string emitterEndpoint string client *network.Client observeDataTags []frame.Tag // tag list that will be observed fn network.AsyncHandler // user's function which will be invoked when data arrived pfn network.PipeHandler pIn chan []byte pOut chan *frame.PayloadFrame } // SetObserveDataTags set the data tag list that will be observed. // Deprecated: use hpds.WithObserveDataTags instead func (s *streamFunction) SetObserveDataTags(tag ...frame.Tag) { s.client.SetObserveDataTags(tag...) s.client.Logger().Debugf("%s SetObserveDataTag(%v)", streamFunctionLogPrefix, s.observeDataTags) } // SetHandler set the handler function, which accept the raw bytes data and return the tag & response. func (s *streamFunction) SetHandler(fn network.AsyncHandler) error { s.fn = fn s.client.Logger().Debugf("%s SetHandler(%v)", streamFunctionLogPrefix, s.fn) return nil } func (s *streamFunction) SetPipeHandler(fn network.PipeHandler) error { s.pfn = fn s.client.Logger().Debugf("%s SetHandler(%v)", streamFunctionLogPrefix, s.fn) return nil } // Connect create a connection to the mq, when data arrived, the data will be passed to the // handler which set by SetHandler method. func (s *streamFunction) Connect() error { s.client.Logger().Debugf("%s Connect()", streamFunctionLogPrefix) // notify underlying network operations, when data with tag we observed arrived, invoke the func s.client.SetDataFrameObserver(func(data *frame.DataFrame) { s.client.Logger().Debugf("%s receive DataFrame, tag=%# x, carriage length=%d", streamFunctionLogPrefix, data.Tag(), len(data.GetCarriage())) s.onDataFrame(data.GetCarriage(), data.GetMetaFrame()) }) if s.pfn != nil { s.pIn = make(chan []byte) s.pOut = make(chan *frame.PayloadFrame) // handle user's pipe function go func() { s.pfn(s.pIn, s.pOut) }() // send user's pipe function outputs to mq go func() { for { data := <-s.pOut if data != nil { s.client.Logger().Debugf("%s pipe fn send: tag=%#x, data=%# x", streamFunctionLogPrefix, data.Tag, data.Carriage) frm := frame.NewDataFrame() // todo: frm.SetTransactionId frm.SetCarriage(data.Tag, data.Carriage) _ = s.client.WriteFrame(frm) } } }() } err := s.client.Connect(context.Background(), s.emitterEndpoint) if err != nil { s.client.Logger().Errorf("%s Connect() error: %s", streamFunctionLogPrefix, err) } return err } // Close will close the connection. func (s *streamFunction) Close() error { if s.pIn != nil { close(s.pIn) } if s.pOut != nil { close(s.pOut) } if s.client != nil { if err := s.client.Close(); err != nil { s.client.Logger().Errorf("%s Close(): %v", err) return err } } return nil } // when DataFrame we observed arrived, invoke the user's function func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) { s.client.Logger().Infof("%s onDataFrame ->[%s]", streamFunctionLogPrefix, s.name) if s.fn != nil { go func() { s.client.Logger().Debugf("%s execute-start fn: data[%d]", streamFunctionLogPrefix, len(data)) //, frame.Shortly(data) // invoke serverless tag, resp := s.fn(data) s.client.Logger().Debugf("%s execute-done fn: tag=%#x, resp[%d]", streamFunctionLogPrefix, tag, len(resp)) // if resp is not nil, means the user's function has returned something, we should send it to the mq if len(resp) != 0 { s.client.Logger().Debugf("%s start WriteFrame(): tag=%#x, data[%d]", streamFunctionLogPrefix, tag, len(resp)) // build a DataFrame // TODO: seems we should implement a DeepCopy() of MetaFrame in the future frm := frame.NewDataFrame() // reuse transactionId frm.SetTransactionId(metaFrame.TransactionId()) // reuse sourceId frm.SetSourceId(metaFrame.SourceId()) frm.SetCarriage(tag, resp) _ = s.client.WriteFrame(frm) } }() } else if s.pfn != nil { s.client.Logger().Debugf("%s pipe fn receive: data[%d]", streamFunctionLogPrefix, len(data)) s.pIn <- data } else { s.client.Logger().Warnf("%s StreamFunction is nil", streamFunctionLogPrefix) } } // Send a DataFrame to mq. func (s *streamFunction) Write(tag frame.Tag, carriage []byte) error { fm := frame.NewDataFrame() fm.SetCarriage(tag, carriage) return s.client.WriteFrame(fm) } // SetErrorHandler set the error handler function when server error occurs func (s *streamFunction) SetErrorHandler(fn func(err error)) { s.client.SetErrorHandler(fn) }