You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
327 lines
8.0 KiB
327 lines
8.0 KiB
package network |
|
|
|
import ( |
|
"context" |
|
"errors" |
|
"git.hpds.cc/Component/network/id" |
|
"time" |
|
|
|
"git.hpds.cc/Component/network/frame" |
|
"git.hpds.cc/Component/network/log" |
|
) |
|
|
|
// ClientOption client options |
|
type ClientOption func(*clientOptions) |
|
|
|
// Client is the abstraction of a HPDS-Client. a HPDS-Client can be |
|
// Protocol Gateway, Message Queue or StreamFunction. |
|
type Client struct { |
|
name string // name of the client |
|
clientId string // id of the client |
|
streamType StreamType // type of the dataStream |
|
processor func(*frame.DataFrame) // function to invoke when data arrived |
|
receiver func(*frame.BackFlowFrame) // function to invoke when data is processed |
|
errorfn func(error) // function to invoke when error occured |
|
opts *clientOptions |
|
|
|
// ctx and ctxCancel manage the lifecycle of client. |
|
ctx context.Context |
|
ctxCancel context.CancelFunc |
|
logger log.Logger |
|
writeFrameChan chan frame.Frame |
|
shutdownChan chan error |
|
} |
|
|
|
// NewClient creates a new HPDS-Client. |
|
func NewClient(appName string, connType ClientType, opts ...ClientOption) *Client { |
|
option := defaultClientOption() |
|
|
|
for _, o := range opts { |
|
o(option) |
|
} |
|
clientId := id.New() |
|
|
|
if option.credential != nil { |
|
log.Infof("%s use credential, credential_name: %s;", ClientLogPrefix, option.credential.Name()) |
|
} |
|
|
|
ctx, ctxCancel := context.WithCancel(context.Background()) |
|
|
|
cli := new(Client) |
|
if cli.logger == nil { |
|
cli.logger = log.Default() |
|
} |
|
cli.name = appName |
|
cli.clientId = clientId |
|
cli.streamType = connType |
|
cli.opts = option |
|
cli.errorfn = func(err error) { |
|
cli.logger.Errorf("%s client err, %s", ClientLogPrefix, err) |
|
} |
|
cli.writeFrameChan = make(chan frame.Frame) |
|
cli.shutdownChan = make(chan error, 1) |
|
cli.ctx = ctx |
|
cli.ctxCancel = ctxCancel |
|
return cli |
|
} |
|
|
|
// Connect connects to HPDS-MessageQueue. |
|
func (c *Client) Connect(ctx context.Context, addr string) error { |
|
controlStream, dataStream, err := c.openStream(ctx, addr) |
|
if err != nil { |
|
c.logger.Errorf("%s connect error, %s", ClientLogPrefix, err) |
|
return err |
|
} |
|
|
|
go c.runBackground(ctx, addr, controlStream, dataStream) |
|
|
|
return nil |
|
} |
|
|
|
func (c *Client) runBackground(ctx context.Context, addr string, controlStream ClientControlStream, dataStream DataStream) { |
|
reconnection := make(chan struct{}) |
|
|
|
go c.processStream(controlStream, dataStream, reconnection) |
|
|
|
for { |
|
select { |
|
case <-c.ctx.Done(): |
|
c.cleanStream(controlStream, nil) |
|
return |
|
case <-ctx.Done(): |
|
c.cleanStream(controlStream, ctx.Err()) |
|
return |
|
case <-reconnection: |
|
RECONNECT: |
|
var err error |
|
controlStream, dataStream, err = c.openStream(ctx, addr) |
|
if err != nil { |
|
c.logger.Errorf("%s client reconnect error, %s", ClientLogPrefix, err) |
|
time.Sleep(time.Second) |
|
goto RECONNECT |
|
} |
|
go c.processStream(controlStream, dataStream, reconnection) |
|
} |
|
} |
|
} |
|
|
|
// WriteFrame write frame to client. |
|
func (c *Client) WriteFrame(f frame.Frame) error { |
|
c.writeFrameChan <- f |
|
return nil |
|
} |
|
|
|
func (c *Client) cleanStream(controlStream ClientControlStream, err error) { |
|
errString := "" |
|
if err != nil { |
|
errString = err.Error() |
|
c.logger.Errorf("%s client cancel with error, %s", ClientLogPrefix, err) |
|
} |
|
|
|
// controlStream is nil represents that client is not connected. |
|
if controlStream == nil { |
|
return |
|
} |
|
|
|
_ = controlStream.CloseWithError(0, errString) |
|
} |
|
|
|
// Close the client. |
|
func (c *Client) Close() error { |
|
// break runBackgroud() for-loop. |
|
c.ctxCancel() |
|
|
|
// non-blocking to return Wait(). |
|
select { |
|
case c.shutdownChan <- nil: |
|
default: |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (c *Client) openControlStream(ctx context.Context, addr string) (ClientControlStream, error) { |
|
controlStream, err := OpenClientControlStream(ctx, addr, c.opts.tlsConfig, c.opts.quicConfig) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
if err := controlStream.Authenticate(c.opts.credential); err != nil { |
|
return nil, err |
|
} |
|
|
|
return controlStream, nil |
|
} |
|
|
|
func (c *Client) openStream(ctx context.Context, addr string) (ClientControlStream, DataStream, error) { |
|
controlStream, err := c.openControlStream(ctx, addr) |
|
if err != nil { |
|
return nil, nil, err |
|
} |
|
dataStream, err := c.openDataStream(ctx, controlStream) |
|
if err != nil { |
|
return nil, nil, err |
|
} |
|
|
|
return controlStream, dataStream, nil |
|
} |
|
|
|
func (c *Client) openDataStream(ctx context.Context, controlStream ClientControlStream) (DataStream, error) { |
|
handshakeFrame := frame.NewHandshakeFrame( |
|
c.name, |
|
c.clientId, |
|
byte(c.streamType), |
|
c.opts.observeDataTags, |
|
[]byte{}, // The stream does not require metadata currently. |
|
) |
|
dataStream, err := controlStream.OpenStream(ctx, handshakeFrame) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
return dataStream, nil |
|
} |
|
|
|
func (c *Client) processStream(controlStream ClientControlStream, dataStream DataStream, reconnection chan<- struct{}) { |
|
defer func() { |
|
_ = dataStream.Close() |
|
}() |
|
|
|
var ( |
|
controlStreamErrChan = c.receivingStreamClose(controlStream, dataStream) |
|
readFrameChan = c.readFrame(dataStream) |
|
) |
|
for { |
|
select { |
|
case err := <-controlStreamErrChan: |
|
c.shutdownWithError(err) |
|
case result := <-readFrameChan: |
|
if err := result.err; err != nil { |
|
c.errorfn(err) |
|
reconnection <- struct{}{} |
|
return |
|
} |
|
c.handleFrame(result.frame) |
|
case f := <-c.writeFrameChan: |
|
err := dataStream.WriteFrame(f) |
|
// restore DataFrame. |
|
if d, ok := f.(*frame.DataFrame); ok { |
|
d.Clean() |
|
} |
|
if err != nil { |
|
c.errorfn(err) |
|
reconnection <- struct{}{} |
|
return |
|
} |
|
} |
|
} |
|
} |
|
|
|
// Wait waits client error returning. |
|
func (c *Client) Wait() error { |
|
err := <-c.shutdownChan |
|
return err |
|
} |
|
|
|
func (c *Client) shutdownWithError(err error) { |
|
// non-blocking shutdown client. |
|
select { |
|
case c.shutdownChan <- err: |
|
default: |
|
} |
|
} |
|
|
|
type readResult struct { |
|
frame frame.Frame |
|
err error |
|
} |
|
|
|
func (c *Client) readFrame(dataStream DataStream) chan readResult { |
|
readChan := make(chan readResult) |
|
go func() { |
|
for { |
|
f, err := dataStream.ReadFrame() |
|
readChan <- readResult{f, err} |
|
if err != nil { |
|
return |
|
} |
|
} |
|
}() |
|
|
|
return readChan |
|
} |
|
|
|
func (c *Client) handleFrame(f frame.Frame) { |
|
switch ff := f.(type) { |
|
case *frame.DataFrame: |
|
if c.processor == nil { |
|
c.logger.Warnf("%s client processor has not been set", ClientLogPrefix) |
|
} else { |
|
c.processor(ff) |
|
} |
|
case *frame.BackFlowFrame: |
|
if c.receiver == nil { |
|
c.logger.Warnf("%s client receiver has not been set", ClientLogPrefix) |
|
} else { |
|
c.receiver(ff) |
|
} |
|
default: |
|
c.logger.Warnf("%s client data stream receive unexcepted frame, frame_type: %v", ClientLogPrefix, f) |
|
} |
|
} |
|
|
|
func (c *Client) receivingStreamClose(controlStream ControlStream, dataStream DataStream) chan error { |
|
closeStreamChan := make(chan error) |
|
|
|
go func() { |
|
for { |
|
streamID, reason, err := controlStream.ReceiveStreamClose() |
|
if err != nil { |
|
closeStreamChan <- err |
|
return |
|
} |
|
if streamID == c.clientId { |
|
c.ctxCancel() |
|
_ = dataStream.Close() |
|
closeStreamChan <- errors.New(reason) |
|
_ = controlStream.CloseWithError(0, reason) |
|
return |
|
} |
|
} |
|
}() |
|
|
|
return closeStreamChan |
|
} |
|
|
|
// SetDataFrameObserver sets the data frame handler. |
|
func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame)) { |
|
c.processor = fn |
|
c.logger.Debugf("%s SetDataFrameObserver", ClientLogPrefix) |
|
} |
|
|
|
// SetBackFlowFrameObserver sets the backflow frame handler. |
|
func (c *Client) SetBackFlowFrameObserver(fn func(*frame.BackFlowFrame)) { |
|
c.receiver = fn |
|
c.logger.Debugf("%s SetBackFlowFrameObserver", ClientLogPrefix) |
|
} |
|
|
|
// SetObserveDataTags set the data tag list that will be observed. |
|
// Deprecated: use network.WithObserveDataTags instead |
|
func (c *Client) SetObserveDataTags(tag ...frame.Tag) { |
|
c.opts.observeDataTags = append(c.opts.observeDataTags, tag...) |
|
} |
|
|
|
// SetErrorHandler set error handler |
|
func (c *Client) SetErrorHandler(fn func(err error)) { |
|
c.errorfn = fn |
|
} |
|
|
|
// ClientId return the client ID |
|
func (c *Client) ClientId() string { |
|
return c.clientId |
|
} |
|
|
|
// Logger get client's logger instance, you can customize this using `hpds.WithLogger` |
|
func (c *Client) Logger() log.Logger { |
|
return c.logger |
|
}
|
|
|