cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2020.11.6

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/h2mux.go

220lines · modecode

1package connection
2
3import (
4 "context"
5 "net"
6 "net/http"
7 "time"
8
9 "github.com/cloudflare/cloudflared/h2mux"
10 "github.com/cloudflare/cloudflared/logger"
11 tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
12 "github.com/cloudflare/cloudflared/websocket"
13 "github.com/pkg/errors"
14 "golang.org/x/sync/errgroup"
15)
16
const (
	// muxerTimeout is the Timeout placed into every h2mux.MuxerConfig built
	// by MuxerConfig.H2MuxerConfig.
	muxerTimeout = 5 * time.Second
	// openStreamTimeout bounds how long newRPCStream waits for the muxer to
	// open an RPC stream.
	openStreamTimeout = 30 * time.Second
)
21
22type h2muxConnection struct {
23 config *Config
24 muxerConfig *MuxerConfig
25 muxer *h2mux.Muxer
26 // connectionID is only used by metrics, and prometheus requires labels to be string
27 connIndexStr string
28 connIndex uint8
29
30 observer *Observer
31}
32
33type MuxerConfig struct {
34 HeartbeatInterval time.Duration
35 MaxHeartbeats uint64
36 CompressionSetting h2mux.CompressionSetting
37 MetricsUpdateFreq time.Duration
38}
39
40func (mc *MuxerConfig) H2MuxerConfig(h h2mux.MuxedStreamHandler, logger logger.Service) *h2mux.MuxerConfig {
41 return &h2mux.MuxerConfig{
42 Timeout: muxerTimeout,
43 Handler: h,
44 IsClient: true,
45 HeartbeatInterval: mc.HeartbeatInterval,
46 MaxHeartbeats: mc.MaxHeartbeats,
47 Logger: logger,
48 CompressionQuality: mc.CompressionSetting,
49 }
50}
51
52// NewTunnelHandler returns a TunnelHandler, origin LAN IP and error
53func NewH2muxConnection(ctx context.Context,
54 config *Config,
55 muxerConfig *MuxerConfig,
56 edgeConn net.Conn,
57 connIndex uint8,
58 observer *Observer,
59) (*h2muxConnection, error, bool) {
60 h := &h2muxConnection{
61 config: config,
62 muxerConfig: muxerConfig,
63 connIndexStr: uint8ToString(connIndex),
64 connIndex: connIndex,
65 observer: observer,
66 }
67
68 // Establish a muxed connection with the edge
69 // Client mux handshake with agent server
70 muxer, err := h2mux.Handshake(edgeConn, edgeConn, *muxerConfig.H2MuxerConfig(h, observer), h2mux.ActiveStreams)
71 if err != nil {
72 recoverable := isHandshakeErrRecoverable(err, connIndex, observer)
73 return nil, err, recoverable
74 }
75 h.muxer = muxer
76 return h, nil, false
77}
78
79func (h *h2muxConnection) ServeNamedTunnel(ctx context.Context, namedTunnel *NamedTunnelConfig, credentialManager CredentialManager, connOptions *tunnelpogs.ConnectionOptions, connectedFuse ConnectedFuse) error {
80 errGroup, serveCtx := errgroup.WithContext(ctx)
81 errGroup.Go(func() error {
82 return h.serveMuxer(serveCtx)
83 })
84
85 errGroup.Go(func() error {
86 stream, err := h.newRPCStream(serveCtx, register)
87 if err != nil {
88 return err
89 }
90 rpcClient := newRegistrationRPCClient(ctx, stream, h.observer)
91 defer rpcClient.Close()
92
93 if err = rpcClient.RegisterConnection(serveCtx, namedTunnel, connOptions, h.connIndex, h.observer); err != nil {
94 return err
95 }
96 connectedFuse.Connected()
97 return nil
98 })
99
100 errGroup.Go(func() error {
101 h.controlLoop(serveCtx, connectedFuse, true)
102 return nil
103 })
104 return errGroup.Wait()
105}
106
107func (h *h2muxConnection) ServeClassicTunnel(ctx context.Context, classicTunnel *ClassicTunnelConfig, credentialManager CredentialManager, registrationOptions *tunnelpogs.RegistrationOptions, connectedFuse ConnectedFuse) error {
108 errGroup, serveCtx := errgroup.WithContext(ctx)
109 errGroup.Go(func() error {
110 return h.serveMuxer(serveCtx)
111 })
112
113 errGroup.Go(func() (err error) {
114 defer func() {
115 if err == nil {
116 connectedFuse.Connected()
117 }
118 }()
119 if classicTunnel.UseReconnectToken && connectedFuse.IsConnected() {
120 err := h.reconnectTunnel(ctx, credentialManager, classicTunnel, registrationOptions)
121 if err == nil {
122 return nil
123 }
124 // log errors and proceed to RegisterTunnel
125 h.observer.Errorf("Couldn't reconnect connection %d. Reregistering it instead. Error was: %v", h.connIndex, err)
126 }
127 return h.registerTunnel(ctx, credentialManager, classicTunnel, registrationOptions)
128 })
129
130 errGroup.Go(func() error {
131 h.controlLoop(serveCtx, connectedFuse, false)
132 return nil
133 })
134 return errGroup.Wait()
135}
136
137func (h *h2muxConnection) serveMuxer(ctx context.Context) error {
138 // All routines should stop when muxer finish serving. When muxer is shutdown
139 // gracefully, it doesn't return an error, so we need to return errMuxerShutdown
140 // here to notify other routines to stop
141 err := h.muxer.Serve(ctx)
142 if err == nil {
143 return muxerShutdownError{}
144 }
145 return err
146}
147
148func (h *h2muxConnection) controlLoop(ctx context.Context, connectedFuse ConnectedFuse, isNamedTunnel bool) {
149 updateMetricsTickC := time.Tick(h.muxerConfig.MetricsUpdateFreq)
150 for {
151 select {
152 case <-ctx.Done():
153 // UnregisterTunnel blocks until the RPC call returns
154 if connectedFuse.IsConnected() {
155 h.unregister(isNamedTunnel)
156 }
157 h.muxer.Shutdown()
158 return
159 case <-updateMetricsTickC:
160 h.observer.metrics.updateMuxerMetrics(h.connIndexStr, h.muxer.Metrics())
161 }
162 }
163}
164
165func (h *h2muxConnection) newRPCStream(ctx context.Context, rpcName rpcName) (*h2mux.MuxedStream, error) {
166 openStreamCtx, openStreamCancel := context.WithTimeout(ctx, openStreamTimeout)
167 defer openStreamCancel()
168 stream, err := h.muxer.OpenRPCStream(openStreamCtx)
169 if err != nil {
170 return nil, err
171 }
172 return stream, nil
173}
174
175func (h *h2muxConnection) ServeStream(stream *h2mux.MuxedStream) error {
176 respWriter := &h2muxRespWriter{stream}
177
178 req, reqErr := h.newRequest(stream)
179 if reqErr != nil {
180 respWriter.WriteErrorResponse()
181 return reqErr
182 }
183
184 err := h.config.OriginClient.Proxy(respWriter, req, websocket.IsWebSocketUpgrade(req))
185 if err != nil {
186 respWriter.WriteErrorResponse()
187 return err
188 }
189 return nil
190}
191
192func (h *h2muxConnection) newRequest(stream *h2mux.MuxedStream) (*http.Request, error) {
193 req, err := http.NewRequest("GET", "http://localhost:8080", h2mux.MuxedStreamReader{MuxedStream: stream})
194 if err != nil {
195 return nil, errors.Wrap(err, "Unexpected error from http.NewRequest")
196 }
197 err = h2mux.H2RequestHeadersToH1Request(stream.Headers, req)
198 if err != nil {
199 return nil, errors.Wrap(err, "invalid request received")
200 }
201 return req, nil
202}
203
204type h2muxRespWriter struct {
205 *h2mux.MuxedStream
206}
207
208func (rp *h2muxRespWriter) WriteRespHeaders(resp *http.Response) error {
209 headers := h2mux.H1ResponseToH2ResponseHeaders(resp)
210 headers = append(headers, h2mux.Header{Name: responseMetaHeaderField, Value: responseMetaHeaderOrigin})
211 return rp.WriteHeaders(headers)
212}
213
214func (rp *h2muxRespWriter) WriteErrorResponse() {
215 rp.WriteHeaders([]h2mux.Header{
216 {Name: ":status", Value: "502"},
217 {Name: responseMetaHeaderField, Value: responseMetaHeaderCfd},
218 })
219 rp.Write([]byte("502 Bad Gateway"))
220}
221