cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2021.3.2

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/http2.go

275lines · modecode

1package connection
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "math"
8 "net"
9 "net/http"
10 "strings"
11 "sync"
12
13 "github.com/cloudflare/cloudflared/h2mux"
14 tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
15
16 "github.com/rs/zerolog"
17 "golang.org/x/net/http2"
18)
19
// Request header names and values used to classify an incoming HTTP/2 request.
const (
	// internalUpgradeHeader is the request header whose value is compared
	// against websocketUpgrade and controlStreamUpgrade.
	internalUpgradeHeader = "Cf-Cloudflared-Proxy-Connection-Upgrade"
	// tcpStreamHeader is the request header whose non-empty value marks the
	// request as a TCP stream (see IsTCPStream).
	tcpStreamHeader = "Cf-Cloudflared-Proxy-Src"
	// websocketUpgrade is the internalUpgradeHeader value of a websocket request.
	websocketUpgrade = "websocket"
	// controlStreamUpgrade is the internalUpgradeHeader value of the control stream.
	controlStreamUpgrade = "control-stream"
)

// errEdgeConnectionClosed is what Serve returns when serving ended without a
// graceful stop and without a recorded control stream error.
var errEdgeConnectionClosed = fmt.Errorf("connection with edge closed")
28
29type http2Connection struct {
30 conn net.Conn
31 server *http2.Server
32 config *Config
33 namedTunnel *NamedTunnelConfig
34 connOptions *tunnelpogs.ConnectionOptions
35 observer *Observer
36 connIndexStr string
37 connIndex uint8
38 // newRPCClientFunc allows us to mock RPCs during testing
39 newRPCClientFunc func(context.Context, io.ReadWriteCloser, *zerolog.Logger) NamedTunnelRPCClient
40
41 activeRequestsWG sync.WaitGroup
42 connectedFuse ConnectedFuse
43 gracefulShutdownC <-chan struct{}
44 stoppedGracefully bool
45 controlStreamErr error // result of running control stream handler
46}
47
48func NewHTTP2Connection(
49 conn net.Conn,
50 config *Config,
51 namedTunnelConfig *NamedTunnelConfig,
52 connOptions *tunnelpogs.ConnectionOptions,
53 observer *Observer,
54 connIndex uint8,
55 connectedFuse ConnectedFuse,
56 gracefulShutdownC <-chan struct{},
57) *http2Connection {
58 return &http2Connection{
59 conn: conn,
60 server: &http2.Server{
61 MaxConcurrentStreams: math.MaxUint32,
62 },
63 config: config,
64 namedTunnel: namedTunnelConfig,
65 connOptions: connOptions,
66 observer: observer,
67 connIndexStr: uint8ToString(connIndex),
68 connIndex: connIndex,
69 newRPCClientFunc: newRegistrationRPCClient,
70 connectedFuse: connectedFuse,
71 gracefulShutdownC: gracefulShutdownC,
72 }
73}
74
75func (c *http2Connection) Serve(ctx context.Context) error {
76 go func() {
77 <-ctx.Done()
78 c.close()
79 }()
80 c.server.ServeConn(c.conn, &http2.ServeConnOpts{
81 Context: ctx,
82 Handler: c,
83 })
84
85 switch {
86 case c.stoppedGracefully:
87 return nil
88 case c.controlStreamErr != nil:
89 return c.controlStreamErr
90 default:
91 c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Lost connection with the edge")
92 return errEdgeConnectionClosed
93 }
94}
95
96func (c *http2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
97 c.activeRequestsWG.Add(1)
98 defer c.activeRequestsWG.Done()
99
100 connType := determineHTTP2Type(r)
101 respWriter, err := newHTTP2RespWriter(r, w, connType)
102 if err != nil {
103 c.observer.log.Error().Msg(err.Error())
104 return
105 }
106
107 var proxyErr error
108 switch connType {
109 case TypeControlStream:
110 proxyErr = c.serveControlStream(r.Context(), respWriter)
111 c.controlStreamErr = proxyErr
112 case TypeWebsocket:
113 stripWebsocketUpgradeHeader(r)
114 proxyErr = c.config.OriginProxy.Proxy(respWriter, r, TypeWebsocket)
115 default:
116 proxyErr = c.config.OriginProxy.Proxy(respWriter, r, connType)
117 }
118 if proxyErr != nil {
119 respWriter.WriteErrorResponse()
120 }
121}
122
123func (c *http2Connection) serveControlStream(ctx context.Context, respWriter *http2RespWriter) error {
124 rpcClient := c.newRPCClientFunc(ctx, respWriter, c.observer.log)
125 defer rpcClient.Close()
126
127 if err := rpcClient.RegisterConnection(ctx, c.namedTunnel, c.connOptions, c.connIndex, c.observer); err != nil {
128 return err
129 }
130 c.connectedFuse.Connected()
131
132 // wait for connection termination or start of graceful shutdown
133 select {
134 case <-ctx.Done():
135 break
136 case <-c.gracefulShutdownC:
137 c.stoppedGracefully = true
138 }
139
140 c.observer.sendUnregisteringEvent(c.connIndex)
141 rpcClient.GracefulShutdown(ctx, c.config.GracePeriod)
142 c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Unregistered tunnel connection")
143 return nil
144}
145
146func (c *http2Connection) close() {
147 // Wait for all serve HTTP handlers to return
148 c.activeRequestsWG.Wait()
149 c.conn.Close()
150}
151
// http2RespWriter pairs the request body with the HTTP/2 response writer so
// that a single value can be both read from and written to.
type http2RespWriter struct {
	// r is the source for Read; newHTTP2RespWriter sets it to the request body.
	r io.Reader
	// w receives response headers and body.
	w http.ResponseWriter
	// flusher is w viewed as an http.Flusher.
	flusher http.Flusher
	// shouldFlush makes header and body writes flush immediately.
	shouldFlush bool
}
158
159func newHTTP2RespWriter(r *http.Request, w http.ResponseWriter, connType Type) (*http2RespWriter, error) {
160 flusher, isFlusher := w.(http.Flusher)
161 if !isFlusher {
162 respWriter := &http2RespWriter{
163 r: r.Body,
164 w: w,
165 }
166 respWriter.WriteErrorResponse()
167 return nil, fmt.Errorf("%T doesn't implement http.Flusher", w)
168 }
169
170 return &http2RespWriter{
171 r: r.Body,
172 w: w,
173 flusher: flusher,
174 shouldFlush: connType.shouldFlush(),
175 }, nil
176}
177
178func (rp *http2RespWriter) WriteRespHeaders(status int, header http.Header) error {
179 dest := rp.w.Header()
180 userHeaders := make(http.Header, len(header))
181 for header, values := range header {
182 // Since these are http2 headers, they're required to be lowercase
183 h2name := strings.ToLower(header)
184 for _, v := range values {
185 if h2name == "content-length" {
186 // This header has meaning in HTTP/2 and will be used by the edge,
187 // so it should be sent as an HTTP/2 response header.
188 dest.Add(h2name, v)
189 // Since these are http2 headers, they're required to be lowercase
190 } else if !h2mux.IsControlHeader(h2name) || h2mux.IsWebsocketClientHeader(h2name) {
191 // User headers, on the other hand, must all be serialized so that
192 // HTTP/2 header validation won't be applied to HTTP/1 header values
193 userHeaders.Add(h2name, v)
194 }
195 }
196 }
197
198 // Perform user header serialization and set them in the single header
199 dest.Set(canonicalResponseUserHeadersField, h2mux.SerializeHeaders(userHeaders))
200 rp.setResponseMetaHeader(responseMetaHeaderOrigin)
201 // HTTP2 removes support for 101 Switching Protocols https://tools.ietf.org/html/rfc7540#section-8.1.1
202 if status == http.StatusSwitchingProtocols {
203 status = http.StatusOK
204 }
205 rp.w.WriteHeader(status)
206 if IsServerSentEvent(header) {
207 rp.shouldFlush = true
208 }
209 if rp.shouldFlush {
210 rp.flusher.Flush()
211 }
212 return nil
213}
214
215func (rp *http2RespWriter) WriteErrorResponse() {
216 rp.setResponseMetaHeader(responseMetaHeaderCfd)
217 rp.w.WriteHeader(http.StatusBadGateway)
218}
219
220func (rp *http2RespWriter) setResponseMetaHeader(value string) {
221 rp.w.Header().Set(canonicalResponseMetaHeaderField, value)
222}
223
224func (rp *http2RespWriter) Read(p []byte) (n int, err error) {
225 return rp.r.Read(p)
226}
227
228func (rp *http2RespWriter) Write(p []byte) (n int, err error) {
229 defer func() {
230 // Implementer of OriginClient should make sure it doesn't write to the connection after Proxy returns
231 // Register a recover routine just in case.
232 if r := recover(); r != nil {
233 println("Recover from http2 response writer panic, error", r)
234 }
235 }()
236 n, err = rp.w.Write(p)
237 if err == nil && rp.shouldFlush {
238 rp.flusher.Flush()
239 }
240 return n, err
241}
242
243func (rp *http2RespWriter) Close() error {
244 return nil
245}
246
247func determineHTTP2Type(r *http.Request) Type {
248 switch {
249 case isWebsocketUpgrade(r):
250 return TypeWebsocket
251 case IsTCPStream(r):
252 return TypeTCP
253 case isControlStreamUpgrade(r):
254 return TypeControlStream
255 default:
256 return TypeHTTP
257 }
258}
259
260func isControlStreamUpgrade(r *http.Request) bool {
261 return r.Header.Get(internalUpgradeHeader) == controlStreamUpgrade
262}
263
264func isWebsocketUpgrade(r *http.Request) bool {
265 return r.Header.Get(internalUpgradeHeader) == websocketUpgrade
266}
267
268// IsTCPStream discerns if the connection request needs a tcp stream proxy.
269func IsTCPStream(r *http.Request) bool {
270 return r.Header.Get(tcpStreamHeader) != ""
271}
272
273func stripWebsocketUpgradeHeader(r *http.Request) {
274 r.Header.Del(internalUpgradeHeader)
275}
276