You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
57 lines
1.5 KiB
57 lines
1.5 KiB
package network |
|
|
|
import ( |
|
"context" |
|
"sync" |
|
) |
|
|
|
// StreamGroup is the group of stream includes ControlStream amd DataStream. |
|
// One Connection has many DataStream and only one ControlStream, ControlStream authenticates |
|
// Connection and recevies HandshakeFrame and CloseStreamFrame to create DataStream or close |
|
// stream. the ControlStream always the first stream established between server and client. |
|
type StreamGroup struct { |
|
ctx context.Context |
|
controlStream ServerControlStream |
|
connector *Connector |
|
group sync.WaitGroup |
|
} |
|
|
|
// NewStreamGroup returns StreamGroup. |
|
func NewStreamGroup(ctx context.Context, controlStream ServerControlStream, connector *Connector) *StreamGroup { |
|
group := &StreamGroup{ |
|
ctx: ctx, |
|
controlStream: controlStream, |
|
connector: connector, |
|
} |
|
return group |
|
} |
|
|
|
// Run run contextFunc with connector. |
|
// Run continus Accepts DataStream and create a Context to run with contextFunc. |
|
// TODO: run in aop model, like setMetadata -> handleRoute -> before -> handle -> after. |
|
func (g *StreamGroup) Run(contextFunc func(c *Context)) error { |
|
for { |
|
dataStream, err := g.controlStream.AcceptStream(g.ctx) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
g.group.Add(1) |
|
_ = g.connector.Add(dataStream.ID(), dataStream) |
|
|
|
go func() { |
|
defer func() { |
|
g.group.Done() |
|
_ = g.connector.Remove(dataStream.ID()) |
|
}() |
|
|
|
c := newContext(dataStream) |
|
defer c.Clean() |
|
|
|
contextFunc(c) |
|
}() |
|
} |
|
} |
|
|
|
// Wait waits all dataStream down. |
|
func (g *StreamGroup) Wait() { g.group.Wait() }
|
|
|