cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2019.5.0

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/supervisor.go

147lines · modecode

1package connection
2
3import (
4 "context"
5 "net"
6 "time"
7
8 tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
9 "github.com/google/uuid"
10 "github.com/pkg/errors"
11 "github.com/sirupsen/logrus"
12)
13
const (
	// reconnectDuration is how long to wait before retrying a tunnel
	// connection that failed.
	reconnectDuration = 10 * time.Second
	// resolveTTL is how long a resolution of the edge SRV records is
	// considered fresh.
	resolveTTL = 1 * time.Hour
	// connectionInterval is the pause between establishing one new
	// connection and the next.
	connectionInterval = 1 * time.Second
)
22
23type CloudflaredConfig struct {
24 ConnectionConfig *ConnectionConfig
25 OriginCert []byte
26 Tags []tunnelpogs.Tag
27 EdgeAddrs []string
28 HAConnections uint
29 Logger *logrus.Logger
30 CloudflaredVersion string
31}
32
33// Supervisor is a stateful object that manages connections with the edge
34type Supervisor struct {
35 config *CloudflaredConfig
36 state *supervisorState
37 connErrors chan error
38}
39
40type supervisorState struct {
41 // IPs to connect to cloudflare's edge network
42 edgeIPs []*net.TCPAddr
43 // index of the next element to use in edgeIPs
44 nextEdgeIPIndex int
45 // last time edgeIPs were refreshed
46 lastResolveTime time.Time
47 // ID of this cloudflared instance
48 cloudflaredID uuid.UUID
49 // connectionPool is a pool of connectionHandlers that can be used to make RPCs
50 connectionPool *connectionPool
51}
52
53func (s *supervisorState) getNextEdgeIP() *net.TCPAddr {
54 ip := s.edgeIPs[s.nextEdgeIPIndex%len(s.edgeIPs)]
55 s.nextEdgeIPIndex++
56 return ip
57}
58
59func NewSupervisor(config *CloudflaredConfig) *Supervisor {
60 return &Supervisor{
61 config: config,
62 state: &supervisorState{
63 connectionPool: &connectionPool{},
64 },
65 connErrors: make(chan error),
66 }
67}
68
69func (s *Supervisor) Run(ctx context.Context) error {
70 logger := s.config.Logger
71 if err := s.initialize(); err != nil {
72 logger.WithError(err).Error("Failed to get edge IPs")
73 return err
74 }
75 defer s.state.connectionPool.close()
76
77 var currentConnectionCount uint
78 expectedConnectionCount := s.config.HAConnections
79 if uint(len(s.state.edgeIPs)) < s.config.HAConnections {
80 logger.Warnf("You requested %d HA connections but I can give you at most %d.", s.config.HAConnections, len(s.state.edgeIPs))
81 expectedConnectionCount = uint(len(s.state.edgeIPs))
82 }
83 for {
84 select {
85 case <-ctx.Done():
86 return nil
87 case connErr := <-s.connErrors:
88 logger.WithError(connErr).Warnf("Connection dropped unexpectedly")
89 currentConnectionCount--
90 default:
91 time.Sleep(5 * time.Second)
92 }
93 if currentConnectionCount < expectedConnectionCount {
94 h, err := newH2MuxHandler(ctx, s.config.ConnectionConfig, s.state.getNextEdgeIP())
95 if err != nil {
96 logger.WithError(err).Error("Failed to create new connection handler")
97 continue
98 }
99 go func() {
100 s.connErrors <- h.serve(ctx)
101 }()
102 connResult, err := s.connect(ctx, s.config, s.state.cloudflaredID, h)
103 if err != nil {
104 logger.WithError(err).Errorf("Failed to connect to cloudflared's edge network")
105 h.shutdown()
106 continue
107 }
108 if connErr := connResult.Err; connErr != nil && !connErr.ShouldRetry {
109 logger.WithError(connErr).Errorf("Server respond with don't retry to connect")
110 h.shutdown()
111 return err
112 }
113 logger.Infof("Connected to %s", connResult.ServerInfo.LocationName)
114 s.state.connectionPool.put(h)
115 currentConnectionCount++
116 }
117 }
118}
119
120func (s *Supervisor) initialize() error {
121 edgeIPs, err := ResolveEdgeIPs(s.config.Logger, s.config.EdgeAddrs)
122 if err != nil {
123 return errors.Wrapf(err, "Failed to resolve cloudflare edge network address")
124 }
125 s.state.edgeIPs = edgeIPs
126 s.state.lastResolveTime = time.Now()
127 cloudflaredID, err := uuid.NewRandom()
128 if err != nil {
129 return errors.Wrap(err, "Failed to generate cloudflared ID")
130 }
131 s.state.cloudflaredID = cloudflaredID
132 return nil
133}
134
135func (s *Supervisor) connect(ctx context.Context,
136 config *CloudflaredConfig,
137 cloudflaredID uuid.UUID,
138 h connectionHandler,
139) (*tunnelpogs.ConnectResult, error) {
140 connectParameters := &tunnelpogs.ConnectParameters{
141 OriginCert: config.OriginCert,
142 CloudflaredID: cloudflaredID,
143 NumPreviousAttempts: 0,
144 CloudflaredVersion: config.CloudflaredVersion,
145 }
146 return h.connect(ctx, connectParameters)
147}
148