network/client.go

328 lines
7.8 KiB
Go
Raw Normal View History

2022-10-11 17:36:09 +08:00
package network
import (
"context"
"errors"
"git.hpds.cc/Component/network/id"
"time"
"git.hpds.cc/Component/network/frame"
"git.hpds.cc/Component/network/log"
)
// ClientOption client options
2023-04-05 16:15:59 +08:00
type ClientOption func(*clientOptions)
2022-10-11 17:36:09 +08:00
// Client is the abstraction of a HPDS-Client. a HPDS-Client can be
// Protocol Gateway, Message Queue or StreamFunction.
type Client struct {
name string // name of the client
clientId string // id of the client
2023-04-05 16:15:59 +08:00
streamType StreamType // type of the dataStream
processor func(*frame.DataFrame) // function to invoke when data arrived
receiver func(*frame.BackFlowFrame) // function to invoke when data is processed
errorfn func(error) // function to invoke when error occured
opts *clientOptions
// ctx and ctxCancel manage the lifecycle of client.
2023-04-05 16:41:15 +08:00
ctx context.Context
ctxCancel context.CancelFunc
logger log.Logger
2023-04-05 16:15:59 +08:00
writeFrameChan chan frame.Frame
shutdownChan chan error
2022-10-11 17:36:09 +08:00
}
// NewClient creates a new HPDS-Client.
func NewClient(appName string, connType ClientType, opts ...ClientOption) *Client {
2023-04-05 16:15:59 +08:00
option := defaultClientOption()
for _, o := range opts {
o(option)
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
clientId := id.New()
2022-10-11 17:36:09 +08:00
2023-04-05 16:15:59 +08:00
if option.credential != nil {
log.Infof("use credential, credential_name: %s;", option.credential.Name())
}
2022-10-11 17:36:09 +08:00
2023-04-05 16:15:59 +08:00
ctx, ctxCancel := context.WithCancel(context.Background())
2023-04-05 16:41:15 +08:00
cli := new(Client)
if cli.logger == nil {
cli.logger = log.Default()
}
cli.name = appName
cli.clientId = clientId
cli.streamType = connType
cli.opts = option
cli.errorfn = func(err error) {
cli.logger.Errorf("client err, %s", err)
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:41:15 +08:00
cli.writeFrameChan = make(chan frame.Frame)
cli.shutdownChan = make(chan error, 1)
cli.ctx = ctx
cli.ctxCancel = ctxCancel
return cli
2022-10-11 17:36:09 +08:00
}
// Connect connects to HPDS-MessageQueue.
func (c *Client) Connect(ctx context.Context, addr string) error {
2023-04-05 16:15:59 +08:00
controlStream, dataStream, err := c.openStream(ctx, addr)
if err != nil {
2023-04-05 16:41:15 +08:00
c.logger.Errorf("connect error, %s", err)
2022-10-11 17:36:09 +08:00
return err
}
2023-04-05 16:15:59 +08:00
go c.runBackground(ctx, addr, controlStream, dataStream)
2022-10-11 17:36:09 +08:00
return nil
}
2023-04-05 16:15:59 +08:00
func (c *Client) runBackground(ctx context.Context, addr string, controlStream ClientControlStream, dataStream DataStream) {
reconnection := make(chan struct{})
2022-10-11 17:36:09 +08:00
2023-04-05 16:15:59 +08:00
go c.processStream(controlStream, dataStream, reconnection)
2022-10-11 17:36:09 +08:00
2023-04-05 16:15:59 +08:00
for {
select {
case <-c.ctx.Done():
c.cleanStream(controlStream, nil)
return
case <-ctx.Done():
c.cleanStream(controlStream, ctx.Err())
return
case <-reconnection:
RECONNECT:
var err error
controlStream, dataStream, err = c.openStream(ctx, addr)
if err != nil {
2023-04-05 16:41:15 +08:00
c.logger.Errorf("client reconnect error, %s", err)
2023-04-05 16:15:59 +08:00
time.Sleep(time.Second)
goto RECONNECT
}
go c.processStream(controlStream, dataStream, reconnection)
}
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
}
2022-10-11 17:36:09 +08:00
2023-04-05 16:15:59 +08:00
// WriteFrame write frame to client.
func (c *Client) WriteFrame(f frame.Frame) error {
c.writeFrameChan <- f
return nil
}
2022-10-11 17:36:09 +08:00
2023-04-05 16:15:59 +08:00
func (c *Client) cleanStream(controlStream ClientControlStream, err error) {
errString := ""
2022-10-11 17:36:09 +08:00
if err != nil {
2023-04-05 16:15:59 +08:00
errString = err.Error()
2023-04-05 16:41:15 +08:00
c.logger.Errorf("client cancel with error, %s", err)
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
// controlStream is nil represents that client is not connected.
if controlStream == nil {
return
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
_ = controlStream.CloseWithError(0, errString)
2022-10-11 17:36:09 +08:00
}
// Close the client.
2023-04-05 16:15:59 +08:00
func (c *Client) Close() error {
// break runBackgroud() for-loop.
c.ctxCancel()
// non-blocking to return Wait().
select {
case c.shutdownChan <- nil:
default:
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
return nil
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
func (c *Client) openControlStream(ctx context.Context, addr string) (ClientControlStream, error) {
controlStream, err := OpenClientControlStream(ctx, addr, c.opts.tlsConfig, c.opts.quicConfig)
2022-10-11 17:36:09 +08:00
if err != nil {
2023-04-05 16:15:59 +08:00
return nil, err
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
if err := controlStream.Authenticate(c.opts.credential); err != nil {
return nil, err
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
return controlStream, nil
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
func (c *Client) openStream(ctx context.Context, addr string) (ClientControlStream, DataStream, error) {
controlStream, err := c.openControlStream(ctx, addr)
if err != nil {
return nil, nil, err
}
dataStream, err := c.openDataStream(ctx, controlStream)
if err != nil {
return nil, nil, err
}
2022-10-11 17:36:09 +08:00
2023-04-05 16:15:59 +08:00
return controlStream, dataStream, nil
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
func (c *Client) openDataStream(ctx context.Context, controlStream ClientControlStream) (DataStream, error) {
handshakeFrame := frame.NewHandshakeFrame(
c.name,
c.clientId,
byte(c.streamType),
c.opts.observeDataTags,
[]byte{}, // The stream does not require metadata currently.
)
dataStream, err := controlStream.OpenStream(ctx, handshakeFrame)
if err != nil {
return nil, err
}
2022-10-11 17:36:09 +08:00
2023-04-05 16:15:59 +08:00
return dataStream, nil
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
func (c *Client) processStream(controlStream ClientControlStream, dataStream DataStream, reconnection chan<- struct{}) {
defer func() {
_ = dataStream.Close()
}()
2022-10-11 17:36:09 +08:00
2023-04-05 16:15:59 +08:00
var (
controlStreamErrChan = c.receivingStreamClose(controlStream, dataStream)
readFrameChan = c.readFrame(dataStream)
)
2022-10-11 17:36:09 +08:00
for {
select {
2023-04-05 16:15:59 +08:00
case err := <-controlStreamErrChan:
c.shutdownWithError(err)
case result := <-readFrameChan:
if err := result.err; err != nil {
c.errorfn(err)
reconnection <- struct{}{}
return
}
c.handleFrame(result.frame)
case f := <-c.writeFrameChan:
err := dataStream.WriteFrame(f)
// restore DataFrame.
if d, ok := f.(*frame.DataFrame); ok {
d.Clean()
}
if err != nil {
c.errorfn(err)
reconnection <- struct{}{}
return
2022-10-11 17:36:09 +08:00
}
}
}
}
2023-04-05 16:15:59 +08:00
// Wait waits client error returning.
func (c *Client) Wait() error {
err := <-c.shutdownChan
return err
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
func (c *Client) shutdownWithError(err error) {
// non-blocking shutdown client.
select {
case c.shutdownChan <- err:
default:
}
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
type readResult struct {
frame frame.Frame
err error
}
func (c *Client) readFrame(dataStream DataStream) chan readResult {
readChan := make(chan readResult)
go func() {
for {
f, err := dataStream.ReadFrame()
readChan <- readResult{f, err}
if err != nil {
return
}
}
}()
return readChan
}
func (c *Client) handleFrame(f frame.Frame) {
switch ff := f.(type) {
case *frame.DataFrame:
if c.processor == nil {
2023-04-05 16:41:15 +08:00
c.logger.Warnf("client processor has not been set")
2022-10-11 17:36:09 +08:00
} else {
2023-04-05 16:15:59 +08:00
c.processor(ff)
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
case *frame.BackFlowFrame:
if c.receiver == nil {
2023-04-05 16:41:15 +08:00
c.logger.Warnf("client receiver has not been set")
2023-04-05 16:15:59 +08:00
} else {
c.receiver(ff)
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
default:
2023-04-05 16:41:15 +08:00
c.logger.Warnf("client data stream receive unexcepted frame, frame_type: %v", f)
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
}
func (c *Client) receivingStreamClose(controlStream ControlStream, dataStream DataStream) chan error {
closeStreamChan := make(chan error)
go func() {
for {
streamID, reason, err := controlStream.ReceiveStreamClose()
if err != nil {
closeStreamChan <- err
return
}
if streamID == c.clientId {
c.ctxCancel()
_ = dataStream.Close()
closeStreamChan <- errors.New(reason)
_ = controlStream.CloseWithError(0, reason)
return
}
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
}()
2022-10-11 17:36:09 +08:00
2023-04-05 16:15:59 +08:00
return closeStreamChan
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
// SetDataFrameObserver sets the data frame handler.
func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame)) {
c.processor = fn
2023-04-05 16:41:15 +08:00
c.logger.Debugf("SetDataFrameObserver")
2022-10-11 17:36:09 +08:00
}
2023-04-05 16:15:59 +08:00
// SetBackFlowFrameObserver sets the backflow frame handler.
func (c *Client) SetBackFlowFrameObserver(fn func(*frame.BackFlowFrame)) {
c.receiver = fn
2023-04-05 16:41:15 +08:00
c.logger.Debugf("SetBackFlowFrameObserver")
2023-04-05 16:15:59 +08:00
}
// SetObserveDataTags set the data tag list that will be observed.
// Deprecated: use yomo.WithObserveDataTags instead
func (c *Client) SetObserveDataTags(tag ...frame.Tag) {
c.opts.observeDataTags = append(c.opts.observeDataTags, tag...)
2022-10-11 17:36:09 +08:00
}
// SetErrorHandler set error handler
func (c *Client) SetErrorHandler(fn func(err error)) {
2023-04-05 16:15:59 +08:00
c.errorfn = fn
2022-10-11 17:36:09 +08:00
}
// ClientId return the client ID
func (c *Client) ClientId() string {
return c.clientId
}
2023-04-05 16:41:15 +08:00
// Logger get client's logger instance, you can customize this using `hpds.WithLogger`
func (c *Client) Logger() log.Logger {
return c.logger
}