cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflared Available

Code · Commits · Issues · Pull requests · Actions · Insights · Security
2021.12.3

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

datagramsession/manager.go

139lines · modecode

1package datagramsession
2
3import (
4 "context"
5 "io"
6
7 "github.com/google/uuid"
8 "github.com/rs/zerolog"
9 "golang.org/x/sync/errgroup"
10)
11
// requestChanCapacity is the buffer size used for the manager's datagram
// channel (see NewManager).
const requestChanCapacity = 16
15
16// Manager defines the APIs to manage sessions from the same transport.
17type Manager interface {
18 // Serve starts the event loop
19 Serve(ctx context.Context) error
20 // RegisterSession starts tracking a session. Caller is responsible for starting the session
21 RegisterSession(ctx context.Context, sessionID uuid.UUID, dstConn io.ReadWriteCloser) (*Session, error)
22 // UnregisterSession stops tracking the session and terminates it
23 UnregisterSession(ctx context.Context, sessionID uuid.UUID, message string, byRemote bool) error
24}
25
26type manager struct {
27 registrationChan chan *registerSessionEvent
28 unregistrationChan chan *unregisterSessionEvent
29 datagramChan chan *newDatagram
30 transport transport
31 sessions map[uuid.UUID]*Session
32 log *zerolog.Logger
33}
34
35func NewManager(transport transport, log *zerolog.Logger) Manager {
36 return &manager{
37 registrationChan: make(chan *registerSessionEvent),
38 unregistrationChan: make(chan *unregisterSessionEvent),
39 // datagramChan is buffered, so it can read more datagrams from transport while the event loop is processing other events
40 datagramChan: make(chan *newDatagram, requestChanCapacity),
41 transport: transport,
42 sessions: make(map[uuid.UUID]*Session),
43 log: log,
44 }
45}
46
47func (m *manager) Serve(ctx context.Context) error {
48 errGroup, ctx := errgroup.WithContext(ctx)
49 errGroup.Go(func() error {
50 for {
51 sessionID, payload, err := m.transport.ReceiveFrom()
52 if err != nil {
53 return err
54 }
55 datagram := &newDatagram{
56 sessionID: sessionID,
57 payload: payload,
58 }
59 select {
60 case <-ctx.Done():
61 return ctx.Err()
62 // Only the event loop routine can update/lookup the sessions map to avoid concurrent access
63 // Send the datagram to the event loop. It will find the session to send to
64 case m.datagramChan <- datagram:
65 }
66 }
67 })
68 errGroup.Go(func() error {
69 for {
70 select {
71 case <-ctx.Done():
72 return ctx.Err()
73 case datagram := <-m.datagramChan:
74 m.sendToSession(datagram)
75 case registration := <-m.registrationChan:
76 m.registerSession(ctx, registration)
77 // TODO: TUN-5422: Unregister inactive session upon timeout
78 case unregistration := <-m.unregistrationChan:
79 m.unregisterSession(unregistration)
80 }
81 }
82 })
83 return errGroup.Wait()
84}
85
86func (m *manager) RegisterSession(ctx context.Context, sessionID uuid.UUID, originProxy io.ReadWriteCloser) (*Session, error) {
87 event := newRegisterSessionEvent(sessionID, originProxy)
88 select {
89 case <-ctx.Done():
90 return nil, ctx.Err()
91 case m.registrationChan <- event:
92 session := <-event.resultChan
93 return session, nil
94 }
95}
96
97func (m *manager) registerSession(ctx context.Context, registration *registerSessionEvent) {
98 session := newSession(registration.sessionID, m.transport, registration.originProxy, m.log)
99 m.sessions[registration.sessionID] = session
100 registration.resultChan <- session
101}
102
103func (m *manager) UnregisterSession(ctx context.Context, sessionID uuid.UUID, message string, byRemote bool) error {
104 event := &unregisterSessionEvent{
105 sessionID: sessionID,
106 err: &errClosedSession{
107 message: message,
108 byRemote: byRemote,
109 },
110 }
111 select {
112 case <-ctx.Done():
113 return ctx.Err()
114 case m.unregistrationChan <- event:
115 return nil
116 }
117}
118
119func (m *manager) unregisterSession(unregistration *unregisterSessionEvent) {
120 session, ok := m.sessions[unregistration.sessionID]
121 if ok {
122 delete(m.sessions, unregistration.sessionID)
123 session.close(unregistration.err)
124 }
125}
126
127func (m *manager) sendToSession(datagram *newDatagram) {
128 session, ok := m.sessions[datagram.sessionID]
129 if !ok {
130 m.log.Error().Str("sessionID", datagram.sessionID.String()).Msg("session not found")
131 return
132 }
133 // session writes to destination over a connected UDP socket, which should not be blocking, so this call doesn't
134 // need to run in another go routine
135 _, err := session.transportToDst(datagram.payload)
136 if err != nil {
137 m.log.Err(err).Str("sessionID", datagram.sessionID.String()).Msg("Failed to write payload to session")
138 }
139}
140