You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

57 lines
1.5 KiB

package network
import (
"context"
"sync"
)
// StreamGroup is a group of streams that includes one ControlStream and many DataStreams.
// One Connection has many DataStreams and only one ControlStream. The ControlStream
// authenticates the Connection and receives HandshakeFrame and CloseStreamFrame to create
// a DataStream or close a stream. The ControlStream is always the first stream established
// between server and client.
type StreamGroup struct {
// ctx is passed to controlStream.AcceptStream each time Run accepts a stream.
ctx context.Context
// controlStream is the stream Run accepts new DataStreams from.
controlStream ServerControlStream
// connector is where Run adds each accepted stream under its ID and removes it
// again once the stream's handler has returned.
connector *Connector
// group counts the per-stream goroutines started by Run; Wait blocks on it.
group sync.WaitGroup
}
// NewStreamGroup builds a StreamGroup bound to the given context, control
// stream and connector. The group's internal WaitGroup is left at its zero
// value, so Wait returns immediately until Run has accepted a stream.
func NewStreamGroup(ctx context.Context, controlStream ServerControlStream, connector *Connector) *StreamGroup {
	return &StreamGroup{
		connector:     connector,
		controlStream: controlStream,
		ctx:           ctx,
	}
}
// Run accepts DataStreams from the control stream in a loop and serves each
// one with contextFunc on its own goroutine.
//
// For every accepted stream, Run adds it to the connector under its ID, builds
// a Context for it with newContext and calls contextFunc with that Context.
// When contextFunc returns, the Context is cleaned, the stream is removed from
// the connector, and only then is the goroutine reported as finished, so a
// caller returning from Wait never finds a finished stream still registered.
//
// Run returns only when AcceptStream fails, and it returns that error
// unchanged. Goroutines still serving streams at that point keep running; call
// Wait to block until they are done.
//
// TODO: run in aop model, like setMetadata -> handleRoute -> before -> handle -> after.
func (g *StreamGroup) Run(contextFunc func(c *Context)) error {
	for {
		dataStream, err := g.controlStream.AcceptStream(g.ctx)
		if err != nil {
			return err
		}

		// Count the handler before starting it so Wait cannot miss it.
		g.group.Add(1)

		// NOTE(review): the result of connector.Add is discarded, as before;
		// confirm whether a failed registration should reject the stream.
		_ = g.connector.Add(dataStream.ID(), dataStream)

		go func() {
			// Deferred calls run last-in-first-out: c.Clean, then the
			// connector removal, then Done. Done must come last so that Wait
			// does not return while the stream is still in the connector.
			defer g.group.Done()
			defer func() {
				_ = g.connector.Remove(dataStream.ID())
			}()

			c := newContext(dataStream)
			defer c.Clean()

			contextFunc(c)
		}()
	}
}
// Wait blocks until every goroutine that Run started for an accepted
// DataStream has finished.
func (g *StreamGroup) Wait() {
	g.group.Wait()
}