package hpds_node import ( "context" "git.hpds.cc/Component/network" "git.hpds.cc/Component/network/frame" ) const ( streamFunctionLogPrefix = "\033[31m[hpds:stream_function]\033[0m " ) // StreamFunction defines serverless streaming functions. type StreamFunction interface { // SetObserveDataTags set the data tag list that will be observed // Deprecated: use hpds.WithObserveDataTags instead SetObserveDataTags(tag ...byte) // SetHandler set the handler function, which accept the raw bytes data and return the tag & response SetHandler(fn network.AsyncHandler) error // SetErrorHandler set the error handler function when server error occurs SetErrorHandler(fn func(err error)) // SetPipeHandler set the pipe handler function SetPipeHandler(fn network.PipeHandler) error // Connect create a connection to the messageQueue Connect() error // Close will close the connection Close() error // Write Send a data to mq. Write(tag byte, carriage []byte) error } // NewStreamFunction create a stream function. func NewStreamFunction(name string, opts ...Option) StreamFunction { options := NewOptions(opts...) client := network.NewClient(name, network.ClientTypeStreamFunction, options.ClientOptions...) sf := &streamFunction{ name: name, zipperEndpoint: options.MqAddr, client: client, observeDataTags: make([]byte, 0), } return sf } var _ StreamFunction = &streamFunction{} // streamFunction implements StreamFunction interface. type streamFunction struct { name string zipperEndpoint string client *network.Client observeDataTags []byte // tag list that will be observed fn network.AsyncHandler // user's function which will be invoked when data arrived pfn network.PipeHandler pIn chan []byte pOut chan *frame.PayloadFrame } // SetObserveDataTags set the data tag list that will be observed. // Deprecated: use hpds.WithObserveDataTags instead func (s *streamFunction) SetObserveDataTags(tag ...byte) { s.client.SetObserveDataTags(tag...) s.client.Logger().Debugf("%sSetObserveDataTag(%v)", streamFunctionLogPrefix, s.observeDataTags) } // SetHandler set the handler function, which accept the raw bytes data and return the tag & response. func (s *streamFunction) SetHandler(fn network.AsyncHandler) error { s.fn = fn s.client.Logger().Debugf("%sSetHandler(%v)", streamFunctionLogPrefix, s.fn) return nil } func (s *streamFunction) SetPipeHandler(fn network.PipeHandler) error { s.pfn = fn s.client.Logger().Debugf("%sSetHandler(%v)", streamFunctionLogPrefix, s.fn) return nil } // Connect create a connection to the mq, when data arrived, the data will be passed to the // handler which set by SetHandler method. func (s *streamFunction) Connect() error { s.client.Logger().Debugf("%s Connect()", streamFunctionLogPrefix) // notify underlying network operations, when data with tag we observed arrived, invoke the func s.client.SetDataFrameObserver(func(data *frame.DataFrame) { s.client.Logger().Debugf("%sreceive DataFrame, tag=%# x, carriage=%# x", streamFunctionLogPrefix, data.Tag(), data.GetCarriage()) s.onDataFrame(data.GetCarriage(), data.GetMetaFrame()) }) if s.pfn != nil { s.pIn = make(chan []byte) s.pOut = make(chan *frame.PayloadFrame) // handle user's pipe function go func() { s.pfn(s.pIn, s.pOut) }() // send user's pipe function outputs to mq go func() { for { data := <-s.pOut if data != nil { s.client.Logger().Debugf("%spipe fn send: tag=%#x, data=%# x", streamFunctionLogPrefix, data.Tag, data.Carriage) frame := frame.NewDataFrame() // todo: frame.SetTransactionId frame.SetCarriage(data.Tag, data.Carriage) s.client.WriteFrame(frame) } } }() } err := s.client.Connect(context.Background(), s.zipperEndpoint) if err != nil { s.client.Logger().Errorf("%sConnect() error: %s", streamFunctionLogPrefix, err) } return err } // Close will close the connection. func (s *streamFunction) Close() error { if s.pIn != nil { close(s.pIn) } if s.pOut != nil { close(s.pOut) } if s.client != nil { if err := s.client.Close(); err != nil { s.client.Logger().Errorf("%sClose(): %v", err) return err } } return nil } // when DataFrame we observed arrived, invoke the user's function func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) { s.client.Logger().Infof("%sonDataFrame ->[%s]", streamFunctionLogPrefix, s.name) if s.fn != nil { go func() { s.client.Logger().Debugf("%sexecute-start fn: data[%d]=%# x", streamFunctionLogPrefix, len(data), frame.Shortly(data)) // invoke serverless tag, resp := s.fn(data) s.client.Logger().Debugf("%sexecute-done fn: tag=%#x, resp[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp)) // if resp is not nil, means the user's function has returned something, we should send it to the mq if len(resp) != 0 { s.client.Logger().Debugf("%sstart WriteFrame(): tag=%#x, data[%d]=%# x", streamFunctionLogPrefix, tag, len(resp), frame.Shortly(resp)) // build a DataFrame // TODO: seems we should implement a DeepCopy() of MetaFrame in the future frame := frame.NewDataFrame() // reuse transactionId frame.SetTransactionId(metaFrame.TransactionId()) // reuse sourceId frame.SetSourceId(metaFrame.SourceId()) frame.SetCarriage(tag, resp) s.client.WriteFrame(frame) } }() } else if s.pfn != nil { s.client.Logger().Debugf("%spipe fn receive: data[%d]=%# x", streamFunctionLogPrefix, len(data), data) s.pIn <- data } else { s.client.Logger().Warnf("%sStreamFunction is nil", streamFunctionLogPrefix) } } // Send a DataFrame to mq. func (s *streamFunction) Write(tag byte, carriage []byte) error { fm := frame.NewDataFrame() fm.SetCarriage(tag, carriage) return s.client.WriteFrame(fm) } // SetErrorHandler set the error handler function when server error occurs func (s *streamFunction) SetErrorHandler(fn func(err error)) { s.client.SetErrorHandler(fn) }