cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2019.7.0

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/manager.go

281lines · modecode

1package connection
2
3import (
4 "context"
5 "crypto/tls"
6 "fmt"
7 "net"
8 "sync"
9 "time"
10
11 "github.com/cloudflare/cloudflared/cmd/cloudflared/buildinfo"
12 "github.com/cloudflare/cloudflared/h2mux"
13 "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
14 "github.com/google/uuid"
15 "github.com/pkg/errors"
16 "github.com/sirupsen/logrus"
17)
18
const (
	// quickStartLink is the quickstart guide URL embedded in the message built by noRetryMessage.
	quickStartLink = "https://developers.cloudflare.com/argo-tunnel/quickstart/"
	// faqLink is the FAQ URL embedded in the message built by noRetryMessage.
	faqLink = "https://developers.cloudflare.com/argo-tunnel/faq/"
)
23
// EdgeManager manages connections with the edge.
type EdgeManager struct {
	// streamHandler handles streams opened by the edge
	streamHandler h2mux.MuxedStreamHandler
	// tlsConfig is the TLS configuration used to connect to the edge
	tlsConfig *tls.Config
	// cloudflaredConfig is the cloudflared configuration that is determined when the process first starts
	cloudflaredConfig *CloudflaredConfig
	// serviceDiscoverer returns the next edge addr to connect to
	serviceDiscoverer EdgeServiceDiscoverer
	// state holds the attributes of EdgeManager that can change during runtime.
	state *edgeManagerState

	// logger is the entry every message from this manager is written through;
	// NewEdgeManager tags it with subsystem=connectionManager.
	logger *logrus.Entry
}
39
// EdgeManagerConfigurable is the configurable attributes of an EdgeManager.
// The active value can be replaced at runtime through EdgeManager.UpdateConfigurable.
type EdgeManagerConfigurable struct {
	// TunnelHostnames is not read anywhere in this file.
	// NOTE(review): presumably consumed by another component — confirm.
	TunnelHostnames []h2mux.TunnelHostname
	// The embedded edge connection settings. This file reads Timeout,
	// HeartbeatInterval, MaxFailedHeartbeats, NumHAConnections and
	// UserCredentialPath through it.
	*pogs.EdgeConnectionConfig
}
45
// CloudflaredConfig carries the process-level identity that EdgeManager
// reports to the edge when it registers a connection.
type CloudflaredConfig struct {
	// CloudflaredID is sent as ConnectParameters.CloudflaredID on every connect.
	CloudflaredID uuid.UUID
	// Tags is not read anywhere in this file.
	Tags []pogs.Tag
	// BuildInfo supplies the CloudflaredVersion sent on every connect; it is
	// dereferenced without a nil check in newConnection.
	BuildInfo *buildinfo.BuildInfo
}
51
52func NewEdgeManager(
53 streamHandler h2mux.MuxedStreamHandler,
54 edgeConnMgrConfigurable *EdgeManagerConfigurable,
55 userCredential []byte,
56 tlsConfig *tls.Config,
57 serviceDiscoverer EdgeServiceDiscoverer,
58 cloudflaredConfig *CloudflaredConfig,
59 logger *logrus.Logger,
60) *EdgeManager {
61 return &EdgeManager{
62 streamHandler: streamHandler,
63 tlsConfig: tlsConfig,
64 cloudflaredConfig: cloudflaredConfig,
65 serviceDiscoverer: serviceDiscoverer,
66 state: newEdgeConnectionManagerState(edgeConnMgrConfigurable, userCredential),
67 logger: logger.WithField("subsystem", "connectionManager"),
68 }
69}
70
71func (em *EdgeManager) Run(ctx context.Context) error {
72 defer em.shutdown()
73
74 resolveEdgeIPTicker := time.Tick(resolveEdgeAddrTTL)
75 for {
76 select {
77 case <-ctx.Done():
78 return errors.Wrap(ctx.Err(), "EdgeConnectionManager terminated")
79 case <-resolveEdgeIPTicker:
80 if err := em.serviceDiscoverer.Refresh(); err != nil {
81 em.logger.WithError(err).Warn("Cannot refresh Cloudflare edge addresses")
82 }
83 default:
84 time.Sleep(1 * time.Second)
85 }
86 // Create/delete connection one at a time, so we don't need to adjust for connections that are being created/deleted
87 // in shouldCreateConnection or shouldReduceConnection calculation
88 if em.state.shouldCreateConnection(em.serviceDiscoverer.AvailableAddrs()) {
89 if err := em.newConnection(ctx); err != nil {
90 em.logger.WithError(err).Error("cannot create new connection")
91 }
92 } else if em.state.shouldReduceConnection() {
93 if err := em.closeConnection(ctx); err != nil {
94 em.logger.WithError(err).Error("cannot close connection")
95 }
96 }
97 }
98}
99
// UpdateConfigurable replaces the manager's runtime configuration with
// newConfigurable. The new value is logged at info level (formatted with %+v)
// before it is stored.
func (em *EdgeManager) UpdateConfigurable(newConfigurable *EdgeManagerConfigurable) {
	em.logger.Infof("New edge connection manager configuration %+v", newConfigurable)
	em.state.updateConfigurable(newConfigurable)
}
104
105func (em *EdgeManager) newConnection(ctx context.Context) error {
106 edgeIP := em.serviceDiscoverer.Addr()
107 edgeConn, err := em.dialEdge(ctx, edgeIP)
108 if err != nil {
109 return errors.Wrap(err, "dial edge error")
110 }
111 configurable := em.state.getConfigurable()
112 // Establish a muxed connection with the edge
113 // Client mux handshake with agent server
114 muxer, err := h2mux.Handshake(edgeConn, edgeConn, h2mux.MuxerConfig{
115 Timeout: configurable.Timeout,
116 Handler: em.streamHandler,
117 IsClient: true,
118 HeartbeatInterval: configurable.HeartbeatInterval,
119 MaxHeartbeats: configurable.MaxFailedHeartbeats,
120 Logger: em.logger.WithField("subsystem", "muxer"),
121 })
122 if err != nil {
123 return errors.Wrap(err, "handshake with edge error")
124 }
125
126 h2muxConn, err := newConnection(muxer, edgeIP)
127 if err != nil {
128 return errors.Wrap(err, "create h2mux connection error")
129 }
130
131 go em.serveConn(ctx, h2muxConn)
132
133 connResult, err := h2muxConn.Connect(ctx, &pogs.ConnectParameters{
134 OriginCert: em.state.getUserCredential(),
135 CloudflaredID: em.cloudflaredConfig.CloudflaredID,
136 NumPreviousAttempts: 0,
137 CloudflaredVersion: em.cloudflaredConfig.BuildInfo.CloudflaredVersion,
138 }, em.logger)
139 if err != nil {
140 h2muxConn.Shutdown()
141 return errors.Wrap(err, "connect with edge error")
142 }
143
144 if connErr := connResult.Err; connErr != nil {
145 if !connErr.ShouldRetry {
146 return errors.Wrap(connErr, em.noRetryMessage())
147 }
148 return errors.Wrapf(connErr, "server respond with retry at %v", connErr.RetryAfter)
149 }
150
151 em.state.newConnection(h2muxConn)
152 em.logger.Infof("connected to %s", connResult.ServerInfo.LocationName)
153 return nil
154}
155
156func (em *EdgeManager) closeConnection(ctx context.Context) error {
157 conn := em.state.getFirstConnection()
158 if conn == nil {
159 return fmt.Errorf("no connection to close")
160 }
161 conn.Shutdown()
162 return nil
163}
164
165func (em *EdgeManager) serveConn(ctx context.Context, conn *Connection) {
166 err := conn.Serve(ctx)
167 em.logger.WithError(err).Warn("Connection closed")
168 em.state.closeConnection(conn)
169}
170
171func (em *EdgeManager) dialEdge(ctx context.Context, edgeIP *net.TCPAddr) (*tls.Conn, error) {
172 timeout := em.state.getConfigurable().Timeout
173 // Inherit from parent context so we can cancel (Ctrl-C) while dialing
174 dialCtx, dialCancel := context.WithTimeout(ctx, timeout)
175 defer dialCancel()
176
177 dialer := net.Dialer{DualStack: true}
178 edgeConn, err := dialer.DialContext(dialCtx, "tcp", edgeIP.String())
179 if err != nil {
180 return nil, dialError{cause: errors.Wrap(err, "DialContext error")}
181 }
182 tlsEdgeConn := tls.Client(edgeConn, em.tlsConfig)
183 tlsEdgeConn.SetDeadline(time.Now().Add(timeout))
184
185 if err = tlsEdgeConn.Handshake(); err != nil {
186 return nil, dialError{cause: errors.Wrap(err, "Handshake with edge error")}
187 }
188 // clear the deadline on the conn; h2mux has its own timeouts
189 tlsEdgeConn.SetDeadline(time.Time{})
190 return tlsEdgeConn, nil
191}
192
193func (em *EdgeManager) noRetryMessage() string {
194 messageTemplate := "cloudflared could not register an Argo Tunnel on your account. Please confirm the following before trying again:" +
195 "1. You have Argo Smart Routing enabled in your account, See Enable Argo section of %s." +
196 "2. Your credential at %s is still valid. See %s."
197 return fmt.Sprintf(messageTemplate, quickStartLink, em.state.getConfigurable().UserCredentialPath, faqLink)
198}
199
// shutdown shuts down every connection tracked in the manager state.
// Run defers it, so it executes whenever Run returns.
func (em *EdgeManager) shutdown() {
	em.state.shutdown()
}
203
// edgeManagerState is the mutable part of an EdgeManager. The embedded
// RWMutex guards every field; the methods of this type take it themselves.
type edgeManagerState struct {
	sync.RWMutex
	// configurable is the current runtime configuration; replaced wholesale by updateConfigurable.
	configurable *EdgeManagerConfigurable
	// userCredential is sent as OriginCert when registering a connection with the edge.
	userCredential []byte
	// conns holds the currently tracked connections, keyed by Connection.id.
	conns map[uuid.UUID]*Connection
}
210
211func newEdgeConnectionManagerState(configurable *EdgeManagerConfigurable, userCredential []byte) *edgeManagerState {
212 return &edgeManagerState{
213 configurable: configurable,
214 userCredential: userCredential,
215 conns: make(map[uuid.UUID]*Connection),
216 }
217}
218
219func (ems *edgeManagerState) shouldCreateConnection(availableEdgeAddrs uint8) bool {
220 ems.RLock()
221 defer ems.RUnlock()
222 expectedHAConns := ems.configurable.NumHAConnections
223 if availableEdgeAddrs < expectedHAConns {
224 expectedHAConns = availableEdgeAddrs
225 }
226 return uint8(len(ems.conns)) < expectedHAConns
227}
228
229func (ems *edgeManagerState) shouldReduceConnection() bool {
230 ems.RLock()
231 defer ems.RUnlock()
232 return uint8(len(ems.conns)) > ems.configurable.NumHAConnections
233}
234
// newConnection starts tracking conn under its id, replacing any entry
// already stored under the same id.
func (ems *edgeManagerState) newConnection(conn *Connection) {
	ems.Lock()
	defer ems.Unlock()
	ems.conns[conn.id] = conn
}
240
// closeConnection stops tracking conn. It only removes the map entry keyed by
// conn.id; it does not shut the connection down. Removing an id that is not
// tracked is a no-op.
func (ems *edgeManagerState) closeConnection(conn *Connection) {
	ems.Lock()
	defer ems.Unlock()
	delete(ems.conns, conn.id)
}
246
247func (ems *edgeManagerState) getFirstConnection() *Connection {
248 ems.RLock()
249 defer ems.RUnlock()
250
251 for _, conn := range ems.conns {
252 return conn
253 }
254 return nil
255}
256
// shutdown calls Shutdown on every tracked connection while holding the write
// lock. It does not remove the connections from the map.
// NOTE(review): serveConn calls closeConnection, which needs the same lock; if
// Connection.Shutdown waits for Serve to return this could block — confirm.
func (ems *edgeManagerState) shutdown() {
	ems.Lock()
	defer ems.Unlock()
	for _, conn := range ems.conns {
		conn.Shutdown()
	}
}
264
265func (ems *edgeManagerState) getConfigurable() *EdgeManagerConfigurable {
266 ems.Lock()
267 defer ems.Unlock()
268 return ems.configurable
269}
270
// updateConfigurable replaces the stored configuration pointer with
// newConfigurable under the write lock. The previous value is discarded.
func (ems *edgeManagerState) updateConfigurable(newConfigurable *EdgeManagerConfigurable) {
	ems.Lock()
	defer ems.Unlock()
	ems.configurable = newConfigurable
}
276
// getUserCredential returns the stored user credential under the read lock.
// The returned slice shares its backing array with the state; it is not copied.
func (ems *edgeManagerState) getUserCredential() []byte {
	ems.RLock()
	defer ems.RUnlock()
	return ems.userCredential
}