cloudflare/cloudflared
Publicmirrored from https://github.com/cloudflare/cloudflaredAvailable
connection/supervisor.go
147lines · modecode
| 1 | package connection |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "net" |
| 6 | "time" |
| 7 | |
| 8 | tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs" |
| 9 | "github.com/google/uuid" |
| 10 | "github.com/pkg/errors" |
| 11 | "github.com/sirupsen/logrus" |
| 12 | ) |
| 13 | |
| 14 | const ( |
| 15 | // Waiting time before retrying a failed tunnel connection |
| 16 | reconnectDuration = time.Second * 10 |
| 17 | // SRV record resolution TTL |
| 18 | resolveTTL = time.Hour |
| 19 | // Interval between establishing new connection |
| 20 | connectionInterval = time.Second |
| 21 | ) |
| 22 | |
| 23 | type CloudflaredConfig struct { |
| 24 | ConnectionConfig *ConnectionConfig |
| 25 | OriginCert []byte |
| 26 | Tags []tunnelpogs.Tag |
| 27 | EdgeAddrs []string |
| 28 | HAConnections uint |
| 29 | Logger *logrus.Logger |
| 30 | CloudflaredVersion string |
| 31 | } |
| 32 | |
| 33 | // Supervisor is a stateful object that manages connections with the edge |
| 34 | type Supervisor struct { |
| 35 | config *CloudflaredConfig |
| 36 | state *supervisorState |
| 37 | connErrors chan error |
| 38 | } |
| 39 | |
| 40 | type supervisorState struct { |
| 41 | // IPs to connect to cloudflare's edge network |
| 42 | edgeIPs []*net.TCPAddr |
| 43 | // index of the next element to use in edgeIPs |
| 44 | nextEdgeIPIndex int |
| 45 | // last time edgeIPs were refreshed |
| 46 | lastResolveTime time.Time |
| 47 | // ID of this cloudflared instance |
| 48 | cloudflaredID uuid.UUID |
| 49 | // connectionPool is a pool of connectionHandlers that can be used to make RPCs |
| 50 | connectionPool *connectionPool |
| 51 | } |
| 52 | |
| 53 | func (s *supervisorState) getNextEdgeIP() *net.TCPAddr { |
| 54 | ip := s.edgeIPs[s.nextEdgeIPIndex%len(s.edgeIPs)] |
| 55 | s.nextEdgeIPIndex++ |
| 56 | return ip |
| 57 | } |
| 58 | |
| 59 | func NewSupervisor(config *CloudflaredConfig) *Supervisor { |
| 60 | return &Supervisor{ |
| 61 | config: config, |
| 62 | state: &supervisorState{ |
| 63 | connectionPool: &connectionPool{}, |
| 64 | }, |
| 65 | connErrors: make(chan error), |
| 66 | } |
| 67 | } |
| 68 | |
| 69 | func (s *Supervisor) Run(ctx context.Context) error { |
| 70 | logger := s.config.Logger |
| 71 | if err := s.initialize(); err != nil { |
| 72 | logger.WithError(err).Error("Failed to get edge IPs") |
| 73 | return err |
| 74 | } |
| 75 | defer s.state.connectionPool.close() |
| 76 | |
| 77 | var currentConnectionCount uint |
| 78 | expectedConnectionCount := s.config.HAConnections |
| 79 | if uint(len(s.state.edgeIPs)) < s.config.HAConnections { |
| 80 | logger.Warnf("You requested %d HA connections but I can give you at most %d.", s.config.HAConnections, len(s.state.edgeIPs)) |
| 81 | expectedConnectionCount = uint(len(s.state.edgeIPs)) |
| 82 | } |
| 83 | for { |
| 84 | select { |
| 85 | case <-ctx.Done(): |
| 86 | return nil |
| 87 | case connErr := <-s.connErrors: |
| 88 | logger.WithError(connErr).Warnf("Connection dropped unexpectedly") |
| 89 | currentConnectionCount-- |
| 90 | default: |
| 91 | time.Sleep(5 * time.Second) |
| 92 | } |
| 93 | if currentConnectionCount < expectedConnectionCount { |
| 94 | h, err := newH2MuxHandler(ctx, s.config.ConnectionConfig, s.state.getNextEdgeIP()) |
| 95 | if err != nil { |
| 96 | logger.WithError(err).Error("Failed to create new connection handler") |
| 97 | continue |
| 98 | } |
| 99 | go func() { |
| 100 | s.connErrors <- h.serve(ctx) |
| 101 | }() |
| 102 | connResult, err := s.connect(ctx, s.config, s.state.cloudflaredID, h) |
| 103 | if err != nil { |
| 104 | logger.WithError(err).Errorf("Failed to connect to cloudflared's edge network") |
| 105 | h.shutdown() |
| 106 | continue |
| 107 | } |
| 108 | if connErr := connResult.Err; connErr != nil && !connErr.ShouldRetry { |
| 109 | logger.WithError(connErr).Errorf("Server respond with don't retry to connect") |
| 110 | h.shutdown() |
| 111 | return err |
| 112 | } |
| 113 | logger.Infof("Connected to %s", connResult.ServerInfo.LocationName) |
| 114 | s.state.connectionPool.put(h) |
| 115 | currentConnectionCount++ |
| 116 | } |
| 117 | } |
| 118 | } |
| 119 | |
| 120 | func (s *Supervisor) initialize() error { |
| 121 | edgeIPs, err := ResolveEdgeIPs(s.config.Logger, s.config.EdgeAddrs) |
| 122 | if err != nil { |
| 123 | return errors.Wrapf(err, "Failed to resolve cloudflare edge network address") |
| 124 | } |
| 125 | s.state.edgeIPs = edgeIPs |
| 126 | s.state.lastResolveTime = time.Now() |
| 127 | cloudflaredID, err := uuid.NewRandom() |
| 128 | if err != nil { |
| 129 | return errors.Wrap(err, "Failed to generate cloudflared ID") |
| 130 | } |
| 131 | s.state.cloudflaredID = cloudflaredID |
| 132 | return nil |
| 133 | } |
| 134 | |
| 135 | func (s *Supervisor) connect(ctx context.Context, |
| 136 | config *CloudflaredConfig, |
| 137 | cloudflaredID uuid.UUID, |
| 138 | h connectionHandler, |
| 139 | ) (*tunnelpogs.ConnectResult, error) { |
| 140 | connectParameters := &tunnelpogs.ConnectParameters{ |
| 141 | OriginCert: config.OriginCert, |
| 142 | CloudflaredID: cloudflaredID, |
| 143 | NumPreviousAttempts: 0, |
| 144 | CloudflaredVersion: config.CloudflaredVersion, |
| 145 | } |
| 146 | return h.connect(ctx, connectParameters) |
| 147 | } |
| 148 | |