cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2020.2.1

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/manager.go

302lines · modecode

1package connection
2
3import (
4 "context"
5 "crypto/tls"
6 "fmt"
7 "sync"
8 "time"
9
10 "github.com/google/uuid"
11 "github.com/pkg/errors"
12 "github.com/prometheus/client_golang/prometheus"
13 "github.com/sirupsen/logrus"
14
15 "github.com/cloudflare/cloudflared/cmd/cloudflared/buildinfo"
16 "github.com/cloudflare/cloudflared/edgediscovery"
17 "github.com/cloudflare/cloudflared/h2mux"
18 "github.com/cloudflare/cloudflared/streamhandler"
19 tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
20)
21
const (
	// quickStartLink and faqLink are the documentation URLs embedded in the
	// message produced by noRetryMessage.
	quickStartLink = "https://developers.cloudflare.com/argo-tunnel/quickstart/"
	faqLink        = "https://developers.cloudflare.com/argo-tunnel/faq/"
	// defaultRetryAfter is the RetryAfter value that retryConnection puts on
	// every retryable ConnectError it builds.
	defaultRetryAfter = 5 * time.Second
	// packageNamespace and edgeManagerSubsystem are handed to newMetrics as the
	// metric namespace and subsystem.
	packageNamespace     = "connection"
	edgeManagerSubsystem = "edgemanager"
)
29
// EdgeManager manages connections with the edge. Run opens and closes
// connections so that the number of live ones tracks the configured target.
type EdgeManager struct {
	// streamHandler handles streams opened by the edge
	streamHandler *streamhandler.StreamHandler
	// tlsConfig is the TLS configuration used to connect to the edge
	tlsConfig *tls.Config
	// cloudflaredConfig is the cloudflared configuration that is determined when the process first starts
	cloudflaredConfig *CloudflaredConfig
	// serviceDiscoverer returns the next edge addr to connect to
	serviceDiscoverer *edgediscovery.Edge
	// state holds the attributes of EdgeManager that can change during runtime:
	// the current configurable, the user credential and the open connections.
	state *edgeManagerState

	// logger is the entry every message of this manager is written through.
	logger *logrus.Entry

	// metrics holds the collectors handed to each muxer created by this manager.
	metrics *metrics
}
47
// metrics groups the Prometheus collectors owned by an EdgeManager.
type metrics struct {
	// activeStreams is a gauge shared by all muxers of this process to expose the total number of active streams
	activeStreams prometheus.Gauge
}
52
53func newMetrics(namespace, subsystem string) *metrics {
54 return &metrics{
55 activeStreams: h2mux.NewActiveStreamsMetrics(namespace, subsystem),
56 }
57}
58
// EdgeManagerConfigurable is the set of EdgeManager attributes that can be
// replaced at runtime through EdgeManager.UpdateConfigurable.
type EdgeManagerConfigurable struct {
	// TunnelHostnames is the list of tunnel hostnames carried with this configuration.
	TunnelHostnames []h2mux.TunnelHostname
	// The embedded EdgeConnectionConfig supplies the connection settings read
	// elsewhere in this file (Timeout, HeartbeatInterval, MaxFailedHeartbeats,
	// NumHAConnections, UserCredentialPath). Being a pointer, it must be non-nil
	// before any of those promoted fields is accessed.
	*tunnelpogs.EdgeConnectionConfig
}
64
// CloudflaredConfig carries the process-level values that EdgeManager sends to
// the edge in the ConnectParameters of every new connection.
type CloudflaredConfig struct {
	// CloudflaredID is sent as ConnectParameters.CloudflaredID.
	CloudflaredID uuid.UUID
	// Tags is sent as ConnectParameters.Tags.
	Tags []tunnelpogs.Tag
	// BuildInfo provides the CloudflaredVersion reported to the edge; it is
	// dereferenced on every connect and therefore must not be nil.
	BuildInfo *buildinfo.BuildInfo
	// IntentLabel is sent as ConnectParameters.IntentLabel.
	IntentLabel string
}
71
72func NewEdgeManager(
73 streamHandler *streamhandler.StreamHandler,
74 edgeConnMgrConfigurable *EdgeManagerConfigurable,
75 userCredential []byte,
76 tlsConfig *tls.Config,
77 serviceDiscoverer *edgediscovery.Edge,
78 cloudflaredConfig *CloudflaredConfig,
79 logger *logrus.Logger,
80) *EdgeManager {
81 return &EdgeManager{
82 streamHandler: streamHandler,
83 tlsConfig: tlsConfig,
84 cloudflaredConfig: cloudflaredConfig,
85 serviceDiscoverer: serviceDiscoverer,
86 state: newEdgeConnectionManagerState(edgeConnMgrConfigurable, userCredential),
87 logger: logger.WithField("subsystem", "connectionManager"),
88 metrics: newMetrics(packageNamespace, edgeManagerSubsystem),
89 }
90}
91
92func (em *EdgeManager) Run(ctx context.Context) error {
93 defer em.shutdown()
94
95 // Currently, declarative tunnels don't have any concept of a stable connection
96 // Each edge connection is transient and when it dies, it is replaced by a different one,
97 // not restarted.
98 // So in the future we should really change this so that n connections are stored individually
99 connIndex := 0
100 for {
101 select {
102 case <-ctx.Done():
103 return errors.Wrap(ctx.Err(), "EdgeConnectionManager terminated")
104 default:
105 time.Sleep(1 * time.Second)
106 }
107 // Create/delete connection one at a time, so we don't need to adjust for connections that are being created/deleted
108 // in shouldCreateConnection or shouldReduceConnection calculation
109 if em.state.shouldCreateConnection(em.serviceDiscoverer.AvailableAddrs()) {
110 if connErr := em.newConnection(ctx, connIndex); connErr != nil {
111 if !connErr.ShouldRetry {
112 em.logger.WithError(connErr).Error(em.noRetryMessage())
113 return connErr
114 }
115 em.logger.WithError(connErr).Error("cannot create new connection")
116 } else {
117 connIndex++
118 }
119 } else if em.state.shouldReduceConnection() {
120 if err := em.closeConnection(ctx); err != nil {
121 em.logger.WithError(err).Error("cannot close connection")
122 }
123 }
124 }
125}
126
// UpdateConfigurable logs newConfigurable and installs it as the manager's
// current configuration. The swap happens under the state lock; later reads of
// the configuration by the Run loop and by new connections see the new value.
func (em *EdgeManager) UpdateConfigurable(newConfigurable *EdgeManagerConfigurable) {
	em.logger.Infof("New edge connection manager configuration %+v", newConfigurable)
	em.state.updateConfigurable(newConfigurable)
}
131
132func (em *EdgeManager) newConnection(ctx context.Context, index int) *tunnelpogs.ConnectError {
133 edgeTCPAddr, err := em.serviceDiscoverer.GetAddr(index)
134 if err != nil {
135 return retryConnection(fmt.Sprintf("edge address discovery error: %v", err))
136 }
137 configurable := em.state.getConfigurable()
138 edgeConn, err := DialEdge(ctx, configurable.Timeout, em.tlsConfig, edgeTCPAddr)
139 if err != nil {
140 return retryConnection(fmt.Sprintf("dial edge error: %v", err))
141 }
142 // Establish a muxed connection with the edge
143 // Client mux handshake with agent server
144 muxer, err := h2mux.Handshake(edgeConn, edgeConn, h2mux.MuxerConfig{
145 Timeout: configurable.Timeout,
146 Handler: em.streamHandler,
147 IsClient: true,
148 HeartbeatInterval: configurable.HeartbeatInterval,
149 MaxHeartbeats: configurable.MaxFailedHeartbeats,
150 Logger: em.logger.WithField("subsystem", "muxer"),
151 }, em.metrics.activeStreams)
152 if err != nil {
153 retryConnection(fmt.Sprintf("couldn't perform handshake with edge: %v", err))
154 }
155
156 h2muxConn, err := newConnection(muxer, edgeTCPAddr)
157 if err != nil {
158 return retryConnection(fmt.Sprintf("couldn't create h2mux connection: %v", err))
159 }
160
161 go em.serveConn(ctx, h2muxConn)
162
163 connResult, err := h2muxConn.Connect(ctx, &tunnelpogs.ConnectParameters{
164 CloudflaredID: em.cloudflaredConfig.CloudflaredID,
165 CloudflaredVersion: em.cloudflaredConfig.BuildInfo.CloudflaredVersion,
166 NumPreviousAttempts: 0,
167 OriginCert: em.state.getUserCredential(),
168 IntentLabel: em.cloudflaredConfig.IntentLabel,
169 Tags: em.cloudflaredConfig.Tags,
170 }, em.logger)
171 if err != nil {
172 h2muxConn.Shutdown()
173 return retryConnection(fmt.Sprintf("couldn't connect to edge: %v", err))
174 }
175
176 if connErr := connResult.ConnectError(); connErr != nil {
177 return connErr
178 }
179
180 em.state.newConnection(h2muxConn)
181 em.logger.Infof("connected to %s", connResult.ConnectedTo())
182
183 if connResult.ClientConfig() != nil {
184 em.streamHandler.UseConfiguration(ctx, connResult.ClientConfig())
185 }
186 return nil
187}
188
189func (em *EdgeManager) closeConnection(ctx context.Context) error {
190 conn := em.state.getFirstConnection()
191 if conn == nil {
192 return fmt.Errorf("no connection to close")
193 }
194 conn.Shutdown()
195 // teardown will be handled by EdgeManager.serveConn in another goroutine
196 return nil
197}
198
199func (em *EdgeManager) serveConn(ctx context.Context, conn *Connection) {
200 err := conn.Serve(ctx)
201 em.logger.WithError(err).Warn("Connection closed")
202 em.state.closeConnection(conn)
203 em.serviceDiscoverer.GiveBack(conn.addr)
204}
205
206func (em *EdgeManager) noRetryMessage() string {
207 messageTemplate := "cloudflared could not register an Argo Tunnel on your account. Please confirm the following before trying again:" +
208 "1. You have Argo Smart Routing enabled in your account, See Enable Argo section of %s." +
209 "2. Your credential at %s is still valid. See %s."
210 return fmt.Sprintf(messageTemplate, quickStartLink, em.state.getConfigurable().UserCredentialPath, faqLink)
211}
212
// shutdown asks every connection tracked in the manager state to shut down.
// Run defers it so it executes on every exit path.
func (em *EdgeManager) shutdown() {
	em.state.shutdown()
}
216
// edgeManagerState is the mutable part of an EdgeManager. The embedded RWMutex
// guards all fields; the methods on this type acquire it themselves.
type edgeManagerState struct {
	sync.RWMutex
	// configurable is the current runtime configuration, replaced wholesale by updateConfigurable.
	configurable *EdgeManagerConfigurable
	// userCredential is sent to the edge as the OriginCert of each Connect call.
	userCredential []byte
	// conns tracks the established connections, keyed by connection id.
	conns map[uuid.UUID]*Connection
}
223
224func newEdgeConnectionManagerState(configurable *EdgeManagerConfigurable, userCredential []byte) *edgeManagerState {
225 return &edgeManagerState{
226 configurable: configurable,
227 userCredential: userCredential,
228 conns: make(map[uuid.UUID]*Connection),
229 }
230}
231
232func (ems *edgeManagerState) shouldCreateConnection(availableEdgeAddrs int) bool {
233 ems.RLock()
234 defer ems.RUnlock()
235 expectedHAConns := int(ems.configurable.NumHAConnections)
236 if availableEdgeAddrs < expectedHAConns {
237 expectedHAConns = availableEdgeAddrs
238 }
239 return len(ems.conns) < expectedHAConns
240}
241
242func (ems *edgeManagerState) shouldReduceConnection() bool {
243 ems.RLock()
244 defer ems.RUnlock()
245 return uint8(len(ems.conns)) > ems.configurable.NumHAConnections
246}
247
// newConnection records conn in the set of tracked connections under its id,
// replacing any entry already stored for that id.
func (ems *edgeManagerState) newConnection(conn *Connection) {
	ems.Lock()
	defer ems.Unlock()
	ems.conns[conn.id] = conn
}
253
// closeConnection removes conn from the set of tracked connections. It only
// forgets the connection; it does not shut it down. Removing a connection that
// is not tracked is a no-op.
func (ems *edgeManagerState) closeConnection(conn *Connection) {
	ems.Lock()
	defer ems.Unlock()
	delete(ems.conns, conn.id)
}
259
260func (ems *edgeManagerState) getFirstConnection() *Connection {
261 ems.RLock()
262 defer ems.RUnlock()
263
264 for _, conn := range ems.conns {
265 return conn
266 }
267 return nil
268}
269
270func (ems *edgeManagerState) shutdown() {
271 ems.Lock()
272 defer ems.Unlock()
273 for _, conn := range ems.conns {
274 conn.Shutdown()
275 }
276}
277
278func (ems *edgeManagerState) getConfigurable() *EdgeManagerConfigurable {
279 ems.Lock()
280 defer ems.Unlock()
281 return ems.configurable
282}
283
// updateConfigurable replaces the current configurable with newConfigurable
// under the write lock. The pointer is stored as-is, without copying.
func (ems *edgeManagerState) updateConfigurable(newConfigurable *EdgeManagerConfigurable) {
	ems.Lock()
	defer ems.Unlock()
	ems.configurable = newConfigurable
}
289
// getUserCredential returns the stored user credential under the read lock.
// The slice is returned without copying, so it shares its backing array with
// the state.
func (ems *edgeManagerState) getUserCredential() []byte {
	ems.RLock()
	defer ems.RUnlock()
	return ems.userCredential
}
295
296func retryConnection(cause string) *tunnelpogs.ConnectError {
297 return &tunnelpogs.ConnectError{
298 Cause: cause,
299 RetryAfter: defaultRetryAfter,
300 ShouldRetry: true,
301 }
302}
303