58 lines
		
	
	
		
			1.5 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			58 lines
		
	
	
		
			1.5 KiB
		
	
	
	
		
			Go
		
	
	
	
package network
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"sync"
 | 
						|
)
 | 
						|
 | 
						|
// StreamGroup is the group of stream includes ControlStream amd DataStream.
 | 
						|
// One Connection has many DataStream and only one ControlStream, ControlStream authenticates
 | 
						|
// Connection and recevies HandshakeFrame and CloseStreamFrame to create DataStream or close
 | 
						|
// stream. the ControlStream always the first stream established between server and client.
 | 
						|
type StreamGroup struct {
 | 
						|
	ctx           context.Context
 | 
						|
	controlStream ServerControlStream
 | 
						|
	connector     *Connector
 | 
						|
	group         sync.WaitGroup
 | 
						|
}
 | 
						|
 | 
						|
// NewStreamGroup returns StreamGroup.
 | 
						|
func NewStreamGroup(ctx context.Context, controlStream ServerControlStream, connector *Connector) *StreamGroup {
 | 
						|
	group := &StreamGroup{
 | 
						|
		ctx:           ctx,
 | 
						|
		controlStream: controlStream,
 | 
						|
		connector:     connector,
 | 
						|
	}
 | 
						|
	return group
 | 
						|
}
 | 
						|
 | 
						|
// Run run contextFunc with connector.
 | 
						|
// Run continus Accepts DataStream and create a Context to run with contextFunc.
 | 
						|
// TODO: run in aop model, like setMetadata -> handleRoute -> before -> handle -> after.
 | 
						|
func (g *StreamGroup) Run(contextFunc func(c *Context)) error {
 | 
						|
	for {
 | 
						|
		dataStream, err := g.controlStream.AcceptStream(g.ctx)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		g.group.Add(1)
 | 
						|
		_ = g.connector.Add(dataStream.ID(), dataStream)
 | 
						|
 | 
						|
		go func() {
 | 
						|
			defer func() {
 | 
						|
				g.group.Done()
 | 
						|
				_ = g.connector.Remove(dataStream.ID())
 | 
						|
			}()
 | 
						|
 | 
						|
			c := newContext(dataStream)
 | 
						|
			defer c.Clean()
 | 
						|
 | 
						|
			contextFunc(c)
 | 
						|
		}()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Wait waits all dataStream down.
 | 
						|
func (g *StreamGroup) Wait() { g.group.Wait() }
 |