package hpds_node

import (
	"context"
	"git.hpds.cc/Component/network"
	"git.hpds.cc/Component/network/frame"
	"git.hpds.cc/Component/network/log"
)

const (
	apLogPrefix = "\033[32m[hpds:access_point]\033[0m "
)

// AccessPoint is responsible for sending data to hpds.
type AccessPoint interface {
	// Close will close the connection to HPDS-Mq.
	Close() error
	// Connect to HPDS-Mq.
	Connect() error
	// SetDataTag will set the tag of data when invoking Write().
	SetDataTag(tag frame.Tag)
	// Write the data to downstream.
	Write(p []byte) (n int, err error)
	// WriteWithTag will write data with specified tag, default transactionId is epoch time.
	WriteWithTag(tag frame.Tag, data []byte) error
	// SetErrorHandler set the error handler function when server error occurs
	SetErrorHandler(fn func(err error))
	// SetReceiveHandler [Experimental] set to observe handler function
	SetReceiveHandler(fn func(tag frame.Tag, data []byte))
	// Broadcast the data to all downstream
	Broadcast(data []byte) error
}

// hpds-AccessPoint
type accessPoint struct {
	name       string
	mqEndpoint string
	client     *network.Client
	tag        frame.Tag
	fn         func(frame.Tag, []byte)
}

var _ AccessPoint = &accessPoint{}

// NewAccessPoint create a hpds-AccessPoint
func NewAccessPoint(name string, opts ...Option) AccessPoint {
	options := NewOptions(opts...)
	client := network.NewClient(name, network.ClientTypeSource, options.ClientOptions...)

	return &accessPoint{
		name:       name,
		mqEndpoint: options.MqAddr,
		client:     client,
	}
}

// Write the data to downstream.
func (s *accessPoint) Write(data []byte) (int, error) {
	err := s.WriteWithTag(s.tag, data)
	if err != nil {
		return 0, err
	}
	return len(data), nil
}

// SetDataTag will set the tag of data when invoking Write().
func (s *accessPoint) SetDataTag(tag frame.Tag) {
	s.tag = tag
}

// Close will close the connection to MessageQueue.
func (s *accessPoint) Close() error {
	if err := s.client.Close(); err != nil {
		log.Errorf("%s Close(): %v", apLogPrefix, err)
		return err
	}
	log.Debugf("%s is closed", apLogPrefix)
	return nil
}

// Connect to YoMo-MessageQueue.
func (s *accessPoint) Connect() error {
	// set backFlowFrame handler
	s.client.SetBackFlowFrameObserver(func(frm *frame.BackFlowFrame) {
		if s.fn != nil {
			s.fn(frm.GetDataTag(), frm.GetCarriage())
		}
	})

	err := s.client.Connect(context.Background(), s.mqEndpoint)
	if err != nil {
		log.Errorf("%s Connect() error: %s", apLogPrefix, err)
	}
	return err
}

// WriteWithTag will write data with specified tag, default transactionID is epoch time.
func (s *accessPoint) WriteWithTag(tag frame.Tag, data []byte) error {
	f := frame.NewDataFrame()
	f.SetCarriage(tag, data)
	f.SetSourceId(s.client.ClientId())
	log.Debugf("%s WriteWithTag: tid=%s, source_id=%s, data[%d]",
		apLogPrefix, f.TransactionId(), f.SourceId(), len(data))
	return s.client.WriteFrame(f)
}

// SetErrorHandler set the error handler function when server error occurs
func (s *accessPoint) SetErrorHandler(fn func(err error)) {
	s.client.SetErrorHandler(fn)
}

// SetReceiveHandler [Experimental] set to observe handler function
func (s *accessPoint) SetReceiveHandler(fn func(frame.Tag, []byte)) {
	s.fn = fn
	log.Debugf("%s SetReceiveHandler(%v)", apLogPrefix, s.fn)
}

// Broadcast Write the data to all downstream
func (s *accessPoint) Broadcast(data []byte) error {
	f := frame.NewDataFrame()
	f.SetCarriage(s.tag, data)
	f.SetSourceId(s.client.ClientId())
	f.SetBroadcast(true)
	log.Debugf("Broadcast", "data_frame", f.String())
	return s.client.WriteFrame(f)
}