cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2019.10.1

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/manager.go

294lines · modecode

1package connection
2
3import (
4 "context"
5 "crypto/tls"
6 "fmt"
7 "net"
8 "sync"
9 "time"
10
11 "github.com/cloudflare/cloudflared/cmd/cloudflared/buildinfo"
12 "github.com/cloudflare/cloudflared/h2mux"
13 "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
14 "github.com/google/uuid"
15 "github.com/pkg/errors"
16 "github.com/sirupsen/logrus"
17)
18
19const (
20 quickStartLink = "https://developers.cloudflare.com/argo-tunnel/quickstart/"
21 faqLink = "https://developers.cloudflare.com/argo-tunnel/faq/"
22 defaultRetryAfter = time.Second * 5
23)
24
25// EdgeManager manages connections with the edge
26type EdgeManager struct {
27 // streamHandler handles stream opened by the edge
28 streamHandler h2mux.MuxedStreamHandler
29 // TLSConfig is the TLS configuration to connect with edge
30 tlsConfig *tls.Config
31 // cloudflaredConfig is the cloudflared configuration that is determined when the process first starts
32 cloudflaredConfig *CloudflaredConfig
33 // serviceDiscoverer returns the next edge addr to connect to
34 serviceDiscoverer EdgeServiceDiscoverer
35 // state is attributes of ConnectionManager that can change during runtime.
36 state *edgeManagerState
37
38 logger *logrus.Entry
39}
40
41// EdgeConnectionManagerConfigurable is the configurable attributes of a EdgeConnectionManager
42type EdgeManagerConfigurable struct {
43 TunnelHostnames []h2mux.TunnelHostname
44 *pogs.EdgeConnectionConfig
45}
46
47type CloudflaredConfig struct {
48 CloudflaredID uuid.UUID
49 Tags []pogs.Tag
50 BuildInfo *buildinfo.BuildInfo
51 IntentLabel string
52}
53
54func NewEdgeManager(
55 streamHandler h2mux.MuxedStreamHandler,
56 edgeConnMgrConfigurable *EdgeManagerConfigurable,
57 userCredential []byte,
58 tlsConfig *tls.Config,
59 serviceDiscoverer EdgeServiceDiscoverer,
60 cloudflaredConfig *CloudflaredConfig,
61 logger *logrus.Logger,
62) *EdgeManager {
63 return &EdgeManager{
64 streamHandler: streamHandler,
65 tlsConfig: tlsConfig,
66 cloudflaredConfig: cloudflaredConfig,
67 serviceDiscoverer: serviceDiscoverer,
68 state: newEdgeConnectionManagerState(edgeConnMgrConfigurable, userCredential),
69 logger: logger.WithField("subsystem", "connectionManager"),
70 }
71}
72
73func (em *EdgeManager) Run(ctx context.Context) error {
74 defer em.shutdown()
75
76 resolveEdgeIPTicker := time.Tick(resolveEdgeAddrTTL)
77 for {
78 select {
79 case <-ctx.Done():
80 return errors.Wrap(ctx.Err(), "EdgeConnectionManager terminated")
81 case <-resolveEdgeIPTicker:
82 if err := em.serviceDiscoverer.Refresh(); err != nil {
83 em.logger.WithError(err).Warn("Cannot refresh Cloudflare edge addresses")
84 }
85 default:
86 time.Sleep(1 * time.Second)
87 }
88 // Create/delete connection one at a time, so we don't need to adjust for connections that are being created/deleted
89 // in shouldCreateConnection or shouldReduceConnection calculation
90 if em.state.shouldCreateConnection(em.serviceDiscoverer.AvailableAddrs()) {
91 if connErr := em.newConnection(ctx); connErr != nil {
92 if !connErr.ShouldRetry {
93 em.logger.WithError(connErr).Error(em.noRetryMessage())
94 return connErr
95 }
96 em.logger.WithError(connErr).Error("cannot create new connection")
97 }
98 } else if em.state.shouldReduceConnection() {
99 if err := em.closeConnection(ctx); err != nil {
100 em.logger.WithError(err).Error("cannot close connection")
101 }
102 }
103 }
104}
105
106func (em *EdgeManager) UpdateConfigurable(newConfigurable *EdgeManagerConfigurable) {
107 em.logger.Infof("New edge connection manager configuration %+v", newConfigurable)
108 em.state.updateConfigurable(newConfigurable)
109}
110
111func (em *EdgeManager) newConnection(ctx context.Context) *pogs.ConnectError {
112 edgeIP := em.serviceDiscoverer.Addr()
113 edgeConn, err := em.dialEdge(ctx, edgeIP)
114 if err != nil {
115 return retryConnection(fmt.Sprintf("dial edge error: %v", err))
116 }
117 configurable := em.state.getConfigurable()
118 // Establish a muxed connection with the edge
119 // Client mux handshake with agent server
120 muxer, err := h2mux.Handshake(edgeConn, edgeConn, h2mux.MuxerConfig{
121 Timeout: configurable.Timeout,
122 Handler: em.streamHandler,
123 IsClient: true,
124 HeartbeatInterval: configurable.HeartbeatInterval,
125 MaxHeartbeats: configurable.MaxFailedHeartbeats,
126 Logger: em.logger.WithField("subsystem", "muxer"),
127 })
128 if err != nil {
129 retryConnection(fmt.Sprintf("couldn't perform handshake with edge: %v", err))
130 }
131
132 h2muxConn, err := newConnection(muxer, edgeIP)
133 if err != nil {
134 return retryConnection(fmt.Sprintf("couldn't create h2mux connection: %v", err))
135 }
136
137 go em.serveConn(ctx, h2muxConn)
138
139 connResult, err := h2muxConn.Connect(ctx, &pogs.ConnectParameters{
140 CloudflaredID: em.cloudflaredConfig.CloudflaredID,
141 CloudflaredVersion: em.cloudflaredConfig.BuildInfo.CloudflaredVersion,
142 NumPreviousAttempts: 0,
143 OriginCert: em.state.getUserCredential(),
144 IntentLabel: em.cloudflaredConfig.IntentLabel,
145 Tags: em.cloudflaredConfig.Tags,
146 }, em.logger)
147 if err != nil {
148 h2muxConn.Shutdown()
149 return retryConnection(fmt.Sprintf("couldn't connect to edge: %v", err))
150 }
151
152 if connErr := connResult.ConnectError(); connErr != nil {
153 return connErr
154 }
155
156 em.state.newConnection(h2muxConn)
157 em.logger.Infof("connected to %s", connResult.ConnectedTo())
158 return nil
159}
160
161func (em *EdgeManager) closeConnection(ctx context.Context) error {
162 conn := em.state.getFirstConnection()
163 if conn == nil {
164 return fmt.Errorf("no connection to close")
165 }
166 conn.Shutdown()
167 return nil
168}
169
170func (em *EdgeManager) serveConn(ctx context.Context, conn *Connection) {
171 err := conn.Serve(ctx)
172 em.logger.WithError(err).Warn("Connection closed")
173 em.state.closeConnection(conn)
174}
175
176func (em *EdgeManager) dialEdge(ctx context.Context, edgeIP *net.TCPAddr) (*tls.Conn, error) {
177 timeout := em.state.getConfigurable().Timeout
178 // Inherit from parent context so we can cancel (Ctrl-C) while dialing
179 dialCtx, dialCancel := context.WithTimeout(ctx, timeout)
180 defer dialCancel()
181
182 dialer := net.Dialer{DualStack: true}
183 edgeConn, err := dialer.DialContext(dialCtx, "tcp", edgeIP.String())
184 if err != nil {
185 return nil, dialError{cause: errors.Wrap(err, "DialContext error")}
186 }
187 tlsEdgeConn := tls.Client(edgeConn, em.tlsConfig)
188 tlsEdgeConn.SetDeadline(time.Now().Add(timeout))
189
190 if err = tlsEdgeConn.Handshake(); err != nil {
191 return nil, dialError{cause: errors.Wrap(err, "Handshake with edge error")}
192 }
193 // clear the deadline on the conn; h2mux has its own timeouts
194 tlsEdgeConn.SetDeadline(time.Time{})
195 return tlsEdgeConn, nil
196}
197
198func (em *EdgeManager) noRetryMessage() string {
199 messageTemplate := "cloudflared could not register an Argo Tunnel on your account. Please confirm the following before trying again:" +
200 "1. You have Argo Smart Routing enabled in your account, See Enable Argo section of %s." +
201 "2. Your credential at %s is still valid. See %s."
202 return fmt.Sprintf(messageTemplate, quickStartLink, em.state.getConfigurable().UserCredentialPath, faqLink)
203}
204
205func (em *EdgeManager) shutdown() {
206 em.state.shutdown()
207}
208
209type edgeManagerState struct {
210 sync.RWMutex
211 configurable *EdgeManagerConfigurable
212 userCredential []byte
213 conns map[uuid.UUID]*Connection
214}
215
216func newEdgeConnectionManagerState(configurable *EdgeManagerConfigurable, userCredential []byte) *edgeManagerState {
217 return &edgeManagerState{
218 configurable: configurable,
219 userCredential: userCredential,
220 conns: make(map[uuid.UUID]*Connection),
221 }
222}
223
224func (ems *edgeManagerState) shouldCreateConnection(availableEdgeAddrs uint8) bool {
225 ems.RLock()
226 defer ems.RUnlock()
227 expectedHAConns := ems.configurable.NumHAConnections
228 if availableEdgeAddrs < expectedHAConns {
229 expectedHAConns = availableEdgeAddrs
230 }
231 return uint8(len(ems.conns)) < expectedHAConns
232}
233
234func (ems *edgeManagerState) shouldReduceConnection() bool {
235 ems.RLock()
236 defer ems.RUnlock()
237 return uint8(len(ems.conns)) > ems.configurable.NumHAConnections
238}
239
240func (ems *edgeManagerState) newConnection(conn *Connection) {
241 ems.Lock()
242 defer ems.Unlock()
243 ems.conns[conn.id] = conn
244}
245
246func (ems *edgeManagerState) closeConnection(conn *Connection) {
247 ems.Lock()
248 defer ems.Unlock()
249 delete(ems.conns, conn.id)
250}
251
252func (ems *edgeManagerState) getFirstConnection() *Connection {
253 ems.RLock()
254 defer ems.RUnlock()
255
256 for _, conn := range ems.conns {
257 return conn
258 }
259 return nil
260}
261
262func (ems *edgeManagerState) shutdown() {
263 ems.Lock()
264 defer ems.Unlock()
265 for _, conn := range ems.conns {
266 conn.Shutdown()
267 }
268}
269
270func (ems *edgeManagerState) getConfigurable() *EdgeManagerConfigurable {
271 ems.Lock()
272 defer ems.Unlock()
273 return ems.configurable
274}
275
276func (ems *edgeManagerState) updateConfigurable(newConfigurable *EdgeManagerConfigurable) {
277 ems.Lock()
278 defer ems.Unlock()
279 ems.configurable = newConfigurable
280}
281
282func (ems *edgeManagerState) getUserCredential() []byte {
283 ems.RLock()
284 defer ems.RUnlock()
285 return ems.userCredential
286}
287
288func retryConnection(cause string) *pogs.ConnectError {
289 return &pogs.ConnectError{
290 Cause: cause,
291 RetryAfter: defaultRetryAfter,
292 ShouldRetry: true,
293 }
294}
295