328 lines
8.0 KiB
Go
328 lines
8.0 KiB
Go
package network
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"git.hpds.cc/Component/network/id"
|
|
"time"
|
|
|
|
"git.hpds.cc/Component/network/frame"
|
|
"git.hpds.cc/Component/network/log"
|
|
)
|
|
|
|
// ClientOption client options
|
|
type ClientOption func(*clientOptions)
|
|
|
|
// Client is the abstraction of a HPDS-Client. a HPDS-Client can be
|
|
// Protocol Gateway, Message Queue or StreamFunction.
|
|
type Client struct {
|
|
name string // name of the client
|
|
clientId string // id of the client
|
|
streamType StreamType // type of the dataStream
|
|
processor func(*frame.DataFrame) // function to invoke when data arrived
|
|
receiver func(*frame.BackFlowFrame) // function to invoke when data is processed
|
|
errorfn func(error) // function to invoke when error occured
|
|
opts *clientOptions
|
|
|
|
// ctx and ctxCancel manage the lifecycle of client.
|
|
ctx context.Context
|
|
ctxCancel context.CancelFunc
|
|
logger log.Logger
|
|
writeFrameChan chan frame.Frame
|
|
shutdownChan chan error
|
|
}
|
|
|
|
// NewClient creates a new HPDS-Client.
|
|
func NewClient(appName string, connType ClientType, opts ...ClientOption) *Client {
|
|
option := defaultClientOption()
|
|
|
|
for _, o := range opts {
|
|
o(option)
|
|
}
|
|
clientId := id.New()
|
|
|
|
if option.credential != nil {
|
|
log.Infof("%s use credential, credential_name: %s;", ClientLogPrefix, option.credential.Name())
|
|
}
|
|
|
|
ctx, ctxCancel := context.WithCancel(context.Background())
|
|
|
|
cli := new(Client)
|
|
if cli.logger == nil {
|
|
cli.logger = log.Default()
|
|
}
|
|
cli.name = appName
|
|
cli.clientId = clientId
|
|
cli.streamType = connType
|
|
cli.opts = option
|
|
cli.errorfn = func(err error) {
|
|
cli.logger.Errorf("%s client err, %s", ClientLogPrefix, err)
|
|
}
|
|
cli.writeFrameChan = make(chan frame.Frame)
|
|
cli.shutdownChan = make(chan error, 1)
|
|
cli.ctx = ctx
|
|
cli.ctxCancel = ctxCancel
|
|
return cli
|
|
}
|
|
|
|
// Connect connects to HPDS-MessageQueue.
|
|
func (c *Client) Connect(ctx context.Context, addr string) error {
|
|
controlStream, dataStream, err := c.openStream(ctx, addr)
|
|
if err != nil {
|
|
c.logger.Errorf("%s connect error, %s", ClientLogPrefix, err)
|
|
return err
|
|
}
|
|
|
|
go c.runBackground(ctx, addr, controlStream, dataStream)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) runBackground(ctx context.Context, addr string, controlStream ClientControlStream, dataStream DataStream) {
|
|
reconnection := make(chan struct{})
|
|
|
|
go c.processStream(controlStream, dataStream, reconnection)
|
|
|
|
for {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
c.cleanStream(controlStream, nil)
|
|
return
|
|
case <-ctx.Done():
|
|
c.cleanStream(controlStream, ctx.Err())
|
|
return
|
|
case <-reconnection:
|
|
RECONNECT:
|
|
var err error
|
|
controlStream, dataStream, err = c.openStream(ctx, addr)
|
|
if err != nil {
|
|
c.logger.Errorf("%s client reconnect error, %s", ClientLogPrefix, err)
|
|
time.Sleep(time.Second)
|
|
goto RECONNECT
|
|
}
|
|
go c.processStream(controlStream, dataStream, reconnection)
|
|
}
|
|
}
|
|
}
|
|
|
|
// WriteFrame write frame to client.
|
|
func (c *Client) WriteFrame(f frame.Frame) error {
|
|
c.writeFrameChan <- f
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) cleanStream(controlStream ClientControlStream, err error) {
|
|
errString := ""
|
|
if err != nil {
|
|
errString = err.Error()
|
|
c.logger.Errorf("%s client cancel with error, %s", ClientLogPrefix, err)
|
|
}
|
|
|
|
// controlStream is nil represents that client is not connected.
|
|
if controlStream == nil {
|
|
return
|
|
}
|
|
|
|
_ = controlStream.CloseWithError(0, errString)
|
|
}
|
|
|
|
// Close the client.
|
|
func (c *Client) Close() error {
|
|
// break runBackgroud() for-loop.
|
|
c.ctxCancel()
|
|
|
|
// non-blocking to return Wait().
|
|
select {
|
|
case c.shutdownChan <- nil:
|
|
default:
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) openControlStream(ctx context.Context, addr string) (ClientControlStream, error) {
|
|
controlStream, err := OpenClientControlStream(ctx, addr, c.opts.tlsConfig, c.opts.quicConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := controlStream.Authenticate(c.opts.credential); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return controlStream, nil
|
|
}
|
|
|
|
func (c *Client) openStream(ctx context.Context, addr string) (ClientControlStream, DataStream, error) {
|
|
controlStream, err := c.openControlStream(ctx, addr)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
dataStream, err := c.openDataStream(ctx, controlStream)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return controlStream, dataStream, nil
|
|
}
|
|
|
|
func (c *Client) openDataStream(ctx context.Context, controlStream ClientControlStream) (DataStream, error) {
|
|
handshakeFrame := frame.NewHandshakeFrame(
|
|
c.name,
|
|
c.clientId,
|
|
byte(c.streamType),
|
|
c.opts.observeDataTags,
|
|
[]byte{}, // The stream does not require metadata currently.
|
|
)
|
|
dataStream, err := controlStream.OpenStream(ctx, handshakeFrame)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return dataStream, nil
|
|
}
|
|
|
|
func (c *Client) processStream(controlStream ClientControlStream, dataStream DataStream, reconnection chan<- struct{}) {
|
|
defer func() {
|
|
_ = dataStream.Close()
|
|
}()
|
|
|
|
var (
|
|
controlStreamErrChan = c.receivingStreamClose(controlStream, dataStream)
|
|
readFrameChan = c.readFrame(dataStream)
|
|
)
|
|
for {
|
|
select {
|
|
case err := <-controlStreamErrChan:
|
|
c.shutdownWithError(err)
|
|
case result := <-readFrameChan:
|
|
if err := result.err; err != nil {
|
|
c.errorfn(err)
|
|
reconnection <- struct{}{}
|
|
return
|
|
}
|
|
c.handleFrame(result.frame)
|
|
case f := <-c.writeFrameChan:
|
|
err := dataStream.WriteFrame(f)
|
|
// restore DataFrame.
|
|
if d, ok := f.(*frame.DataFrame); ok {
|
|
d.Clean()
|
|
}
|
|
if err != nil {
|
|
c.errorfn(err)
|
|
reconnection <- struct{}{}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Wait waits client error returning.
|
|
func (c *Client) Wait() error {
|
|
err := <-c.shutdownChan
|
|
return err
|
|
}
|
|
|
|
func (c *Client) shutdownWithError(err error) {
|
|
// non-blocking shutdown client.
|
|
select {
|
|
case c.shutdownChan <- err:
|
|
default:
|
|
}
|
|
}
|
|
|
|
type readResult struct {
|
|
frame frame.Frame
|
|
err error
|
|
}
|
|
|
|
func (c *Client) readFrame(dataStream DataStream) chan readResult {
|
|
readChan := make(chan readResult)
|
|
go func() {
|
|
for {
|
|
f, err := dataStream.ReadFrame()
|
|
readChan <- readResult{f, err}
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return readChan
|
|
}
|
|
|
|
func (c *Client) handleFrame(f frame.Frame) {
|
|
switch ff := f.(type) {
|
|
case *frame.DataFrame:
|
|
if c.processor == nil {
|
|
c.logger.Warnf("%s client processor has not been set", ClientLogPrefix)
|
|
} else {
|
|
c.processor(ff)
|
|
}
|
|
case *frame.BackFlowFrame:
|
|
if c.receiver == nil {
|
|
c.logger.Warnf("%s client receiver has not been set", ClientLogPrefix)
|
|
} else {
|
|
c.receiver(ff)
|
|
}
|
|
default:
|
|
c.logger.Warnf("%s client data stream receive unexcepted frame, frame_type: %v", ClientLogPrefix, f)
|
|
}
|
|
}
|
|
|
|
func (c *Client) receivingStreamClose(controlStream ControlStream, dataStream DataStream) chan error {
|
|
closeStreamChan := make(chan error)
|
|
|
|
go func() {
|
|
for {
|
|
streamID, reason, err := controlStream.ReceiveStreamClose()
|
|
if err != nil {
|
|
closeStreamChan <- err
|
|
return
|
|
}
|
|
if streamID == c.clientId {
|
|
c.ctxCancel()
|
|
_ = dataStream.Close()
|
|
closeStreamChan <- errors.New(reason)
|
|
_ = controlStream.CloseWithError(0, reason)
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return closeStreamChan
|
|
}
|
|
|
|
// SetDataFrameObserver sets the data frame handler.
|
|
func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame)) {
|
|
c.processor = fn
|
|
c.logger.Debugf("%s SetDataFrameObserver", ClientLogPrefix)
|
|
}
|
|
|
|
// SetBackFlowFrameObserver sets the backflow frame handler.
|
|
func (c *Client) SetBackFlowFrameObserver(fn func(*frame.BackFlowFrame)) {
|
|
c.receiver = fn
|
|
c.logger.Debugf("%s SetBackFlowFrameObserver", ClientLogPrefix)
|
|
}
|
|
|
|
// SetObserveDataTags set the data tag list that will be observed.
|
|
// Deprecated: use network.WithObserveDataTags instead
|
|
func (c *Client) SetObserveDataTags(tag ...frame.Tag) {
|
|
c.opts.observeDataTags = append(c.opts.observeDataTags, tag...)
|
|
}
|
|
|
|
// SetErrorHandler set error handler
|
|
func (c *Client) SetErrorHandler(fn func(err error)) {
|
|
c.errorfn = fn
|
|
}
|
|
|
|
// ClientId return the client ID
|
|
func (c *Client) ClientId() string {
|
|
return c.clientId
|
|
}
|
|
|
|
// Logger get client's logger instance, you can customize this using `hpds.WithLogger`
|
|
func (c *Client) Logger() log.Logger {
|
|
return c.logger
|
|
}
|