cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2019.9.0

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/manager.go

286lines · modecode

1package connection
2
3import (
4 "context"
5 "crypto/tls"
6 "fmt"
7 "net"
8 "sync"
9 "time"
10
11 "github.com/cloudflare/cloudflared/cmd/cloudflared/buildinfo"
12 "github.com/cloudflare/cloudflared/h2mux"
13 "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
14 "github.com/google/uuid"
15 "github.com/pkg/errors"
16 "github.com/sirupsen/logrus"
17)
18
// quickStartLink is the Argo Tunnel quickstart guide referenced by the
// "cannot register" message built in noRetryMessage.
const quickStartLink = "https://developers.cloudflare.com/argo-tunnel/quickstart/"

// faqLink is the Argo Tunnel FAQ referenced by the "cannot register" message
// built in noRetryMessage.
const faqLink = "https://developers.cloudflare.com/argo-tunnel/faq/"
23
24// EdgeManager manages connections with the edge
25type EdgeManager struct {
26 // streamHandler handles stream opened by the edge
27 streamHandler h2mux.MuxedStreamHandler
28 // TLSConfig is the TLS configuration to connect with edge
29 tlsConfig *tls.Config
30 // cloudflaredConfig is the cloudflared configuration that is determined when the process first starts
31 cloudflaredConfig *CloudflaredConfig
32 // serviceDiscoverer returns the next edge addr to connect to
33 serviceDiscoverer EdgeServiceDiscoverer
34 // state is attributes of ConnectionManager that can change during runtime.
35 state *edgeManagerState
36
37 logger *logrus.Entry
38}
39
40// EdgeConnectionManagerConfigurable is the configurable attributes of a EdgeConnectionManager
41type EdgeManagerConfigurable struct {
42 TunnelHostnames []h2mux.TunnelHostname
43 *pogs.EdgeConnectionConfig
44}
45
46type CloudflaredConfig struct {
47 CloudflaredID uuid.UUID
48 Tags []pogs.Tag
49 BuildInfo *buildinfo.BuildInfo
50 Name string
51 Group string
52}
53
54func NewEdgeManager(
55 streamHandler h2mux.MuxedStreamHandler,
56 edgeConnMgrConfigurable *EdgeManagerConfigurable,
57 userCredential []byte,
58 tlsConfig *tls.Config,
59 serviceDiscoverer EdgeServiceDiscoverer,
60 cloudflaredConfig *CloudflaredConfig,
61 logger *logrus.Logger,
62) *EdgeManager {
63 return &EdgeManager{
64 streamHandler: streamHandler,
65 tlsConfig: tlsConfig,
66 cloudflaredConfig: cloudflaredConfig,
67 serviceDiscoverer: serviceDiscoverer,
68 state: newEdgeConnectionManagerState(edgeConnMgrConfigurable, userCredential),
69 logger: logger.WithField("subsystem", "connectionManager"),
70 }
71}
72
73func (em *EdgeManager) Run(ctx context.Context) error {
74 defer em.shutdown()
75
76 resolveEdgeIPTicker := time.Tick(resolveEdgeAddrTTL)
77 for {
78 select {
79 case <-ctx.Done():
80 return errors.Wrap(ctx.Err(), "EdgeConnectionManager terminated")
81 case <-resolveEdgeIPTicker:
82 if err := em.serviceDiscoverer.Refresh(); err != nil {
83 em.logger.WithError(err).Warn("Cannot refresh Cloudflare edge addresses")
84 }
85 default:
86 time.Sleep(1 * time.Second)
87 }
88 // Create/delete connection one at a time, so we don't need to adjust for connections that are being created/deleted
89 // in shouldCreateConnection or shouldReduceConnection calculation
90 if em.state.shouldCreateConnection(em.serviceDiscoverer.AvailableAddrs()) {
91 if err := em.newConnection(ctx); err != nil {
92 em.logger.WithError(err).Error("cannot create new connection")
93 }
94 } else if em.state.shouldReduceConnection() {
95 if err := em.closeConnection(ctx); err != nil {
96 em.logger.WithError(err).Error("cannot close connection")
97 }
98 }
99 }
100}
101
102func (em *EdgeManager) UpdateConfigurable(newConfigurable *EdgeManagerConfigurable) {
103 em.logger.Infof("New edge connection manager configuration %+v", newConfigurable)
104 em.state.updateConfigurable(newConfigurable)
105}
106
107func (em *EdgeManager) newConnection(ctx context.Context) error {
108 edgeIP := em.serviceDiscoverer.Addr()
109 edgeConn, err := em.dialEdge(ctx, edgeIP)
110 if err != nil {
111 return errors.Wrap(err, "dial edge error")
112 }
113 configurable := em.state.getConfigurable()
114 // Establish a muxed connection with the edge
115 // Client mux handshake with agent server
116 muxer, err := h2mux.Handshake(edgeConn, edgeConn, h2mux.MuxerConfig{
117 Timeout: configurable.Timeout,
118 Handler: em.streamHandler,
119 IsClient: true,
120 HeartbeatInterval: configurable.HeartbeatInterval,
121 MaxHeartbeats: configurable.MaxFailedHeartbeats,
122 Logger: em.logger.WithField("subsystem", "muxer"),
123 })
124 if err != nil {
125 return errors.Wrap(err, "couldn't perform handshake with edge")
126 }
127
128 h2muxConn, err := newConnection(muxer, edgeIP)
129 if err != nil {
130 return errors.Wrap(err, "couldn't create h2mux connection")
131 }
132
133 go em.serveConn(ctx, h2muxConn)
134
135 connResult, err := h2muxConn.Connect(ctx, &pogs.ConnectParameters{
136 CloudflaredID: em.cloudflaredConfig.CloudflaredID,
137 CloudflaredVersion: em.cloudflaredConfig.BuildInfo.CloudflaredVersion,
138 NumPreviousAttempts: 0,
139 OriginCert: em.state.getUserCredential(),
140 Name: em.cloudflaredConfig.Name,
141 Group: em.cloudflaredConfig.Group,
142 Tags: em.cloudflaredConfig.Tags,
143 }, em.logger)
144 if err != nil {
145 h2muxConn.Shutdown()
146 return errors.Wrap(err, "couldn't connect to edge")
147 }
148
149 if connErr := connResult.Err; connErr != nil {
150 if !connErr.ShouldRetry {
151 return errors.Wrap(connErr, em.noRetryMessage())
152 }
153 return errors.Wrapf(connErr, "edge responded with RetryAfter=%v", connErr.RetryAfter)
154 }
155
156 em.state.newConnection(h2muxConn)
157 em.logger.Infof("connected to %s", connResult.ServerInfo.LocationName)
158 return nil
159}
160
161func (em *EdgeManager) closeConnection(ctx context.Context) error {
162 conn := em.state.getFirstConnection()
163 if conn == nil {
164 return fmt.Errorf("no connection to close")
165 }
166 conn.Shutdown()
167 return nil
168}
169
170func (em *EdgeManager) serveConn(ctx context.Context, conn *Connection) {
171 err := conn.Serve(ctx)
172 em.logger.WithError(err).Warn("Connection closed")
173 em.state.closeConnection(conn)
174}
175
176func (em *EdgeManager) dialEdge(ctx context.Context, edgeIP *net.TCPAddr) (*tls.Conn, error) {
177 timeout := em.state.getConfigurable().Timeout
178 // Inherit from parent context so we can cancel (Ctrl-C) while dialing
179 dialCtx, dialCancel := context.WithTimeout(ctx, timeout)
180 defer dialCancel()
181
182 dialer := net.Dialer{DualStack: true}
183 edgeConn, err := dialer.DialContext(dialCtx, "tcp", edgeIP.String())
184 if err != nil {
185 return nil, dialError{cause: errors.Wrap(err, "DialContext error")}
186 }
187 tlsEdgeConn := tls.Client(edgeConn, em.tlsConfig)
188 tlsEdgeConn.SetDeadline(time.Now().Add(timeout))
189
190 if err = tlsEdgeConn.Handshake(); err != nil {
191 return nil, dialError{cause: errors.Wrap(err, "Handshake with edge error")}
192 }
193 // clear the deadline on the conn; h2mux has its own timeouts
194 tlsEdgeConn.SetDeadline(time.Time{})
195 return tlsEdgeConn, nil
196}
197
198func (em *EdgeManager) noRetryMessage() string {
199 messageTemplate := "cloudflared could not register an Argo Tunnel on your account. Please confirm the following before trying again:" +
200 "1. You have Argo Smart Routing enabled in your account, See Enable Argo section of %s." +
201 "2. Your credential at %s is still valid. See %s."
202 return fmt.Sprintf(messageTemplate, quickStartLink, em.state.getConfigurable().UserCredentialPath, faqLink)
203}
204
205func (em *EdgeManager) shutdown() {
206 em.state.shutdown()
207}
208
209type edgeManagerState struct {
210 sync.RWMutex
211 configurable *EdgeManagerConfigurable
212 userCredential []byte
213 conns map[uuid.UUID]*Connection
214}
215
216func newEdgeConnectionManagerState(configurable *EdgeManagerConfigurable, userCredential []byte) *edgeManagerState {
217 return &edgeManagerState{
218 configurable: configurable,
219 userCredential: userCredential,
220 conns: make(map[uuid.UUID]*Connection),
221 }
222}
223
224func (ems *edgeManagerState) shouldCreateConnection(availableEdgeAddrs uint8) bool {
225 ems.RLock()
226 defer ems.RUnlock()
227 expectedHAConns := ems.configurable.NumHAConnections
228 if availableEdgeAddrs < expectedHAConns {
229 expectedHAConns = availableEdgeAddrs
230 }
231 return uint8(len(ems.conns)) < expectedHAConns
232}
233
234func (ems *edgeManagerState) shouldReduceConnection() bool {
235 ems.RLock()
236 defer ems.RUnlock()
237 return uint8(len(ems.conns)) > ems.configurable.NumHAConnections
238}
239
240func (ems *edgeManagerState) newConnection(conn *Connection) {
241 ems.Lock()
242 defer ems.Unlock()
243 ems.conns[conn.id] = conn
244}
245
246func (ems *edgeManagerState) closeConnection(conn *Connection) {
247 ems.Lock()
248 defer ems.Unlock()
249 delete(ems.conns, conn.id)
250}
251
252func (ems *edgeManagerState) getFirstConnection() *Connection {
253 ems.RLock()
254 defer ems.RUnlock()
255
256 for _, conn := range ems.conns {
257 return conn
258 }
259 return nil
260}
261
262func (ems *edgeManagerState) shutdown() {
263 ems.Lock()
264 defer ems.Unlock()
265 for _, conn := range ems.conns {
266 conn.Shutdown()
267 }
268}
269
270func (ems *edgeManagerState) getConfigurable() *EdgeManagerConfigurable {
271 ems.Lock()
272 defer ems.Unlock()
273 return ems.configurable
274}
275
276func (ems *edgeManagerState) updateConfigurable(newConfigurable *EdgeManagerConfigurable) {
277 ems.Lock()
278 defer ems.Unlock()
279 ems.configurable = newConfigurable
280}
281
282func (ems *edgeManagerState) getUserCredential() []byte {
283 ems.RLock()
284 defer ems.RUnlock()
285 return ems.userCredential
286}
287