cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2019.11.0

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/manager.go

298lines · modecode

1package connection
2
3import (
4 "context"
5 "crypto/tls"
6 "fmt"
7 "net"
8 "sync"
9 "time"
10
11 "github.com/cloudflare/cloudflared/cmd/cloudflared/buildinfo"
12 "github.com/cloudflare/cloudflared/h2mux"
13 "github.com/cloudflare/cloudflared/streamhandler"
14 "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
15
16 "github.com/google/uuid"
17 "github.com/pkg/errors"
18 "github.com/sirupsen/logrus"
19)
20
21const (
22 quickStartLink = "https://developers.cloudflare.com/argo-tunnel/quickstart/"
23 faqLink = "https://developers.cloudflare.com/argo-tunnel/faq/"
24 defaultRetryAfter = time.Second * 5
25)
26
27// EdgeManager manages connections with the edge
28type EdgeManager struct {
29 // streamHandler handles stream opened by the edge
30 streamHandler *streamhandler.StreamHandler
31 // TLSConfig is the TLS configuration to connect with edge
32 tlsConfig *tls.Config
33 // cloudflaredConfig is the cloudflared configuration that is determined when the process first starts
34 cloudflaredConfig *CloudflaredConfig
35 // serviceDiscoverer returns the next edge addr to connect to
36 serviceDiscoverer EdgeServiceDiscoverer
37 // state is attributes of ConnectionManager that can change during runtime.
38 state *edgeManagerState
39
40 logger *logrus.Entry
41}
42
43// EdgeConnectionManagerConfigurable is the configurable attributes of a EdgeConnectionManager
44type EdgeManagerConfigurable struct {
45 TunnelHostnames []h2mux.TunnelHostname
46 *pogs.EdgeConnectionConfig
47}
48
49type CloudflaredConfig struct {
50 CloudflaredID uuid.UUID
51 Tags []pogs.Tag
52 BuildInfo *buildinfo.BuildInfo
53 IntentLabel string
54}
55
56func NewEdgeManager(
57 streamHandler *streamhandler.StreamHandler,
58 edgeConnMgrConfigurable *EdgeManagerConfigurable,
59 userCredential []byte,
60 tlsConfig *tls.Config,
61 serviceDiscoverer EdgeServiceDiscoverer,
62 cloudflaredConfig *CloudflaredConfig,
63 logger *logrus.Logger,
64) *EdgeManager {
65 return &EdgeManager{
66 streamHandler: streamHandler,
67 tlsConfig: tlsConfig,
68 cloudflaredConfig: cloudflaredConfig,
69 serviceDiscoverer: serviceDiscoverer,
70 state: newEdgeConnectionManagerState(edgeConnMgrConfigurable, userCredential),
71 logger: logger.WithField("subsystem", "connectionManager"),
72 }
73}
74
75func (em *EdgeManager) Run(ctx context.Context) error {
76 defer em.shutdown()
77
78 resolveEdgeIPTicker := time.Tick(resolveEdgeAddrTTL)
79 for {
80 select {
81 case <-ctx.Done():
82 return errors.Wrap(ctx.Err(), "EdgeConnectionManager terminated")
83 case <-resolveEdgeIPTicker:
84 if err := em.serviceDiscoverer.Refresh(); err != nil {
85 em.logger.WithError(err).Warn("Cannot refresh Cloudflare edge addresses")
86 }
87 default:
88 time.Sleep(1 * time.Second)
89 }
90 // Create/delete connection one at a time, so we don't need to adjust for connections that are being created/deleted
91 // in shouldCreateConnection or shouldReduceConnection calculation
92 if em.state.shouldCreateConnection(em.serviceDiscoverer.AvailableAddrs()) {
93 if connErr := em.newConnection(ctx); connErr != nil {
94 if !connErr.ShouldRetry {
95 em.logger.WithError(connErr).Error(em.noRetryMessage())
96 return connErr
97 }
98 em.logger.WithError(connErr).Error("cannot create new connection")
99 }
100 } else if em.state.shouldReduceConnection() {
101 if err := em.closeConnection(ctx); err != nil {
102 em.logger.WithError(err).Error("cannot close connection")
103 }
104 }
105 }
106}
107
108func (em *EdgeManager) UpdateConfigurable(newConfigurable *EdgeManagerConfigurable) {
109 em.logger.Infof("New edge connection manager configuration %+v", newConfigurable)
110 em.state.updateConfigurable(newConfigurable)
111}
112
113func (em *EdgeManager) newConnection(ctx context.Context) *pogs.ConnectError {
114 edgeIP := em.serviceDiscoverer.Addr()
115 edgeConn, err := em.dialEdge(ctx, edgeIP)
116 if err != nil {
117 return retryConnection(fmt.Sprintf("dial edge error: %v", err))
118 }
119 configurable := em.state.getConfigurable()
120 // Establish a muxed connection with the edge
121 // Client mux handshake with agent server
122 muxer, err := h2mux.Handshake(edgeConn, edgeConn, h2mux.MuxerConfig{
123 Timeout: configurable.Timeout,
124 Handler: em.streamHandler,
125 IsClient: true,
126 HeartbeatInterval: configurable.HeartbeatInterval,
127 MaxHeartbeats: configurable.MaxFailedHeartbeats,
128 Logger: em.logger.WithField("subsystem", "muxer"),
129 })
130 if err != nil {
131 retryConnection(fmt.Sprintf("couldn't perform handshake with edge: %v", err))
132 }
133
134 h2muxConn, err := newConnection(muxer, edgeIP)
135 if err != nil {
136 return retryConnection(fmt.Sprintf("couldn't create h2mux connection: %v", err))
137 }
138
139 go em.serveConn(ctx, h2muxConn)
140
141 connResult, err := h2muxConn.Connect(ctx, &pogs.ConnectParameters{
142 CloudflaredID: em.cloudflaredConfig.CloudflaredID,
143 CloudflaredVersion: em.cloudflaredConfig.BuildInfo.CloudflaredVersion,
144 NumPreviousAttempts: 0,
145 OriginCert: em.state.getUserCredential(),
146 IntentLabel: em.cloudflaredConfig.IntentLabel,
147 Tags: em.cloudflaredConfig.Tags,
148 }, em.logger)
149 if err != nil {
150 h2muxConn.Shutdown()
151 return retryConnection(fmt.Sprintf("couldn't connect to edge: %v", err))
152 }
153
154 if connErr := connResult.ConnectError(); connErr != nil {
155 return connErr
156 }
157
158 em.state.newConnection(h2muxConn)
159 em.logger.Infof("connected to %s", connResult.ConnectedTo())
160
161 em.streamHandler.UseConfiguration(ctx, connResult.ClientConfig())
162 return nil
163}
164
165func (em *EdgeManager) closeConnection(ctx context.Context) error {
166 conn := em.state.getFirstConnection()
167 if conn == nil {
168 return fmt.Errorf("no connection to close")
169 }
170 conn.Shutdown()
171 return nil
172}
173
174func (em *EdgeManager) serveConn(ctx context.Context, conn *Connection) {
175 err := conn.Serve(ctx)
176 em.logger.WithError(err).Warn("Connection closed")
177 em.state.closeConnection(conn)
178}
179
180func (em *EdgeManager) dialEdge(ctx context.Context, edgeIP *net.TCPAddr) (*tls.Conn, error) {
181 timeout := em.state.getConfigurable().Timeout
182 // Inherit from parent context so we can cancel (Ctrl-C) while dialing
183 dialCtx, dialCancel := context.WithTimeout(ctx, timeout)
184 defer dialCancel()
185
186 dialer := net.Dialer{DualStack: true}
187 edgeConn, err := dialer.DialContext(dialCtx, "tcp", edgeIP.String())
188 if err != nil {
189 return nil, dialError{cause: errors.Wrap(err, "DialContext error")}
190 }
191 tlsEdgeConn := tls.Client(edgeConn, em.tlsConfig)
192 tlsEdgeConn.SetDeadline(time.Now().Add(timeout))
193
194 if err = tlsEdgeConn.Handshake(); err != nil {
195 return nil, dialError{cause: errors.Wrap(err, "Handshake with edge error")}
196 }
197 // clear the deadline on the conn; h2mux has its own timeouts
198 tlsEdgeConn.SetDeadline(time.Time{})
199 return tlsEdgeConn, nil
200}
201
202func (em *EdgeManager) noRetryMessage() string {
203 messageTemplate := "cloudflared could not register an Argo Tunnel on your account. Please confirm the following before trying again:" +
204 "1. You have Argo Smart Routing enabled in your account, See Enable Argo section of %s." +
205 "2. Your credential at %s is still valid. See %s."
206 return fmt.Sprintf(messageTemplate, quickStartLink, em.state.getConfigurable().UserCredentialPath, faqLink)
207}
208
209func (em *EdgeManager) shutdown() {
210 em.state.shutdown()
211}
212
213type edgeManagerState struct {
214 sync.RWMutex
215 configurable *EdgeManagerConfigurable
216 userCredential []byte
217 conns map[uuid.UUID]*Connection
218}
219
220func newEdgeConnectionManagerState(configurable *EdgeManagerConfigurable, userCredential []byte) *edgeManagerState {
221 return &edgeManagerState{
222 configurable: configurable,
223 userCredential: userCredential,
224 conns: make(map[uuid.UUID]*Connection),
225 }
226}
227
228func (ems *edgeManagerState) shouldCreateConnection(availableEdgeAddrs uint8) bool {
229 ems.RLock()
230 defer ems.RUnlock()
231 expectedHAConns := ems.configurable.NumHAConnections
232 if availableEdgeAddrs < expectedHAConns {
233 expectedHAConns = availableEdgeAddrs
234 }
235 return uint8(len(ems.conns)) < expectedHAConns
236}
237
238func (ems *edgeManagerState) shouldReduceConnection() bool {
239 ems.RLock()
240 defer ems.RUnlock()
241 return uint8(len(ems.conns)) > ems.configurable.NumHAConnections
242}
243
244func (ems *edgeManagerState) newConnection(conn *Connection) {
245 ems.Lock()
246 defer ems.Unlock()
247 ems.conns[conn.id] = conn
248}
249
250func (ems *edgeManagerState) closeConnection(conn *Connection) {
251 ems.Lock()
252 defer ems.Unlock()
253 delete(ems.conns, conn.id)
254}
255
256func (ems *edgeManagerState) getFirstConnection() *Connection {
257 ems.RLock()
258 defer ems.RUnlock()
259
260 for _, conn := range ems.conns {
261 return conn
262 }
263 return nil
264}
265
266func (ems *edgeManagerState) shutdown() {
267 ems.Lock()
268 defer ems.Unlock()
269 for _, conn := range ems.conns {
270 conn.Shutdown()
271 }
272}
273
274func (ems *edgeManagerState) getConfigurable() *EdgeManagerConfigurable {
275 ems.Lock()
276 defer ems.Unlock()
277 return ems.configurable
278}
279
280func (ems *edgeManagerState) updateConfigurable(newConfigurable *EdgeManagerConfigurable) {
281 ems.Lock()
282 defer ems.Unlock()
283 ems.configurable = newConfigurable
284}
285
286func (ems *edgeManagerState) getUserCredential() []byte {
287 ems.RLock()
288 defer ems.RUnlock()
289 return ems.userCredential
290}
291
292func retryConnection(cause string) *pogs.ConnectError {
293 return &pogs.ConnectError{
294 Cause: cause,
295 RetryAfter: defaultRetryAfter,
296 ShouldRetry: true,
297 }
298}
299