cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2021.12.2

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

datagramsession/session.go

129lines · modecode

1package datagramsession
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "time"
8
9 "github.com/google/uuid"
10)
11
12const (
13 defaultCloseIdleAfter = time.Second * 210
14)
15
16func SessionIdleErr(timeout time.Duration) error {
17 return fmt.Errorf("session idle for %v", timeout)
18}
19
20// Each Session is a bidirectional pipe of datagrams between transport and dstConn
21// Currently the only implementation of transport is quic DatagramMuxer
22// Destination can be a connection with origin or with eyeball
23// When the destination is origin:
24// - Datagrams from edge are read by Manager from the transport. Manager finds the corresponding Session and calls the
25// write method of the Session to send to origin
26// - Datagrams from origin are read from conn and SentTo transport. Transport will return them to eyeball
27// When the destination is eyeball:
28// - Datagrams from eyeball are read from conn and SentTo transport. Transport will send them to cloudflared
29// - Datagrams from cloudflared are read by Manager from the transport. Manager finds the corresponding Session and calls the
30// write method of the Session to send to eyeball
31type Session struct {
32 ID uuid.UUID
33 transport transport
34 dstConn io.ReadWriteCloser
35 // activeAtChan is used to communicate the last read/write time
36 activeAtChan chan time.Time
37 closeChan chan error
38}
39
40func newSession(id uuid.UUID, transport transport, dstConn io.ReadWriteCloser) *Session {
41 return &Session{
42 ID: id,
43 transport: transport,
44 dstConn: dstConn,
45 // activeAtChan has low capacity. It can be full when there are many concurrent read/write. markActive() will
46 // drop instead of blocking because last active time only needs to be an approximation
47 activeAtChan: make(chan time.Time, 2),
48 // capacity is 2 because close() and dstToTransport routine in Serve() can write to this channel
49 closeChan: make(chan error, 2),
50 }
51}
52
53func (s *Session) Serve(ctx context.Context, closeAfterIdle time.Duration) (closedByRemote bool, err error) {
54 go func() {
55 // QUIC implementation copies data to another buffer before returning https://github.com/lucas-clemente/quic-go/blob/v0.24.0/session.go#L1967-L1975
56 // This makes it safe to share readBuffer between iterations
57 readBuffer := make([]byte, s.transport.MTU())
58 for {
59 if err := s.dstToTransport(readBuffer); err != nil {
60 s.closeChan <- err
61 return
62 }
63 }
64 }()
65 err = s.waitForCloseCondition(ctx, closeAfterIdle)
66 if closeSession, ok := err.(*errClosedSession); ok {
67 closedByRemote = closeSession.byRemote
68 }
69 return closedByRemote, err
70}
71
72func (s *Session) waitForCloseCondition(ctx context.Context, closeAfterIdle time.Duration) error {
73 // Closing dstConn cancels read so dstToTransport routine in Serve() can return
74 defer s.dstConn.Close()
75 if closeAfterIdle == 0 {
76 // provide deafult is caller doesn't specify one
77 closeAfterIdle = defaultCloseIdleAfter
78 }
79
80 checkIdleFreq := closeAfterIdle / 8
81 checkIdleTicker := time.NewTicker(checkIdleFreq)
82 defer checkIdleTicker.Stop()
83
84 activeAt := time.Now()
85 for {
86 select {
87 case <-ctx.Done():
88 return ctx.Err()
89 case reason := <-s.closeChan:
90 return reason
91 // TODO: TUN-5423 evaluate if using atomic is more efficient
92 case now := <-checkIdleTicker.C:
93 // The session is considered inactive if current time is after (last active time + allowed idle time)
94 if now.After(activeAt.Add(closeAfterIdle)) {
95 return SessionIdleErr(closeAfterIdle)
96 }
97 case activeAt = <-s.activeAtChan: // Update last active time
98 }
99 }
100}
101
102func (s *Session) dstToTransport(buffer []byte) error {
103 n, err := s.dstConn.Read(buffer)
104 s.markActive()
105 if n > 0 {
106 if err := s.transport.SendTo(s.ID, buffer[:n]); err != nil {
107 return err
108 }
109 }
110 return err
111}
112
113func (s *Session) transportToDst(payload []byte) (int, error) {
114 s.markActive()
115 return s.dstConn.Write(payload)
116}
117
118// Sends the last active time to the idle checker loop without blocking. activeAtChan will only be full when there
119// are many concurrent read/write. It is fine to lose some precision
120func (s *Session) markActive() {
121 select {
122 case s.activeAtChan <- time.Now():
123 default:
124 }
125}
126
127func (s *Session) close(err *errClosedSession) {
128 s.closeChan <- err
129}
130