cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2019.5.0

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/connection.go

162lines · modecode

1package connection
2
3import (
4 "context"
5 "crypto/tls"
6 "net"
7 "sync"
8 "time"
9
10 "github.com/cloudflare/cloudflared/h2mux"
11 "github.com/cloudflare/cloudflared/tunnelrpc"
12 tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
13 "github.com/pkg/errors"
14 "github.com/sirupsen/logrus"
15
16 rpc "zombiezen.com/go/capnproto2/rpc"
17)
18
const (
	// dialTimeout bounds the TCP dial and the TLS handshake performed by
	// newH2MuxHandler, and is also passed to h2mux as MuxerConfig.Timeout.
	dialTimeout = 5 * time.Second
	// openStreamTimeout bounds the creation of the RPC connection in
	// h2muxHandler.connect.
	openStreamTimeout = 30 * time.Second
)
23
// dialError wraps a failure that occurred while establishing the connection
// to the edge (TCP dial or TLS handshake), so it can be told apart by type.
type dialError struct {
	// cause is the underlying error; it must be non-nil.
	cause error
}

// Error implements the error interface by reporting the message of the
// wrapped cause unchanged.
func (de dialError) Error() string {
	msg := de.cause.Error()
	return msg
}
31
// muxerShutdownError is the error h2muxHandler.serve returns when the muxer's
// Serve call finishes without reporting an error of its own.
type muxerShutdownError struct{}

// Error implements the error interface with a fixed message.
func (muxerShutdownError) Error() string {
	return "muxer shutdown"
}
37
// ConnectionConfig holds the settings newH2MuxHandler uses when it connects
// to the edge and sets up the muxer.
type ConnectionConfig struct {
	// TLSConfig is the client TLS configuration used for the handshake with the edge.
	TLSConfig *tls.Config
	// HeartbeatInterval is forwarded to h2mux as MuxerConfig.HeartbeatInterval.
	HeartbeatInterval time.Duration
	// MaxHeartbeats is forwarded to h2mux as MuxerConfig.MaxHeartbeats.
	MaxHeartbeats uint64
	// Logger is passed to the muxer and kept by the handler for RPC logging.
	Logger *logrus.Entry
}
44
// connectionHandler is the set of operations available on one established
// connection. h2muxHandler is the implementation defined in this file.
type connectionHandler interface {
	// serve runs the connection and returns an error when it stops; the h2mux
	// implementation blocks until the muxer is shut down.
	serve(ctx context.Context) error
	// connect issues the Connect RPC with the given parameters and returns its result.
	connect(ctx context.Context, parameters *tunnelpogs.ConnectParameters) (*tunnelpogs.ConnectResult, error)
	// shutdown stops the connection.
	shutdown()
}
50
// h2muxHandler is the connectionHandler implementation backed by an h2mux.Muxer.
type h2muxHandler struct {
	// muxer is the multiplexed connection that serve, connect and shutdown act on.
	muxer *h2mux.Muxer
	// logger is the base entry from which the RPC loggers in newRPConn are derived.
	logger *logrus.Entry
}
55
56type muxedStreamHandler struct {
57}
58
59// Implements MuxedStreamHandler interface
60func (h *muxedStreamHandler) ServeStream(stream *h2mux.MuxedStream) error {
61 return nil
62}
63
64func (h *h2muxHandler) serve(ctx context.Context) error {
65 // Serve doesn't return until h2mux is shutdown
66 if err := h.muxer.Serve(ctx); err != nil {
67 return err
68 }
69 return muxerShutdownError{}
70}
71
72// Connect is used to establish connections with cloudflare's edge network
73func (h *h2muxHandler) connect(ctx context.Context, parameters *tunnelpogs.ConnectParameters) (*tunnelpogs.ConnectResult, error) {
74 openStreamCtx, cancel := context.WithTimeout(ctx, openStreamTimeout)
75 defer cancel()
76 conn, err := h.newRPConn(openStreamCtx)
77 if err != nil {
78 return nil, errors.Wrap(err, "Failed to create new RPC connection")
79 }
80 defer conn.Close()
81 tsClient := tunnelpogs.TunnelServer_PogsClient{Client: conn.Bootstrap(ctx)}
82 return tsClient.Connect(ctx, parameters)
83}
84
85func (h *h2muxHandler) shutdown() {
86 h.muxer.Shutdown()
87}
88
89func (h *h2muxHandler) newRPConn(ctx context.Context) (*rpc.Conn, error) {
90 stream, err := h.muxer.OpenStream(ctx, []h2mux.Header{
91 {Name: ":method", Value: "RPC"},
92 {Name: ":scheme", Value: "capnp"},
93 {Name: ":path", Value: "*"},
94 }, nil)
95 if err != nil {
96 return nil, err
97 }
98 return rpc.NewConn(
99 tunnelrpc.NewTransportLogger(h.logger.WithField("subsystem", "rpc-register"), rpc.StreamTransport(stream)),
100 tunnelrpc.ConnLog(h.logger.WithField("subsystem", "rpc-transport")),
101 ), nil
102}
103
104// NewConnectionHandler returns a connectionHandler, wrapping h2mux to make RPC calls
105func newH2MuxHandler(ctx context.Context,
106 config *ConnectionConfig,
107 edgeIP *net.TCPAddr,
108) (connectionHandler, error) {
109 // Inherit from parent context so we can cancel (Ctrl-C) while dialing
110 dialCtx, dialCancel := context.WithTimeout(ctx, dialTimeout)
111 defer dialCancel()
112 dialer := net.Dialer{DualStack: true}
113 plaintextEdgeConn, err := dialer.DialContext(dialCtx, "tcp", edgeIP.String())
114 if err != nil {
115 return nil, dialError{cause: errors.Wrap(err, "DialContext error")}
116 }
117 edgeConn := tls.Client(plaintextEdgeConn, config.TLSConfig)
118 edgeConn.SetDeadline(time.Now().Add(dialTimeout))
119 err = edgeConn.Handshake()
120 if err != nil {
121 return nil, dialError{cause: errors.Wrap(err, "Handshake with edge error")}
122 }
123 // clear the deadline on the conn; h2mux has its own timeouts
124 edgeConn.SetDeadline(time.Time{})
125 // Establish a muxed connection with the edge
126 // Client mux handshake with agent server
127 muxer, err := h2mux.Handshake(edgeConn, edgeConn, h2mux.MuxerConfig{
128 Timeout: dialTimeout,
129 Handler: &muxedStreamHandler{},
130 IsClient: true,
131 HeartbeatInterval: config.HeartbeatInterval,
132 MaxHeartbeats: config.MaxHeartbeats,
133 Logger: config.Logger,
134 })
135 if err != nil {
136 return nil, err
137 }
138 return &h2muxHandler{
139 muxer: muxer,
140 logger: config.Logger,
141 }, nil
142}
143
// connectionPool is a pool of connection handlers. The embedded mutex guards
// connectionHandlers; put and close both hold it while touching the slice.
type connectionPool struct {
	sync.Mutex
	// connectionHandlers are the handlers added via put, in insertion order.
	connectionHandlers []connectionHandler
}
149
150func (cp *connectionPool) put(h connectionHandler) {
151 cp.Lock()
152 defer cp.Unlock()
153 cp.connectionHandlers = append(cp.connectionHandlers, h)
154}
155
156func (cp *connectionPool) close() {
157 cp.Lock()
158 defer cp.Unlock()
159 for _, h := range cp.connectionHandlers {
160 h.shutdown()
161 }
162}
163