cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2019.11.2

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/manager.go

317lines · modecode

1package connection
2
3import (
4 "context"
5 "crypto/tls"
6 "fmt"
7 "net"
8 "sync"
9 "time"
10
11 "github.com/cloudflare/cloudflared/cmd/cloudflared/buildinfo"
12 "github.com/cloudflare/cloudflared/h2mux"
13 "github.com/cloudflare/cloudflared/streamhandler"
14 "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
15 "github.com/prometheus/client_golang/prometheus"
16
17 "github.com/google/uuid"
18 "github.com/pkg/errors"
19 "github.com/sirupsen/logrus"
20)
21
const (
	// Documentation links embedded in the message built by noRetryMessage.
	quickStartLink = "https://developers.cloudflare.com/argo-tunnel/quickstart/"
	faqLink        = "https://developers.cloudflare.com/argo-tunnel/faq/"

	// defaultRetryAfter is the RetryAfter value set on errors built by retryConnection.
	defaultRetryAfter = 5 * time.Second

	// Namespace and subsystem handed to newMetrics by NewEdgeManager.
	packageNamespace     = "connection"
	edgeManagerSubsystem = "edgemanager"
)
29
30// EdgeManager manages connections with the edge
31type EdgeManager struct {
32 // streamHandler handles stream opened by the edge
33 streamHandler *streamhandler.StreamHandler
34 // TLSConfig is the TLS configuration to connect with edge
35 tlsConfig *tls.Config
36 // cloudflaredConfig is the cloudflared configuration that is determined when the process first starts
37 cloudflaredConfig *CloudflaredConfig
38 // serviceDiscoverer returns the next edge addr to connect to
39 serviceDiscoverer EdgeServiceDiscoverer
40 // state is attributes of ConnectionManager that can change during runtime.
41 state *edgeManagerState
42
43 logger *logrus.Entry
44
45 metrics *metrics
46}
47
48type metrics struct {
49 // activeStreams is a gauge shared by all muxers of this process to expose the total number of active streams
50 activeStreams prometheus.Gauge
51}
52
53func newMetrics(namespace, subsystem string) *metrics {
54 return &metrics{
55 activeStreams: h2mux.NewActiveStreamsMetrics(namespace, subsystem),
56 }
57}
58
59// EdgeManagerConfigurable is the configurable attributes of a EdgeConnectionManager
60type EdgeManagerConfigurable struct {
61 TunnelHostnames []h2mux.TunnelHostname
62 *pogs.EdgeConnectionConfig
63}
64
65type CloudflaredConfig struct {
66 CloudflaredID uuid.UUID
67 Tags []pogs.Tag
68 BuildInfo *buildinfo.BuildInfo
69 IntentLabel string
70}
71
72func NewEdgeManager(
73 streamHandler *streamhandler.StreamHandler,
74 edgeConnMgrConfigurable *EdgeManagerConfigurable,
75 userCredential []byte,
76 tlsConfig *tls.Config,
77 serviceDiscoverer EdgeServiceDiscoverer,
78 cloudflaredConfig *CloudflaredConfig,
79 logger *logrus.Logger,
80) *EdgeManager {
81 return &EdgeManager{
82 streamHandler: streamHandler,
83 tlsConfig: tlsConfig,
84 cloudflaredConfig: cloudflaredConfig,
85 serviceDiscoverer: serviceDiscoverer,
86 state: newEdgeConnectionManagerState(edgeConnMgrConfigurable, userCredential),
87 logger: logger.WithField("subsystem", "connectionManager"),
88 metrics: newMetrics(packageNamespace, edgeManagerSubsystem),
89 }
90}
91
92func (em *EdgeManager) Run(ctx context.Context) error {
93 defer em.shutdown()
94
95 resolveEdgeIPTicker := time.Tick(resolveEdgeAddrTTL)
96 for {
97 select {
98 case <-ctx.Done():
99 return errors.Wrap(ctx.Err(), "EdgeConnectionManager terminated")
100 case <-resolveEdgeIPTicker:
101 if err := em.serviceDiscoverer.Refresh(); err != nil {
102 em.logger.WithError(err).Warn("Cannot refresh Cloudflare edge addresses")
103 }
104 default:
105 time.Sleep(1 * time.Second)
106 }
107 // Create/delete connection one at a time, so we don't need to adjust for connections that are being created/deleted
108 // in shouldCreateConnection or shouldReduceConnection calculation
109 if em.state.shouldCreateConnection(em.serviceDiscoverer.AvailableAddrs()) {
110 if connErr := em.newConnection(ctx); connErr != nil {
111 if !connErr.ShouldRetry {
112 em.logger.WithError(connErr).Error(em.noRetryMessage())
113 return connErr
114 }
115 em.logger.WithError(connErr).Error("cannot create new connection")
116 }
117 } else if em.state.shouldReduceConnection() {
118 if err := em.closeConnection(ctx); err != nil {
119 em.logger.WithError(err).Error("cannot close connection")
120 }
121 }
122 }
123}
124
125func (em *EdgeManager) UpdateConfigurable(newConfigurable *EdgeManagerConfigurable) {
126 em.logger.Infof("New edge connection manager configuration %+v", newConfigurable)
127 em.state.updateConfigurable(newConfigurable)
128}
129
130func (em *EdgeManager) newConnection(ctx context.Context) *pogs.ConnectError {
131 edgeIP := em.serviceDiscoverer.Addr()
132 edgeConn, err := em.dialEdge(ctx, edgeIP)
133 if err != nil {
134 return retryConnection(fmt.Sprintf("dial edge error: %v", err))
135 }
136 configurable := em.state.getConfigurable()
137 // Establish a muxed connection with the edge
138 // Client mux handshake with agent server
139 muxer, err := h2mux.Handshake(edgeConn, edgeConn, h2mux.MuxerConfig{
140 Timeout: configurable.Timeout,
141 Handler: em.streamHandler,
142 IsClient: true,
143 HeartbeatInterval: configurable.HeartbeatInterval,
144 MaxHeartbeats: configurable.MaxFailedHeartbeats,
145 Logger: em.logger.WithField("subsystem", "muxer"),
146 }, em.metrics.activeStreams)
147 if err != nil {
148 retryConnection(fmt.Sprintf("couldn't perform handshake with edge: %v", err))
149 }
150
151 h2muxConn, err := newConnection(muxer, edgeIP)
152 if err != nil {
153 return retryConnection(fmt.Sprintf("couldn't create h2mux connection: %v", err))
154 }
155
156 go em.serveConn(ctx, h2muxConn)
157
158 connResult, err := h2muxConn.Connect(ctx, &pogs.ConnectParameters{
159 CloudflaredID: em.cloudflaredConfig.CloudflaredID,
160 CloudflaredVersion: em.cloudflaredConfig.BuildInfo.CloudflaredVersion,
161 NumPreviousAttempts: 0,
162 OriginCert: em.state.getUserCredential(),
163 IntentLabel: em.cloudflaredConfig.IntentLabel,
164 Tags: em.cloudflaredConfig.Tags,
165 }, em.logger)
166 if err != nil {
167 h2muxConn.Shutdown()
168 return retryConnection(fmt.Sprintf("couldn't connect to edge: %v", err))
169 }
170
171 if connErr := connResult.ConnectError(); connErr != nil {
172 return connErr
173 }
174
175 em.state.newConnection(h2muxConn)
176 em.logger.Infof("connected to %s", connResult.ConnectedTo())
177
178 if connResult.ClientConfig() != nil {
179 em.streamHandler.UseConfiguration(ctx, connResult.ClientConfig())
180 }
181 return nil
182}
183
184func (em *EdgeManager) closeConnection(ctx context.Context) error {
185 conn := em.state.getFirstConnection()
186 if conn == nil {
187 return fmt.Errorf("no connection to close")
188 }
189 conn.Shutdown()
190 return nil
191}
192
193func (em *EdgeManager) serveConn(ctx context.Context, conn *Connection) {
194 err := conn.Serve(ctx)
195 em.logger.WithError(err).Warn("Connection closed")
196 em.state.closeConnection(conn)
197}
198
199func (em *EdgeManager) dialEdge(ctx context.Context, edgeIP *net.TCPAddr) (*tls.Conn, error) {
200 timeout := em.state.getConfigurable().Timeout
201 // Inherit from parent context so we can cancel (Ctrl-C) while dialing
202 dialCtx, dialCancel := context.WithTimeout(ctx, timeout)
203 defer dialCancel()
204
205 dialer := net.Dialer{DualStack: true}
206 edgeConn, err := dialer.DialContext(dialCtx, "tcp", edgeIP.String())
207 if err != nil {
208 return nil, dialError{cause: errors.Wrap(err, "DialContext error")}
209 }
210 tlsEdgeConn := tls.Client(edgeConn, em.tlsConfig)
211 tlsEdgeConn.SetDeadline(time.Now().Add(timeout))
212
213 if err = tlsEdgeConn.Handshake(); err != nil {
214 return nil, dialError{cause: errors.Wrap(err, "Handshake with edge error")}
215 }
216 // clear the deadline on the conn; h2mux has its own timeouts
217 tlsEdgeConn.SetDeadline(time.Time{})
218 return tlsEdgeConn, nil
219}
220
221func (em *EdgeManager) noRetryMessage() string {
222 messageTemplate := "cloudflared could not register an Argo Tunnel on your account. Please confirm the following before trying again:" +
223 "1. You have Argo Smart Routing enabled in your account, See Enable Argo section of %s." +
224 "2. Your credential at %s is still valid. See %s."
225 return fmt.Sprintf(messageTemplate, quickStartLink, em.state.getConfigurable().UserCredentialPath, faqLink)
226}
227
228func (em *EdgeManager) shutdown() {
229 em.state.shutdown()
230}
231
232type edgeManagerState struct {
233 sync.RWMutex
234 configurable *EdgeManagerConfigurable
235 userCredential []byte
236 conns map[uuid.UUID]*Connection
237}
238
239func newEdgeConnectionManagerState(configurable *EdgeManagerConfigurable, userCredential []byte) *edgeManagerState {
240 return &edgeManagerState{
241 configurable: configurable,
242 userCredential: userCredential,
243 conns: make(map[uuid.UUID]*Connection),
244 }
245}
246
247func (ems *edgeManagerState) shouldCreateConnection(availableEdgeAddrs uint8) bool {
248 ems.RLock()
249 defer ems.RUnlock()
250 expectedHAConns := ems.configurable.NumHAConnections
251 if availableEdgeAddrs < expectedHAConns {
252 expectedHAConns = availableEdgeAddrs
253 }
254 return uint8(len(ems.conns)) < expectedHAConns
255}
256
257func (ems *edgeManagerState) shouldReduceConnection() bool {
258 ems.RLock()
259 defer ems.RUnlock()
260 return uint8(len(ems.conns)) > ems.configurable.NumHAConnections
261}
262
263func (ems *edgeManagerState) newConnection(conn *Connection) {
264 ems.Lock()
265 defer ems.Unlock()
266 ems.conns[conn.id] = conn
267}
268
269func (ems *edgeManagerState) closeConnection(conn *Connection) {
270 ems.Lock()
271 defer ems.Unlock()
272 delete(ems.conns, conn.id)
273}
274
275func (ems *edgeManagerState) getFirstConnection() *Connection {
276 ems.RLock()
277 defer ems.RUnlock()
278
279 for _, conn := range ems.conns {
280 return conn
281 }
282 return nil
283}
284
285func (ems *edgeManagerState) shutdown() {
286 ems.Lock()
287 defer ems.Unlock()
288 for _, conn := range ems.conns {
289 conn.Shutdown()
290 }
291}
292
293func (ems *edgeManagerState) getConfigurable() *EdgeManagerConfigurable {
294 ems.Lock()
295 defer ems.Unlock()
296 return ems.configurable
297}
298
299func (ems *edgeManagerState) updateConfigurable(newConfigurable *EdgeManagerConfigurable) {
300 ems.Lock()
301 defer ems.Unlock()
302 ems.configurable = newConfigurable
303}
304
305func (ems *edgeManagerState) getUserCredential() []byte {
306 ems.RLock()
307 defer ems.RUnlock()
308 return ems.userCredential
309}
310
311func retryConnection(cause string) *pogs.ConnectError {
312 return &pogs.ConnectError{
313 Cause: cause,
314 RetryAfter: defaultRetryAfter,
315 ShouldRetry: true,
316 }
317}
318