cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2021.10.5

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/http2.go

301lines · modecode

1package connection
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "net"
8 "net/http"
9 "runtime/debug"
10 "strings"
11 "sync"
12
13 "github.com/pkg/errors"
14 "github.com/rs/zerolog"
15 "golang.org/x/net/http2"
16
17 tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
18)
19
20// note: these constants are exported so we can reuse them in the edge-side code
21const (
22 InternalUpgradeHeader = "Cf-Cloudflared-Proxy-Connection-Upgrade"
23 InternalTCPProxySrcHeader = "Cf-Cloudflared-Proxy-Src"
24 WebsocketUpgrade = "websocket"
25 ControlStreamUpgrade = "control-stream"
26)
27
28var errEdgeConnectionClosed = fmt.Errorf("connection with edge closed")
29
30// HTTP2Connection represents a net.Conn that uses HTTP2 frames to proxy traffic from the edge to cloudflared on the
31// origin.
32type HTTP2Connection struct {
33 conn net.Conn
34 server *http2.Server
35 config *Config
36 connOptions *tunnelpogs.ConnectionOptions
37 observer *Observer
38 connIndex uint8
39 // newRPCClientFunc allows us to mock RPCs during testing
40 newRPCClientFunc func(context.Context, io.ReadWriteCloser, *zerolog.Logger) NamedTunnelRPCClient
41
42 log *zerolog.Logger
43 activeRequestsWG sync.WaitGroup
44 controlStreamHandler ControlStreamHandler
45 stoppedGracefully bool
46 controlStreamErr error // result of running control stream handler
47}
48
49// NewHTTP2Connection returns a new instance of HTTP2Connection.
50func NewHTTP2Connection(
51 conn net.Conn,
52 config *Config,
53 connOptions *tunnelpogs.ConnectionOptions,
54 observer *Observer,
55 connIndex uint8,
56 controlStreamHandler ControlStreamHandler,
57 log *zerolog.Logger,
58) *HTTP2Connection {
59 return &HTTP2Connection{
60 conn: conn,
61 server: &http2.Server{
62 MaxConcurrentStreams: MaxConcurrentStreams,
63 },
64 config: config,
65 connOptions: connOptions,
66 observer: observer,
67 connIndex: connIndex,
68 newRPCClientFunc: newRegistrationRPCClient,
69 controlStreamHandler: controlStreamHandler,
70 log: log,
71 }
72}
73
74// Serve serves an HTTP2 server that the edge can talk to.
75func (c *HTTP2Connection) Serve(ctx context.Context) error {
76 go func() {
77 <-ctx.Done()
78 c.close()
79 }()
80 c.server.ServeConn(c.conn, &http2.ServeConnOpts{
81 Context: ctx,
82 Handler: c,
83 })
84
85 switch {
86 case c.controlStreamHandler.IsStopped():
87 return nil
88 case c.controlStreamErr != nil:
89 return c.controlStreamErr
90 default:
91 c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Lost connection with the edge")
92 return errEdgeConnectionClosed
93 }
94}
95
96func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
97 c.activeRequestsWG.Add(1)
98 defer c.activeRequestsWG.Done()
99
100 connType := determineHTTP2Type(r)
101 handleMissingRequestParts(connType, r)
102
103 respWriter, err := NewHTTP2RespWriter(r, w, connType)
104 if err != nil {
105 c.observer.log.Error().Msg(err.Error())
106 return
107 }
108
109 switch connType {
110 case TypeControlStream:
111 if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions, true); err != nil {
112 c.controlStreamErr = err
113 c.log.Error().Err(err)
114 respWriter.WriteErrorResponse()
115 }
116
117 case TypeWebsocket, TypeHTTP:
118 stripWebsocketUpgradeHeader(r)
119 if err := c.config.OriginProxy.ProxyHTTP(respWriter, r, connType == TypeWebsocket); err != nil {
120 err := fmt.Errorf("Failed to proxy HTTP: %w", err)
121 c.log.Error().Err(err)
122 respWriter.WriteErrorResponse()
123 }
124
125 case TypeTCP:
126 host, err := getRequestHost(r)
127 if err != nil {
128 err := fmt.Errorf(`cloudflared recieved a warp-routing request with an empty host value: %w`, err)
129 c.log.Error().Err(err)
130 respWriter.WriteErrorResponse()
131 }
132
133 rws := NewHTTPResponseReadWriterAcker(respWriter, r)
134 if err := c.config.OriginProxy.ProxyTCP(r.Context(), rws, &TCPRequest{
135 Dest: host,
136 CFRay: FindCfRayHeader(r),
137 LBProbe: IsLBProbeRequest(r),
138 }); err != nil {
139 respWriter.WriteErrorResponse()
140 }
141
142 default:
143 err := fmt.Errorf("Received unknown connection type: %s", connType)
144 c.log.Error().Err(err)
145 respWriter.WriteErrorResponse()
146 }
147}
148
149func (c *HTTP2Connection) close() {
150 // Wait for all serve HTTP handlers to return
151 c.activeRequestsWG.Wait()
152 c.conn.Close()
153}
154
155type http2RespWriter struct {
156 r io.Reader
157 w http.ResponseWriter
158 flusher http.Flusher
159 shouldFlush bool
160}
161
162func NewHTTP2RespWriter(r *http.Request, w http.ResponseWriter, connType Type) (*http2RespWriter, error) {
163 flusher, isFlusher := w.(http.Flusher)
164 if !isFlusher {
165 respWriter := &http2RespWriter{
166 r: r.Body,
167 w: w,
168 }
169 respWriter.WriteErrorResponse()
170 return nil, fmt.Errorf("%T doesn't implement http.Flusher", w)
171 }
172
173 return &http2RespWriter{
174 r: r.Body,
175 w: w,
176 flusher: flusher,
177 shouldFlush: connType.shouldFlush(),
178 }, nil
179}
180
181func (rp *http2RespWriter) WriteRespHeaders(status int, header http.Header) error {
182 dest := rp.w.Header()
183 userHeaders := make(http.Header, len(header))
184 for name, values := range header {
185 // Since these are http2 headers, they're required to be lowercase
186 h2name := strings.ToLower(name)
187 if h2name == "content-length" {
188 // This header has meaning in HTTP/2 and will be used by the edge,
189 // so it should be sent as an HTTP/2 response header.
190 dest[name] = values
191 // Since these are http2 headers, they're required to be lowercase
192 } else if !IsControlResponseHeader(h2name) || IsWebsocketClientHeader(h2name) {
193 // User headers, on the other hand, must all be serialized so that
194 // HTTP/2 header validation won't be applied to HTTP/1 header values
195 userHeaders[name] = values
196 }
197 }
198
199 // Perform user header serialization and set them in the single header
200 dest.Set(CanonicalResponseUserHeaders, SerializeHeaders(userHeaders))
201 rp.setResponseMetaHeader(responseMetaHeaderOrigin)
202 // HTTP2 removes support for 101 Switching Protocols https://tools.ietf.org/html/rfc7540#section-8.1.1
203 if status == http.StatusSwitchingProtocols {
204 status = http.StatusOK
205 }
206 rp.w.WriteHeader(status)
207 if IsServerSentEvent(header) {
208 rp.shouldFlush = true
209 }
210 if rp.shouldFlush {
211 rp.flusher.Flush()
212 }
213 return nil
214}
215
216func (rp *http2RespWriter) WriteErrorResponse() {
217 rp.setResponseMetaHeader(responseMetaHeaderCfd)
218 rp.w.WriteHeader(http.StatusBadGateway)
219}
220
221func (rp *http2RespWriter) setResponseMetaHeader(value string) {
222 rp.w.Header().Set(CanonicalResponseMetaHeader, value)
223}
224
225func (rp *http2RespWriter) Read(p []byte) (n int, err error) {
226 return rp.r.Read(p)
227}
228
229func (rp *http2RespWriter) Write(p []byte) (n int, err error) {
230 defer func() {
231 // Implementer of OriginClient should make sure it doesn't write to the connection after Proxy returns
232 // Register a recover routine just in case.
233 if r := recover(); r != nil {
234 println(fmt.Sprintf("Recover from http2 response writer panic, error %s", debug.Stack()))
235 }
236 }()
237 n, err = rp.w.Write(p)
238 if err == nil && rp.shouldFlush {
239 rp.flusher.Flush()
240 }
241 return n, err
242}
243
244func (rp *http2RespWriter) Close() error {
245 return nil
246}
247
248func determineHTTP2Type(r *http.Request) Type {
249 switch {
250 case isWebsocketUpgrade(r):
251 return TypeWebsocket
252 case IsTCPStream(r):
253 return TypeTCP
254 case isControlStreamUpgrade(r):
255 return TypeControlStream
256 default:
257 return TypeHTTP
258 }
259}
260
261func handleMissingRequestParts(connType Type, r *http.Request) {
262 if connType == TypeHTTP {
263 // http library has no guarantees that we receive a filled URL. If not, then we fill it, as we reuse the request
264 // for proxying. We use the same values as we used to in h2mux. For proxying they should not matter since we
265 // control the dialer on every egress proxied.
266 if len(r.URL.Scheme) == 0 {
267 r.URL.Scheme = "http"
268 }
269 if len(r.URL.Host) == 0 {
270 r.URL.Host = "localhost:8080"
271 }
272 }
273}
274
275func isControlStreamUpgrade(r *http.Request) bool {
276 return r.Header.Get(InternalUpgradeHeader) == ControlStreamUpgrade
277}
278
279func isWebsocketUpgrade(r *http.Request) bool {
280 return r.Header.Get(InternalUpgradeHeader) == WebsocketUpgrade
281}
282
283// IsTCPStream discerns if the connection request needs a tcp stream proxy.
284func IsTCPStream(r *http.Request) bool {
285 return r.Header.Get(InternalTCPProxySrcHeader) != ""
286}
287
288func stripWebsocketUpgradeHeader(r *http.Request) {
289 r.Header.Del(InternalUpgradeHeader)
290}
291
292// getRequestHost returns the host of the http.Request.
293func getRequestHost(r *http.Request) (string, error) {
294 if r.Host != "" {
295 return r.Host, nil
296 }
297 if r.URL != nil {
298 return r.URL.Host, nil
299 }
300 return "", errors.New("host not set in incoming request")
301}
302