You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

327 lines
8.0 KiB

package network
import (
"context"
"errors"
"git.hpds.cc/Component/network/id"
"time"
"git.hpds.cc/Component/network/frame"
"git.hpds.cc/Component/network/log"
)
// ClientOption client options
type ClientOption func(*clientOptions)
// Client is the abstraction of a HPDS-Client. a HPDS-Client can be
// Protocol Gateway, Message Queue or StreamFunction.
type Client struct {
name string // name of the client
clientId string // id of the client
streamType StreamType // type of the dataStream
processor func(*frame.DataFrame) // function to invoke when data arrived
receiver func(*frame.BackFlowFrame) // function to invoke when data is processed
errorfn func(error) // function to invoke when error occured
opts *clientOptions
// ctx and ctxCancel manage the lifecycle of client.
ctx context.Context
ctxCancel context.CancelFunc
logger log.Logger
writeFrameChan chan frame.Frame
shutdownChan chan error
}
// NewClient creates a new HPDS-Client.
func NewClient(appName string, connType ClientType, opts ...ClientOption) *Client {
option := defaultClientOption()
for _, o := range opts {
o(option)
}
clientId := id.New()
if option.credential != nil {
log.Infof("%s use credential, credential_name: %s;", ClientLogPrefix, option.credential.Name())
}
ctx, ctxCancel := context.WithCancel(context.Background())
cli := new(Client)
if cli.logger == nil {
cli.logger = log.Default()
}
cli.name = appName
cli.clientId = clientId
cli.streamType = connType
cli.opts = option
cli.errorfn = func(err error) {
cli.logger.Errorf("%s client err, %s", ClientLogPrefix, err)
}
cli.writeFrameChan = make(chan frame.Frame)
cli.shutdownChan = make(chan error, 1)
cli.ctx = ctx
cli.ctxCancel = ctxCancel
return cli
}
// Connect connects to HPDS-MessageQueue.
func (c *Client) Connect(ctx context.Context, addr string) error {
controlStream, dataStream, err := c.openStream(ctx, addr)
if err != nil {
c.logger.Errorf("%s connect error, %s", ClientLogPrefix, err)
return err
}
go c.runBackground(ctx, addr, controlStream, dataStream)
return nil
}
func (c *Client) runBackground(ctx context.Context, addr string, controlStream ClientControlStream, dataStream DataStream) {
reconnection := make(chan struct{})
go c.processStream(controlStream, dataStream, reconnection)
for {
select {
case <-c.ctx.Done():
c.cleanStream(controlStream, nil)
return
case <-ctx.Done():
c.cleanStream(controlStream, ctx.Err())
return
case <-reconnection:
RECONNECT:
var err error
controlStream, dataStream, err = c.openStream(ctx, addr)
if err != nil {
c.logger.Errorf("%s client reconnect error, %s", ClientLogPrefix, err)
time.Sleep(time.Second)
goto RECONNECT
}
go c.processStream(controlStream, dataStream, reconnection)
}
}
}
// WriteFrame write frame to client.
func (c *Client) WriteFrame(f frame.Frame) error {
c.writeFrameChan <- f
return nil
}
func (c *Client) cleanStream(controlStream ClientControlStream, err error) {
errString := ""
if err != nil {
errString = err.Error()
c.logger.Errorf("%s client cancel with error, %s", ClientLogPrefix, err)
}
// controlStream is nil represents that client is not connected.
if controlStream == nil {
return
}
_ = controlStream.CloseWithError(0, errString)
}
// Close the client.
func (c *Client) Close() error {
// break runBackgroud() for-loop.
c.ctxCancel()
// non-blocking to return Wait().
select {
case c.shutdownChan <- nil:
default:
}
return nil
}
func (c *Client) openControlStream(ctx context.Context, addr string) (ClientControlStream, error) {
controlStream, err := OpenClientControlStream(ctx, addr, c.opts.tlsConfig, c.opts.quicConfig)
if err != nil {
return nil, err
}
if err := controlStream.Authenticate(c.opts.credential); err != nil {
return nil, err
}
return controlStream, nil
}
func (c *Client) openStream(ctx context.Context, addr string) (ClientControlStream, DataStream, error) {
controlStream, err := c.openControlStream(ctx, addr)
if err != nil {
return nil, nil, err
}
dataStream, err := c.openDataStream(ctx, controlStream)
if err != nil {
return nil, nil, err
}
return controlStream, dataStream, nil
}
func (c *Client) openDataStream(ctx context.Context, controlStream ClientControlStream) (DataStream, error) {
handshakeFrame := frame.NewHandshakeFrame(
c.name,
c.clientId,
byte(c.streamType),
c.opts.observeDataTags,
[]byte{}, // The stream does not require metadata currently.
)
dataStream, err := controlStream.OpenStream(ctx, handshakeFrame)
if err != nil {
return nil, err
}
return dataStream, nil
}
func (c *Client) processStream(controlStream ClientControlStream, dataStream DataStream, reconnection chan<- struct{}) {
defer func() {
_ = dataStream.Close()
}()
var (
controlStreamErrChan = c.receivingStreamClose(controlStream, dataStream)
readFrameChan = c.readFrame(dataStream)
)
for {
select {
case err := <-controlStreamErrChan:
c.shutdownWithError(err)
case result := <-readFrameChan:
if err := result.err; err != nil {
c.errorfn(err)
reconnection <- struct{}{}
return
}
c.handleFrame(result.frame)
case f := <-c.writeFrameChan:
err := dataStream.WriteFrame(f)
// restore DataFrame.
if d, ok := f.(*frame.DataFrame); ok {
d.Clean()
}
if err != nil {
c.errorfn(err)
reconnection <- struct{}{}
return
}
}
}
}
// Wait waits client error returning.
func (c *Client) Wait() error {
err := <-c.shutdownChan
return err
}
func (c *Client) shutdownWithError(err error) {
// non-blocking shutdown client.
select {
case c.shutdownChan <- err:
default:
}
}
type readResult struct {
frame frame.Frame
err error
}
func (c *Client) readFrame(dataStream DataStream) chan readResult {
readChan := make(chan readResult)
go func() {
for {
f, err := dataStream.ReadFrame()
readChan <- readResult{f, err}
if err != nil {
return
}
}
}()
return readChan
}
func (c *Client) handleFrame(f frame.Frame) {
switch ff := f.(type) {
case *frame.DataFrame:
if c.processor == nil {
c.logger.Warnf("%s client processor has not been set", ClientLogPrefix)
} else {
c.processor(ff)
}
case *frame.BackFlowFrame:
if c.receiver == nil {
c.logger.Warnf("%s client receiver has not been set", ClientLogPrefix)
} else {
c.receiver(ff)
}
default:
c.logger.Warnf("%s client data stream receive unexcepted frame, frame_type: %v", ClientLogPrefix, f)
}
}
func (c *Client) receivingStreamClose(controlStream ControlStream, dataStream DataStream) chan error {
closeStreamChan := make(chan error)
go func() {
for {
streamID, reason, err := controlStream.ReceiveStreamClose()
if err != nil {
closeStreamChan <- err
return
}
if streamID == c.clientId {
c.ctxCancel()
_ = dataStream.Close()
closeStreamChan <- errors.New(reason)
_ = controlStream.CloseWithError(0, reason)
return
}
}
}()
return closeStreamChan
}
// SetDataFrameObserver sets the data frame handler.
func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame)) {
c.processor = fn
c.logger.Debugf("%s SetDataFrameObserver", ClientLogPrefix)
}
// SetBackFlowFrameObserver sets the backflow frame handler.
func (c *Client) SetBackFlowFrameObserver(fn func(*frame.BackFlowFrame)) {
c.receiver = fn
c.logger.Debugf("%s SetBackFlowFrameObserver", ClientLogPrefix)
}
// SetObserveDataTags set the data tag list that will be observed.
// Deprecated: use network.WithObserveDataTags instead
func (c *Client) SetObserveDataTags(tag ...frame.Tag) {
c.opts.observeDataTags = append(c.opts.observeDataTags, tag...)
}
// SetErrorHandler set error handler
func (c *Client) SetErrorHandler(fn func(err error)) {
c.errorfn = fn
}
// ClientId return the client ID
func (c *Client) ClientId() string {
return c.clientId
}
// Logger get client's logger instance, you can customize this using `hpds.WithLogger`
func (c *Client) Logger() log.Logger {
return c.logger
}