cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2020.2.0

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/manager.go

299lines · modecode

1package connection
2
3import (
4 "context"
5 "crypto/tls"
6 "fmt"
7 "sync"
8 "time"
9
10 "github.com/google/uuid"
11 "github.com/pkg/errors"
12 "github.com/prometheus/client_golang/prometheus"
13 "github.com/sirupsen/logrus"
14
15 "github.com/cloudflare/cloudflared/cmd/cloudflared/buildinfo"
16 "github.com/cloudflare/cloudflared/h2mux"
17 "github.com/cloudflare/cloudflared/streamhandler"
18 tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
19)
20
// Constants shared by the edge manager: help links quoted in error messages,
// the default retry delay, and the names used when registering metrics.
const (
	// quickStartLink is the Argo Tunnel quick-start guide, referenced by noRetryMessage.
	quickStartLink = "https://developers.cloudflare.com/argo-tunnel/quickstart/"
	// faqLink is the Argo Tunnel FAQ, referenced by noRetryMessage.
	faqLink = "https://developers.cloudflare.com/argo-tunnel/faq/"
	// defaultRetryAfter is the RetryAfter value set on errors built by retryConnection.
	defaultRetryAfter = 5 * time.Second
	// packageNamespace is the namespace passed to newMetrics.
	packageNamespace = "connection"
	// edgeManagerSubsystem is the subsystem passed to newMetrics.
	edgeManagerSubsystem = "edgemanager"
)
28
29// EdgeManager manages connections with the edge
30type EdgeManager struct {
31 // streamHandler handles stream opened by the edge
32 streamHandler *streamhandler.StreamHandler
33 // TLSConfig is the TLS configuration to connect with edge
34 tlsConfig *tls.Config
35 // cloudflaredConfig is the cloudflared configuration that is determined when the process first starts
36 cloudflaredConfig *CloudflaredConfig
37 // serviceDiscoverer returns the next edge addr to connect to
38 serviceDiscoverer EdgeServiceDiscoverer
39 // state is attributes of ConnectionManager that can change during runtime.
40 state *edgeManagerState
41
42 logger *logrus.Entry
43
44 metrics *metrics
45}
46
47type metrics struct {
48 // activeStreams is a gauge shared by all muxers of this process to expose the total number of active streams
49 activeStreams prometheus.Gauge
50}
51
52func newMetrics(namespace, subsystem string) *metrics {
53 return &metrics{
54 activeStreams: h2mux.NewActiveStreamsMetrics(namespace, subsystem),
55 }
56}
57
58// EdgeManagerConfigurable is the configurable attributes of a EdgeConnectionManager
59type EdgeManagerConfigurable struct {
60 TunnelHostnames []h2mux.TunnelHostname
61 *tunnelpogs.EdgeConnectionConfig
62}
63
64type CloudflaredConfig struct {
65 CloudflaredID uuid.UUID
66 Tags []tunnelpogs.Tag
67 BuildInfo *buildinfo.BuildInfo
68 IntentLabel string
69}
70
71func NewEdgeManager(
72 streamHandler *streamhandler.StreamHandler,
73 edgeConnMgrConfigurable *EdgeManagerConfigurable,
74 userCredential []byte,
75 tlsConfig *tls.Config,
76 serviceDiscoverer EdgeServiceDiscoverer,
77 cloudflaredConfig *CloudflaredConfig,
78 logger *logrus.Logger,
79) *EdgeManager {
80 return &EdgeManager{
81 streamHandler: streamHandler,
82 tlsConfig: tlsConfig,
83 cloudflaredConfig: cloudflaredConfig,
84 serviceDiscoverer: serviceDiscoverer,
85 state: newEdgeConnectionManagerState(edgeConnMgrConfigurable, userCredential),
86 logger: logger.WithField("subsystem", "connectionManager"),
87 metrics: newMetrics(packageNamespace, edgeManagerSubsystem),
88 }
89}
90
91func (em *EdgeManager) Run(ctx context.Context) error {
92 defer em.shutdown()
93
94 resolveEdgeIPTicker := time.Tick(resolveEdgeAddrTTL)
95 for {
96 select {
97 case <-ctx.Done():
98 return errors.Wrap(ctx.Err(), "EdgeConnectionManager terminated")
99 case <-resolveEdgeIPTicker:
100 if err := em.serviceDiscoverer.Refresh(); err != nil {
101 em.logger.WithError(err).Warn("Cannot refresh Cloudflare edge addresses")
102 }
103 default:
104 time.Sleep(1 * time.Second)
105 }
106 // Create/delete connection one at a time, so we don't need to adjust for connections that are being created/deleted
107 // in shouldCreateConnection or shouldReduceConnection calculation
108 if em.state.shouldCreateConnection(em.serviceDiscoverer.AvailableAddrs()) {
109 if connErr := em.newConnection(ctx); connErr != nil {
110 if !connErr.ShouldRetry {
111 em.logger.WithError(connErr).Error(em.noRetryMessage())
112 return connErr
113 }
114 em.logger.WithError(connErr).Error("cannot create new connection")
115 }
116 } else if em.state.shouldReduceConnection() {
117 if err := em.closeConnection(ctx); err != nil {
118 em.logger.WithError(err).Error("cannot close connection")
119 }
120 }
121 }
122}
123
124func (em *EdgeManager) UpdateConfigurable(newConfigurable *EdgeManagerConfigurable) {
125 em.logger.Infof("New edge connection manager configuration %+v", newConfigurable)
126 em.state.updateConfigurable(newConfigurable)
127}
128
129func (em *EdgeManager) newConnection(ctx context.Context) *tunnelpogs.ConnectError {
130 edgeTCPAddr, err := em.serviceDiscoverer.Addr()
131 if err != nil {
132 return retryConnection(fmt.Sprintf("edge address discovery error: %v", err))
133 }
134 configurable := em.state.getConfigurable()
135 edgeConn, err := DialEdge(ctx, configurable.Timeout, em.tlsConfig, edgeTCPAddr)
136 if err != nil {
137 return retryConnection(fmt.Sprintf("dial edge error: %v", err))
138 }
139 // Establish a muxed connection with the edge
140 // Client mux handshake with agent server
141 muxer, err := h2mux.Handshake(edgeConn, edgeConn, h2mux.MuxerConfig{
142 Timeout: configurable.Timeout,
143 Handler: em.streamHandler,
144 IsClient: true,
145 HeartbeatInterval: configurable.HeartbeatInterval,
146 MaxHeartbeats: configurable.MaxFailedHeartbeats,
147 Logger: em.logger.WithField("subsystem", "muxer"),
148 }, em.metrics.activeStreams)
149 if err != nil {
150 retryConnection(fmt.Sprintf("couldn't perform handshake with edge: %v", err))
151 }
152
153 h2muxConn, err := newConnection(muxer, edgeTCPAddr)
154 if err != nil {
155 return retryConnection(fmt.Sprintf("couldn't create h2mux connection: %v", err))
156 }
157
158 go em.serveConn(ctx, h2muxConn)
159
160 connResult, err := h2muxConn.Connect(ctx, &tunnelpogs.ConnectParameters{
161 CloudflaredID: em.cloudflaredConfig.CloudflaredID,
162 CloudflaredVersion: em.cloudflaredConfig.BuildInfo.CloudflaredVersion,
163 NumPreviousAttempts: 0,
164 OriginCert: em.state.getUserCredential(),
165 IntentLabel: em.cloudflaredConfig.IntentLabel,
166 Tags: em.cloudflaredConfig.Tags,
167 }, em.logger)
168 if err != nil {
169 h2muxConn.Shutdown()
170 return retryConnection(fmt.Sprintf("couldn't connect to edge: %v", err))
171 }
172
173 if connErr := connResult.ConnectError(); connErr != nil {
174 return connErr
175 }
176
177 em.state.newConnection(h2muxConn)
178 em.logger.Infof("connected to %s", connResult.ConnectedTo())
179
180 if connResult.ClientConfig() != nil {
181 em.streamHandler.UseConfiguration(ctx, connResult.ClientConfig())
182 }
183 return nil
184}
185
186func (em *EdgeManager) closeConnection(ctx context.Context) error {
187 conn := em.state.getFirstConnection()
188 if conn == nil {
189 return fmt.Errorf("no connection to close")
190 }
191 conn.Shutdown()
192 // teardown will be handled by EdgeManager.serveConn in another goroutine
193 return nil
194}
195
196func (em *EdgeManager) serveConn(ctx context.Context, conn *Connection) {
197 err := conn.Serve(ctx)
198 em.logger.WithError(err).Warn("Connection closed")
199 em.state.closeConnection(conn)
200 em.serviceDiscoverer.ReplaceAddr(conn.addr)
201}
202
203func (em *EdgeManager) noRetryMessage() string {
204 messageTemplate := "cloudflared could not register an Argo Tunnel on your account. Please confirm the following before trying again:" +
205 "1. You have Argo Smart Routing enabled in your account, See Enable Argo section of %s." +
206 "2. Your credential at %s is still valid. See %s."
207 return fmt.Sprintf(messageTemplate, quickStartLink, em.state.getConfigurable().UserCredentialPath, faqLink)
208}
209
210func (em *EdgeManager) shutdown() {
211 em.state.shutdown()
212}
213
214type edgeManagerState struct {
215 sync.RWMutex
216 configurable *EdgeManagerConfigurable
217 userCredential []byte
218 conns map[uuid.UUID]*Connection
219}
220
221func newEdgeConnectionManagerState(configurable *EdgeManagerConfigurable, userCredential []byte) *edgeManagerState {
222 return &edgeManagerState{
223 configurable: configurable,
224 userCredential: userCredential,
225 conns: make(map[uuid.UUID]*Connection),
226 }
227}
228
229func (ems *edgeManagerState) shouldCreateConnection(availableEdgeAddrs int) bool {
230 ems.RLock()
231 defer ems.RUnlock()
232 expectedHAConns := int(ems.configurable.NumHAConnections)
233 if availableEdgeAddrs < expectedHAConns {
234 expectedHAConns = availableEdgeAddrs
235 }
236 return len(ems.conns) < expectedHAConns
237}
238
239func (ems *edgeManagerState) shouldReduceConnection() bool {
240 ems.RLock()
241 defer ems.RUnlock()
242 return uint8(len(ems.conns)) > ems.configurable.NumHAConnections
243}
244
245func (ems *edgeManagerState) newConnection(conn *Connection) {
246 ems.Lock()
247 defer ems.Unlock()
248 ems.conns[conn.id] = conn
249}
250
251func (ems *edgeManagerState) closeConnection(conn *Connection) {
252 ems.Lock()
253 defer ems.Unlock()
254 delete(ems.conns, conn.id)
255}
256
257func (ems *edgeManagerState) getFirstConnection() *Connection {
258 ems.RLock()
259 defer ems.RUnlock()
260
261 for _, conn := range ems.conns {
262 return conn
263 }
264 return nil
265}
266
267func (ems *edgeManagerState) shutdown() {
268 ems.Lock()
269 defer ems.Unlock()
270 for _, conn := range ems.conns {
271 conn.Shutdown()
272 }
273}
274
275func (ems *edgeManagerState) getConfigurable() *EdgeManagerConfigurable {
276 ems.Lock()
277 defer ems.Unlock()
278 return ems.configurable
279}
280
281func (ems *edgeManagerState) updateConfigurable(newConfigurable *EdgeManagerConfigurable) {
282 ems.Lock()
283 defer ems.Unlock()
284 ems.configurable = newConfigurable
285}
286
287func (ems *edgeManagerState) getUserCredential() []byte {
288 ems.RLock()
289 defer ems.RUnlock()
290 return ems.userCredential
291}
292
293func retryConnection(cause string) *tunnelpogs.ConnectError {
294 return &tunnelpogs.ConnectError{
295 Cause: cause,
296 RetryAfter: defaultRetryAfter,
297 ShouldRetry: true,
298 }
299}
300