58 lines
1.5 KiB
Go
58 lines
1.5 KiB
Go
package network
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
)
|
|
|
|
// StreamGroup is the group of stream includes ControlStream amd DataStream.
|
|
// One Connection has many DataStream and only one ControlStream, ControlStream authenticates
|
|
// Connection and recevies HandshakeFrame and CloseStreamFrame to create DataStream or close
|
|
// stream. the ControlStream always the first stream established between server and client.
|
|
type StreamGroup struct {
|
|
ctx context.Context
|
|
controlStream ServerControlStream
|
|
connector *Connector
|
|
group sync.WaitGroup
|
|
}
|
|
|
|
// NewStreamGroup returns StreamGroup.
|
|
func NewStreamGroup(ctx context.Context, controlStream ServerControlStream, connector *Connector) *StreamGroup {
|
|
group := &StreamGroup{
|
|
ctx: ctx,
|
|
controlStream: controlStream,
|
|
connector: connector,
|
|
}
|
|
return group
|
|
}
|
|
|
|
// Run run contextFunc with connector.
|
|
// Run continus Accepts DataStream and create a Context to run with contextFunc.
|
|
// TODO: run in aop model, like setMetadata -> handleRoute -> before -> handle -> after.
|
|
func (g *StreamGroup) Run(contextFunc func(c *Context)) error {
|
|
for {
|
|
dataStream, err := g.controlStream.AcceptStream(g.ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
g.group.Add(1)
|
|
_ = g.connector.Add(dataStream.ID(), dataStream)
|
|
|
|
go func() {
|
|
defer func() {
|
|
g.group.Done()
|
|
_ = g.connector.Remove(dataStream.ID())
|
|
}()
|
|
|
|
c := newContext(dataStream)
|
|
defer c.Clean()
|
|
|
|
contextFunc(c)
|
|
}()
|
|
}
|
|
}
|
|
|
|
// Wait waits all dataStream down.
|
|
func (g *StreamGroup) Wait() { g.group.Wait() }
|