cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2021.12.3

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

datagramsession/session.go

140lines · modecode

1package datagramsession
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "time"
8
9 "github.com/google/uuid"
10 "github.com/rs/zerolog"
11)
12
13const (
14 defaultCloseIdleAfter = time.Second * 210
15)
16
17func SessionIdleErr(timeout time.Duration) error {
18 return fmt.Errorf("session idle for %v", timeout)
19}
20
21// Session is a bidirectional pipe of datagrams between transport and dstConn
22// Currently the only implementation of transport is quic DatagramMuxer
23// Destination can be a connection with origin or with eyeball
24// When the destination is origin:
25// - Datagrams from edge are read by Manager from the transport. Manager finds the corresponding Session and calls the
26// write method of the Session to send to origin
27// - Datagrams from origin are read from conn and SentTo transport. Transport will return them to eyeball
28// When the destination is eyeball:
29// - Datagrams from eyeball are read from conn and SentTo transport. Transport will send them to cloudflared
30// - Datagrams from cloudflared are read by Manager from the transport. Manager finds the corresponding Session and calls the
31// write method of the Session to send to eyeball
32type Session struct {
33 ID uuid.UUID
34 transport transport
35 dstConn io.ReadWriteCloser
36 // activeAtChan is used to communicate the last read/write time
37 activeAtChan chan time.Time
38 closeChan chan error
39 log *zerolog.Logger
40}
41
42func newSession(id uuid.UUID, transport transport, dstConn io.ReadWriteCloser, log *zerolog.Logger) *Session {
43 return &Session{
44 ID: id,
45 transport: transport,
46 dstConn: dstConn,
47 // activeAtChan has low capacity. It can be full when there are many concurrent read/write. markActive() will
48 // drop instead of blocking because last active time only needs to be an approximation
49 activeAtChan: make(chan time.Time, 2),
50 // capacity is 2 because close() and dstToTransport routine in Serve() can write to this channel
51 closeChan: make(chan error, 2),
52 log: log,
53 }
54}
55
56func (s *Session) Serve(ctx context.Context, closeAfterIdle time.Duration) (closedByRemote bool, err error) {
57 go func() {
58 // QUIC implementation copies data to another buffer before returning https://github.com/lucas-clemente/quic-go/blob/v0.24.0/session.go#L1967-L1975
59 // This makes it safe to share readBuffer between iterations
60 const maxPacketSize = 1500
61 readBuffer := make([]byte, maxPacketSize)
62 for {
63 if err := s.dstToTransport(readBuffer); err != nil {
64 s.closeChan <- err
65 return
66 }
67 }
68 }()
69 err = s.waitForCloseCondition(ctx, closeAfterIdle)
70 if closeSession, ok := err.(*errClosedSession); ok {
71 closedByRemote = closeSession.byRemote
72 }
73 return closedByRemote, err
74}
75
76func (s *Session) waitForCloseCondition(ctx context.Context, closeAfterIdle time.Duration) error {
77 // Closing dstConn cancels read so dstToTransport routine in Serve() can return
78 defer s.dstConn.Close()
79 if closeAfterIdle == 0 {
80 // provide deafult is caller doesn't specify one
81 closeAfterIdle = defaultCloseIdleAfter
82 }
83
84 checkIdleFreq := closeAfterIdle / 8
85 checkIdleTicker := time.NewTicker(checkIdleFreq)
86 defer checkIdleTicker.Stop()
87
88 activeAt := time.Now()
89 for {
90 select {
91 case <-ctx.Done():
92 return ctx.Err()
93 case reason := <-s.closeChan:
94 return reason
95 // TODO: TUN-5423 evaluate if using atomic is more efficient
96 case now := <-checkIdleTicker.C:
97 // The session is considered inactive if current time is after (last active time + allowed idle time)
98 if now.After(activeAt.Add(closeAfterIdle)) {
99 return SessionIdleErr(closeAfterIdle)
100 }
101 case activeAt = <-s.activeAtChan: // Update last active time
102 }
103 }
104}
105
106func (s *Session) dstToTransport(buffer []byte) error {
107 n, err := s.dstConn.Read(buffer)
108 s.markActive()
109 if n > 0 {
110 if n <= int(s.transport.MTU()) {
111 err = s.transport.SendTo(s.ID, buffer[:n])
112 } else {
113 // drop packet for now, eventually reply with ICMP for PMTUD
114 s.log.Debug().
115 Str("session", s.ID.String()).
116 Int("len", n).
117 Uint("mtu", s.transport.MTU()).
118 Msg("dropped packet exceeding MTU")
119 }
120 }
121 return err
122}
123
124func (s *Session) transportToDst(payload []byte) (int, error) {
125 s.markActive()
126 return s.dstConn.Write(payload)
127}
128
129// Sends the last active time to the idle checker loop without blocking. activeAtChan will only be full when there
130// are many concurrent read/write. It is fine to lose some precision
131func (s *Session) markActive() {
132 select {
133 case s.activeAtChan <- time.Now():
134 default:
135 }
136}
137
138func (s *Session) close(err *errClosedSession) {
139 s.closeChan <- err
140}
141