cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2020.2.0

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/discovery.go

420lines · modecode

1package connection
2
3import (
4 "context"
5 "crypto/tls"
6 "fmt"
7 "math/rand"
8 "net"
9 "sync"
10 "time"
11
12 "github.com/pkg/errors"
13 "github.com/sirupsen/logrus"
14)
15
const (
	// SRV lookup coordinates used to discover the HA origintunneld servers.
	srvService = "origintunneld"
	srvProto   = "tcp"
	srvName    = "argotunnel.com"

	// DNS-over-TLS (DoT) endpoint used as a fallback when the default
	// resolver cannot discover the HA origintunneld servers (GitHub issue #75).
	dotServerName = "cloudflare-dns.com"
	dotServerAddr = "1.1.1.1:853"
	dotTimeout    = 15 * time.Second

	// SRV record resolution TTL
	resolveEdgeAddrTTL = time.Hour

	// Value of the "subsystem" log field attached by NewEdgeAddrResolver.
	subsystemEdgeAddrResolver = "edgeAddrResolver"
)
33
// Package-level aliases for the net lookup functions. The rest of this file
// calls these instead of the net package directly so that tests can swap in
// their own implementations.
var (
	netLookupIP  = net.LookupIP
	netLookupSRV = net.LookupSRV
)
39
40// If the call to net.LookupSRV fails, try to fall back to DoT from Cloudflare directly.
41//
42// Note: Instead of DoT, we could also have used DoH. Either of these:
43// - directly via the JSON API (https://1.1.1.1/dns-query?ct=application/dns-json&name=_origintunneld._tcp.argotunnel.com&type=srv)
44// - indirectly via `tunneldns.NewUpstreamHTTPS()`
45// But both of these cases miss out on a key feature from the stdlib:
46// "The returned records are sorted by priority and randomized by weight within a priority."
47// (https://golang.org/pkg/net/#Resolver.LookupSRV)
48// Does this matter? I don't know. It may someday. Let's use DoT so we don't need to worry about it.
49// See also: Go feature request for stdlib-supported DoH: https://github.com/golang/go/issues/27552
50var fallbackLookupSRV = lookupSRVWithDOT
51
// friendlyDNSErrorLines is the troubleshooting advice that EdgeDiscovery logs,
// one line per log call, when neither the default resolver nor the fallback
// lookup returns usable SRV records.
var friendlyDNSErrorLines = []string{
	"Please try the following things to diagnose this issue:",
	" 1. ensure that argotunnel.com is returning \"origintunneld\" service records.",
	" Run your system's equivalent of: dig srv _origintunneld._tcp.argotunnel.com",
	" 2. ensure that your DNS resolver is not returning compressed SRV records.",
	" See GitHub issue https://github.com/golang/go/issues/27546",
	" For example, you could use Cloudflare's 1.1.1.1 as your resolver:",
	" https://developers.cloudflare.com/1.1.1.1/setting-up-1.1.1.1/",
}
61
// EdgeServiceDiscoverer describes a pool of addresses on Cloudflare's edge
// network from which callers can claim, return and refresh addresses.
type EdgeServiceDiscoverer interface {
	// Addr claims an unused edge address. The address is removed from the
	// pool of available addresses before the method returns, so the caller
	// has exclusive access to it for tunneling purposes. The caller should
	// hand it back later via ReplaceAddr or MarkAddrBad.
	Addr() (*net.TCPAddr, error)

	// AnyAddr returns some edge address, which may or may not already be in
	// active use for a tunnel. The caller should NOT pass the result to
	// ReplaceAddr or MarkAddrBad.
	AnyAddr() (*net.TCPAddr, error)

	// ReplaceAddr gives an address that is no longer needed (e.g. because
	// numHAConnections was scaled down) back to the pool of available
	// addresses.
	ReplaceAddr(addr *net.TCPAddr)

	// MarkAddrBad reports a connectivity error for the address. The address
	// stops being counted as in use but is not returned to the pool of
	// available addresses.
	MarkAddrBad(addr *net.TCPAddr)

	// AvailableAddrs reports how many addresses are available for use,
	// not counting those that have been marked bad.
	AvailableAddrs() int

	// Refresh rediscovers Cloudflare's edge network addresses. The state of
	// "bad" addresses is reset; addresses in active use are left alone.
	Refresh() error
}
86
87// EdgeAddrResolver discovers the addresses of Cloudflare's edge network through SRV record.
88// It implements EdgeServiceDiscoverer interface
89type EdgeAddrResolver struct {
90 sync.Mutex
91 // HA regions
92 regions []*region
93 // Logger for noteworthy events
94 logger *logrus.Entry
95}
96
97type region struct {
98 // Addresses that we expect will be in active use
99 addrs []*net.TCPAddr
100 // Addresses that are in active use.
101 // This is actually a set of net.TCPAddr's, but we can't make a map like
102 // map[net.TCPAddr]bool
103 // since net.TCPAddr contains a field of type net.IP and therefore it cannot be used as a map key.
104 // So instead we use map[string]*net.TCPAddr, where the keys are obtained by net.TCPAddr.String().
105 // (We keep the "raw" *net.TCPAddr values for the convenience of AnyAddr(). If that method didn't
106 // exist, we wouldn't strictly need the values, and this could be a map[string]bool.)
107 inUse map[string]*net.TCPAddr
108 // Addresses that were discarded due to a network error.
109 // Not sure what we'll do with these, but it feels good to keep them around for now.
110 bad []*net.TCPAddr
111}
112
113func NewEdgeAddrResolver(logger *logrus.Logger) (EdgeServiceDiscoverer, error) {
114 r := &EdgeAddrResolver{
115 logger: logger.WithField("subsystem", subsystemEdgeAddrResolver),
116 }
117 if err := r.Refresh(); err != nil {
118 return nil, err
119 }
120 return r, nil
121}
122
123func (r *EdgeAddrResolver) Addr() (*net.TCPAddr, error) {
124 r.Lock()
125 defer r.Unlock()
126
127 // compute the largest region based on len(addrs)
128 var largestRegion *region
129 {
130 if len(r.regions) == 0 {
131 return nil, errors.New("No HA regions")
132 }
133 largestRegion = r.regions[0]
134 for _, region := range r.regions[1:] {
135 if len(region.addrs) > len(largestRegion.addrs) {
136 largestRegion = region
137 }
138 }
139 if len(largestRegion.addrs) == 0 {
140 return nil, errors.New("No IP address to claim")
141 }
142 }
143
144 var addr *net.TCPAddr
145 addr, largestRegion.addrs = popAddr(largestRegion.addrs)
146 largestRegion.inUse[addr.String()] = addr
147 return addr, nil
148}
149
150func (r *EdgeAddrResolver) AnyAddr() (*net.TCPAddr, error) {
151 r.Lock()
152 defer r.Unlock()
153 for _, region := range r.regions {
154 // return an unused addr
155 if len(region.addrs) > 0 {
156 return region.addrs[rand.Intn(len(region.addrs))], nil
157 }
158 // return an addr that's in use
159 for _, addr := range region.inUse {
160 return addr, nil
161 }
162 }
163 return nil, fmt.Errorf("No IP addresses")
164}
165
166func (r *EdgeAddrResolver) ReplaceAddr(addr *net.TCPAddr) {
167 r.Lock()
168 defer r.Unlock()
169 addrString := addr.String()
170 for _, region := range r.regions {
171 if _, ok := region.inUse[addrString]; ok {
172 delete(region.inUse, addrString)
173 region.addrs = append(region.addrs, addr)
174 break
175 }
176 }
177}
178
179func (r *EdgeAddrResolver) MarkAddrBad(addr *net.TCPAddr) {
180 r.Lock()
181 defer r.Unlock()
182 addrString := addr.String()
183 for _, region := range r.regions {
184 if _, ok := region.inUse[addrString]; ok {
185 delete(region.inUse, addrString)
186 region.bad = append(region.bad, addr)
187 break
188 }
189 }
190}
191
192func (r *EdgeAddrResolver) AvailableAddrs() int {
193 r.Lock()
194 defer r.Unlock()
195 result := 0
196 for _, region := range r.regions {
197 result += len(region.addrs)
198 }
199 return result
200}
201
202func (r *EdgeAddrResolver) Refresh() error {
203 addrLists, err := EdgeDiscovery(r.logger)
204 if err != nil {
205 return err
206 }
207
208 r.Lock()
209 defer r.Unlock()
210 inUse := allInUse(r.regions)
211 r.regions = makeHARegions(addrLists, inUse)
212 return nil
213}
214
215// EdgeDiscovery implements HA service discovery lookup.
216func EdgeDiscovery(logger *logrus.Entry) ([][]*net.TCPAddr, error) {
217 _, addrs, err := netLookupSRV(srvService, srvProto, srvName)
218 if err != nil {
219 _, fallbackAddrs, fallbackErr := fallbackLookupSRV(srvService, srvProto, srvName)
220 if fallbackErr != nil || len(fallbackAddrs) == 0 {
221 // use the original DNS error `err` in messages, not `fallbackErr`
222 logger.Errorln("Error looking up Cloudflare edge IPs: the DNS query failed:", err)
223 for _, s := range friendlyDNSErrorLines {
224 logger.Errorln(s)
225 }
226 return nil, errors.Wrapf(err, "Could not lookup srv records on _%v._%v.%v", srvService, srvProto, srvName)
227 }
228 // Accept the fallback results and keep going
229 addrs = fallbackAddrs
230 }
231
232 var resolvedIPsPerCNAME [][]*net.TCPAddr
233 for _, addr := range addrs {
234 ips, err := resolveSRVToTCP(addr)
235 if err != nil {
236 return nil, err
237 }
238 resolvedIPsPerCNAME = append(resolvedIPsPerCNAME, ips)
239 }
240
241 return resolvedIPsPerCNAME, nil
242}
243
244func lookupSRVWithDOT(service, proto, name string) (cname string, addrs []*net.SRV, err error) {
245 // Inspiration: https://github.com/artyom/dot/blob/master/dot.go
246 r := &net.Resolver{
247 PreferGo: true,
248 Dial: func(ctx context.Context, _ string, _ string) (net.Conn, error) {
249 var dialer net.Dialer
250 conn, err := dialer.DialContext(ctx, "tcp", dotServerAddr)
251 if err != nil {
252 return nil, err
253 }
254 tlsConfig := &tls.Config{ServerName: dotServerName}
255 return tls.Client(conn, tlsConfig), nil
256 },
257 }
258 ctx, cancel := context.WithTimeout(context.Background(), dotTimeout)
259 defer cancel()
260 return r.LookupSRV(ctx, srvService, srvProto, srvName)
261}
262
263func resolveSRVToTCP(srv *net.SRV) ([]*net.TCPAddr, error) {
264 ips, err := netLookupIP(srv.Target)
265 if err != nil {
266 return nil, errors.Wrapf(err, "Couldn't resolve SRV record %v", srv)
267 }
268 if len(ips) == 0 {
269 return nil, fmt.Errorf("SRV record %v had no IPs", srv)
270 }
271 addrs := make([]*net.TCPAddr, len(ips))
272 for i, ip := range ips {
273 addrs[i] = &net.TCPAddr{IP: ip, Port: int(srv.Port)}
274 }
275 return addrs, nil
276}
277
// EdgeHostnameResolver finds the addresses of Cloudflare's edge network from a
// fixed list of server hostnames. It implements the EdgeServiceDiscoverer
// interface and is used mainly for testing connectivity.
type EdgeHostnameResolver struct {
	// The embedded mutex is held by the methods while they touch the fields below.
	sync.Mutex
	// Hostnames of the edge servers, re-resolved on every Refresh.
	hostnames []string
	// Addresses of Cloudflare's edge network that are free to be claimed.
	addrs []*net.TCPAddr
	// Addresses that are in active use. Conceptually a set of net.TCPAddr
	// values; the keys are encoded with .String() because net.TCPAddr has a
	// net.IP field and therefore cannot be used as a map key.
	inUse map[string]*net.TCPAddr
	// Addresses discarded because of a network error. Nothing consumes them
	// yet; they are kept around for now.
	bad []*net.TCPAddr
}
295
296func NewEdgeHostnameResolver(edgeHostnames []string) (EdgeServiceDiscoverer, error) {
297 r := &EdgeHostnameResolver{
298 hostnames: edgeHostnames,
299 inUse: map[string]*net.TCPAddr{},
300 }
301 if err := r.Refresh(); err != nil {
302 return nil, err
303 }
304 return r, nil
305}
306
307func (r *EdgeHostnameResolver) Addr() (*net.TCPAddr, error) {
308 r.Lock()
309 defer r.Unlock()
310 if len(r.addrs) == 0 {
311 return nil, errors.New("No IP address to claim")
312 }
313 var addr *net.TCPAddr
314 addr, r.addrs = popAddr(r.addrs)
315 r.inUse[addr.String()] = addr
316 return addr, nil
317}
318
319func (r *EdgeHostnameResolver) AnyAddr() (*net.TCPAddr, error) {
320 r.Lock()
321 defer r.Unlock()
322 // return an unused addr
323 if len(r.addrs) > 0 {
324 return r.addrs[rand.Intn(len(r.addrs))], nil
325 }
326 // return an addr that's in use
327 for _, addr := range r.inUse {
328 return addr, nil
329 }
330 return nil, errors.New("No IP addresses")
331}
332
333func (r *EdgeHostnameResolver) ReplaceAddr(addr *net.TCPAddr) {
334 r.Lock()
335 defer r.Unlock()
336 delete(r.inUse, addr.String())
337 r.addrs = append(r.addrs, addr)
338}
339func (r *EdgeHostnameResolver) MarkAddrBad(addr *net.TCPAddr) {
340 r.Lock()
341 defer r.Unlock()
342 delete(r.inUse, addr.String())
343 r.bad = append(r.bad, addr)
344}
345
346func (r *EdgeHostnameResolver) AvailableAddrs() int {
347 r.Lock()
348 defer r.Unlock()
349 return len(r.addrs)
350}
351
352func (r *EdgeHostnameResolver) Refresh() error {
353 newAddrs, err := ResolveAddrs(r.hostnames)
354 if err != nil {
355 return err
356 }
357 r.Lock()
358 defer r.Unlock()
359 var notInUse []*net.TCPAddr
360 for _, newAddr := range newAddrs {
361 if _, ok := r.inUse[newAddr.String()]; !ok {
362 notInUse = append(notInUse, newAddr)
363 }
364 }
365 r.addrs = notInUse
366 r.bad = nil
367 return nil
368}
369
// ResolveAddrs resolves every entry of addrs to a TCP address, in order. An
// entry may use a hostname, in which case at most one of that hostname's IP
// addresses ends up in the result. The first entry that fails to resolve
// aborts the call and its error is returned with a nil slice.
func ResolveAddrs(addrs []string) ([]*net.TCPAddr, error) {
	var resolved []*net.TCPAddr
	for i := range addrs {
		tcpAddr, err := net.ResolveTCPAddr("tcp", addrs[i])
		if err != nil {
			return nil, err
		}
		resolved = append(resolved, tcpAddr)
	}
	return resolved, nil
}
383
384// Compute total set of IP addresses in use. This is useful if the regions
385// are returned in a different order, or if an IP address is assigned to
386// a different region for some reasion.
387func allInUse(regions []*region) map[string]*net.TCPAddr {
388 result := make(map[string]*net.TCPAddr)
389 for _, region := range regions {
390 for k, v := range region.inUse {
391 result[k] = v
392 }
393 }
394 return result
395}
396
397func makeHARegions(addrLists [][]*net.TCPAddr, inUse map[string]*net.TCPAddr) (regions []*region) {
398 for _, addrList := range addrLists {
399 region := &region{inUse: map[string]*net.TCPAddr{}}
400 for _, addr := range addrList {
401 addrString := addr.String()
402 // No matter what region `addr` used to belong to, it's now a part
403 // of this region, so add it to this region's `inUse` map.
404 if _, ok := inUse[addrString]; ok {
405 region.inUse[addrString] = addr
406 } else {
407 region.addrs = append(region.addrs, addr)
408 }
409 }
410 regions = append(regions, region)
411 }
412 return
413}
414
// popAddr removes the first element of addrs and returns it together with the
// remaining slice. The vacated slot in the backing array is set to nil so the
// array no longer references the popped address. addrs must be non-empty;
// indexing an empty slice panics.
func popAddr(addrs []*net.TCPAddr) (*net.TCPAddr, []*net.TCPAddr) {
	head := addrs[0]
	addrs[0] = nil // prevent memory leak
	return head, addrs[1:]
}
421