cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2019.8.3

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/manager.go

284lines · modecode

1package connection
2
3import (
4 "context"
5 "crypto/tls"
6 "fmt"
7 "net"
8 "sync"
9 "time"
10
11 "github.com/cloudflare/cloudflared/cmd/cloudflared/buildinfo"
12 "github.com/cloudflare/cloudflared/h2mux"
13 "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
14 "github.com/google/uuid"
15 "github.com/pkg/errors"
16 "github.com/sirupsen/logrus"
17)
18
// Documentation links embedded in the message built by noRetryMessage.
const (
	// quickStartLink is the URL of the Argo Tunnel quickstart guide.
	quickStartLink = "https://developers.cloudflare.com/argo-tunnel/quickstart/"
	// faqLink is the URL of the Argo Tunnel FAQ.
	faqLink = "https://developers.cloudflare.com/argo-tunnel/faq/"
)
23
24// EdgeManager manages connections with the edge
25type EdgeManager struct {
26 // streamHandler handles stream opened by the edge
27 streamHandler h2mux.MuxedStreamHandler
28 // TLSConfig is the TLS configuration to connect with edge
29 tlsConfig *tls.Config
30 // cloudflaredConfig is the cloudflared configuration that is determined when the process first starts
31 cloudflaredConfig *CloudflaredConfig
32 // serviceDiscoverer returns the next edge addr to connect to
33 serviceDiscoverer EdgeServiceDiscoverer
34 // state is attributes of ConnectionManager that can change during runtime.
35 state *edgeManagerState
36
37 logger *logrus.Entry
38}
39
40// EdgeConnectionManagerConfigurable is the configurable attributes of a EdgeConnectionManager
41type EdgeManagerConfigurable struct {
42 TunnelHostnames []h2mux.TunnelHostname
43 *pogs.EdgeConnectionConfig
44}
45
46type CloudflaredConfig struct {
47 CloudflaredID uuid.UUID
48 Tags []pogs.Tag
49 BuildInfo *buildinfo.BuildInfo
50 Scope pogs.Scope
51}
52
53func NewEdgeManager(
54 streamHandler h2mux.MuxedStreamHandler,
55 edgeConnMgrConfigurable *EdgeManagerConfigurable,
56 userCredential []byte,
57 tlsConfig *tls.Config,
58 serviceDiscoverer EdgeServiceDiscoverer,
59 cloudflaredConfig *CloudflaredConfig,
60 logger *logrus.Logger,
61) *EdgeManager {
62 return &EdgeManager{
63 streamHandler: streamHandler,
64 tlsConfig: tlsConfig,
65 cloudflaredConfig: cloudflaredConfig,
66 serviceDiscoverer: serviceDiscoverer,
67 state: newEdgeConnectionManagerState(edgeConnMgrConfigurable, userCredential),
68 logger: logger.WithField("subsystem", "connectionManager"),
69 }
70}
71
72func (em *EdgeManager) Run(ctx context.Context) error {
73 defer em.shutdown()
74
75 resolveEdgeIPTicker := time.Tick(resolveEdgeAddrTTL)
76 for {
77 select {
78 case <-ctx.Done():
79 return errors.Wrap(ctx.Err(), "EdgeConnectionManager terminated")
80 case <-resolveEdgeIPTicker:
81 if err := em.serviceDiscoverer.Refresh(); err != nil {
82 em.logger.WithError(err).Warn("Cannot refresh Cloudflare edge addresses")
83 }
84 default:
85 time.Sleep(1 * time.Second)
86 }
87 // Create/delete connection one at a time, so we don't need to adjust for connections that are being created/deleted
88 // in shouldCreateConnection or shouldReduceConnection calculation
89 if em.state.shouldCreateConnection(em.serviceDiscoverer.AvailableAddrs()) {
90 if err := em.newConnection(ctx); err != nil {
91 em.logger.WithError(err).Error("cannot create new connection")
92 }
93 } else if em.state.shouldReduceConnection() {
94 if err := em.closeConnection(ctx); err != nil {
95 em.logger.WithError(err).Error("cannot close connection")
96 }
97 }
98 }
99}
100
101func (em *EdgeManager) UpdateConfigurable(newConfigurable *EdgeManagerConfigurable) {
102 em.logger.Infof("New edge connection manager configuration %+v", newConfigurable)
103 em.state.updateConfigurable(newConfigurable)
104}
105
106func (em *EdgeManager) newConnection(ctx context.Context) error {
107 edgeIP := em.serviceDiscoverer.Addr()
108 edgeConn, err := em.dialEdge(ctx, edgeIP)
109 if err != nil {
110 return errors.Wrap(err, "dial edge error")
111 }
112 configurable := em.state.getConfigurable()
113 // Establish a muxed connection with the edge
114 // Client mux handshake with agent server
115 muxer, err := h2mux.Handshake(edgeConn, edgeConn, h2mux.MuxerConfig{
116 Timeout: configurable.Timeout,
117 Handler: em.streamHandler,
118 IsClient: true,
119 HeartbeatInterval: configurable.HeartbeatInterval,
120 MaxHeartbeats: configurable.MaxFailedHeartbeats,
121 Logger: em.logger.WithField("subsystem", "muxer"),
122 })
123 if err != nil {
124 return errors.Wrap(err, "couldn't perform handshake with edge")
125 }
126
127 h2muxConn, err := newConnection(muxer, edgeIP)
128 if err != nil {
129 return errors.Wrap(err, "couldn't create h2mux connection")
130 }
131
132 go em.serveConn(ctx, h2muxConn)
133
134 connResult, err := h2muxConn.Connect(ctx, &pogs.ConnectParameters{
135 CloudflaredID: em.cloudflaredConfig.CloudflaredID,
136 CloudflaredVersion: em.cloudflaredConfig.BuildInfo.CloudflaredVersion,
137 NumPreviousAttempts: 0,
138 OriginCert: em.state.getUserCredential(),
139 Scope: em.cloudflaredConfig.Scope,
140 Tags: em.cloudflaredConfig.Tags,
141 }, em.logger)
142 if err != nil {
143 h2muxConn.Shutdown()
144 return errors.Wrap(err, "couldn't connect to edge")
145 }
146
147 if connErr := connResult.Err; connErr != nil {
148 if !connErr.ShouldRetry {
149 return errors.Wrap(connErr, em.noRetryMessage())
150 }
151 return errors.Wrapf(connErr, "edge responded with RetryAfter=%v", connErr.RetryAfter)
152 }
153
154 em.state.newConnection(h2muxConn)
155 em.logger.Infof("connected to %s", connResult.ServerInfo.LocationName)
156 return nil
157}
158
159func (em *EdgeManager) closeConnection(ctx context.Context) error {
160 conn := em.state.getFirstConnection()
161 if conn == nil {
162 return fmt.Errorf("no connection to close")
163 }
164 conn.Shutdown()
165 return nil
166}
167
168func (em *EdgeManager) serveConn(ctx context.Context, conn *Connection) {
169 err := conn.Serve(ctx)
170 em.logger.WithError(err).Warn("Connection closed")
171 em.state.closeConnection(conn)
172}
173
174func (em *EdgeManager) dialEdge(ctx context.Context, edgeIP *net.TCPAddr) (*tls.Conn, error) {
175 timeout := em.state.getConfigurable().Timeout
176 // Inherit from parent context so we can cancel (Ctrl-C) while dialing
177 dialCtx, dialCancel := context.WithTimeout(ctx, timeout)
178 defer dialCancel()
179
180 dialer := net.Dialer{DualStack: true}
181 edgeConn, err := dialer.DialContext(dialCtx, "tcp", edgeIP.String())
182 if err != nil {
183 return nil, dialError{cause: errors.Wrap(err, "DialContext error")}
184 }
185 tlsEdgeConn := tls.Client(edgeConn, em.tlsConfig)
186 tlsEdgeConn.SetDeadline(time.Now().Add(timeout))
187
188 if err = tlsEdgeConn.Handshake(); err != nil {
189 return nil, dialError{cause: errors.Wrap(err, "Handshake with edge error")}
190 }
191 // clear the deadline on the conn; h2mux has its own timeouts
192 tlsEdgeConn.SetDeadline(time.Time{})
193 return tlsEdgeConn, nil
194}
195
196func (em *EdgeManager) noRetryMessage() string {
197 messageTemplate := "cloudflared could not register an Argo Tunnel on your account. Please confirm the following before trying again:" +
198 "1. You have Argo Smart Routing enabled in your account, See Enable Argo section of %s." +
199 "2. Your credential at %s is still valid. See %s."
200 return fmt.Sprintf(messageTemplate, quickStartLink, em.state.getConfigurable().UserCredentialPath, faqLink)
201}
202
203func (em *EdgeManager) shutdown() {
204 em.state.shutdown()
205}
206
207type edgeManagerState struct {
208 sync.RWMutex
209 configurable *EdgeManagerConfigurable
210 userCredential []byte
211 conns map[uuid.UUID]*Connection
212}
213
214func newEdgeConnectionManagerState(configurable *EdgeManagerConfigurable, userCredential []byte) *edgeManagerState {
215 return &edgeManagerState{
216 configurable: configurable,
217 userCredential: userCredential,
218 conns: make(map[uuid.UUID]*Connection),
219 }
220}
221
222func (ems *edgeManagerState) shouldCreateConnection(availableEdgeAddrs uint8) bool {
223 ems.RLock()
224 defer ems.RUnlock()
225 expectedHAConns := ems.configurable.NumHAConnections
226 if availableEdgeAddrs < expectedHAConns {
227 expectedHAConns = availableEdgeAddrs
228 }
229 return uint8(len(ems.conns)) < expectedHAConns
230}
231
232func (ems *edgeManagerState) shouldReduceConnection() bool {
233 ems.RLock()
234 defer ems.RUnlock()
235 return uint8(len(ems.conns)) > ems.configurable.NumHAConnections
236}
237
238func (ems *edgeManagerState) newConnection(conn *Connection) {
239 ems.Lock()
240 defer ems.Unlock()
241 ems.conns[conn.id] = conn
242}
243
244func (ems *edgeManagerState) closeConnection(conn *Connection) {
245 ems.Lock()
246 defer ems.Unlock()
247 delete(ems.conns, conn.id)
248}
249
250func (ems *edgeManagerState) getFirstConnection() *Connection {
251 ems.RLock()
252 defer ems.RUnlock()
253
254 for _, conn := range ems.conns {
255 return conn
256 }
257 return nil
258}
259
260func (ems *edgeManagerState) shutdown() {
261 ems.Lock()
262 defer ems.Unlock()
263 for _, conn := range ems.conns {
264 conn.Shutdown()
265 }
266}
267
268func (ems *edgeManagerState) getConfigurable() *EdgeManagerConfigurable {
269 ems.Lock()
270 defer ems.Unlock()
271 return ems.configurable
272}
273
274func (ems *edgeManagerState) updateConfigurable(newConfigurable *EdgeManagerConfigurable) {
275 ems.Lock()
276 defer ems.Unlock()
277 ems.configurable = newConfigurable
278}
279
280func (ems *edgeManagerState) getUserCredential() []byte {
281 ems.RLock()
282 defer ems.RUnlock()
283 return ems.userCredential
284}
285