cloudflare/cloudflared
Publicmirrored from https://github.com/cloudflare/cloudflaredAvailable
connection/http2.go
301lines · modecode
| 1 | package connection |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "fmt" |
| 6 | "io" |
| 7 | "net" |
| 8 | "net/http" |
| 9 | "runtime/debug" |
| 10 | "strings" |
| 11 | "sync" |
| 12 | |
| 13 | "github.com/pkg/errors" |
| 14 | "github.com/rs/zerolog" |
| 15 | "golang.org/x/net/http2" |
| 16 | |
| 17 | tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs" |
| 18 | ) |
| 19 | |
| 20 | // note: these constants are exported so we can reuse them in the edge-side code |
| 21 | const ( |
| 22 | InternalUpgradeHeader = "Cf-Cloudflared-Proxy-Connection-Upgrade" |
| 23 | InternalTCPProxySrcHeader = "Cf-Cloudflared-Proxy-Src" |
| 24 | WebsocketUpgrade = "websocket" |
| 25 | ControlStreamUpgrade = "control-stream" |
| 26 | ) |
| 27 | |
| 28 | var errEdgeConnectionClosed = fmt.Errorf("connection with edge closed") |
| 29 | |
| 30 | // HTTP2Connection represents a net.Conn that uses HTTP2 frames to proxy traffic from the edge to cloudflared on the |
| 31 | // origin. |
| 32 | type HTTP2Connection struct { |
| 33 | conn net.Conn |
| 34 | server *http2.Server |
| 35 | config *Config |
| 36 | connOptions *tunnelpogs.ConnectionOptions |
| 37 | observer *Observer |
| 38 | connIndex uint8 |
| 39 | // newRPCClientFunc allows us to mock RPCs during testing |
| 40 | newRPCClientFunc func(context.Context, io.ReadWriteCloser, *zerolog.Logger) NamedTunnelRPCClient |
| 41 | |
| 42 | log *zerolog.Logger |
| 43 | activeRequestsWG sync.WaitGroup |
| 44 | controlStreamHandler ControlStreamHandler |
| 45 | stoppedGracefully bool |
| 46 | controlStreamErr error // result of running control stream handler |
| 47 | } |
| 48 | |
| 49 | // NewHTTP2Connection returns a new instance of HTTP2Connection. |
| 50 | func NewHTTP2Connection( |
| 51 | conn net.Conn, |
| 52 | config *Config, |
| 53 | connOptions *tunnelpogs.ConnectionOptions, |
| 54 | observer *Observer, |
| 55 | connIndex uint8, |
| 56 | controlStreamHandler ControlStreamHandler, |
| 57 | log *zerolog.Logger, |
| 58 | ) *HTTP2Connection { |
| 59 | return &HTTP2Connection{ |
| 60 | conn: conn, |
| 61 | server: &http2.Server{ |
| 62 | MaxConcurrentStreams: MaxConcurrentStreams, |
| 63 | }, |
| 64 | config: config, |
| 65 | connOptions: connOptions, |
| 66 | observer: observer, |
| 67 | connIndex: connIndex, |
| 68 | newRPCClientFunc: newRegistrationRPCClient, |
| 69 | controlStreamHandler: controlStreamHandler, |
| 70 | log: log, |
| 71 | } |
| 72 | } |
| 73 | |
| 74 | // Serve serves an HTTP2 server that the edge can talk to. |
| 75 | func (c *HTTP2Connection) Serve(ctx context.Context) error { |
| 76 | go func() { |
| 77 | <-ctx.Done() |
| 78 | c.close() |
| 79 | }() |
| 80 | c.server.ServeConn(c.conn, &http2.ServeConnOpts{ |
| 81 | Context: ctx, |
| 82 | Handler: c, |
| 83 | }) |
| 84 | |
| 85 | switch { |
| 86 | case c.controlStreamHandler.IsStopped(): |
| 87 | return nil |
| 88 | case c.controlStreamErr != nil: |
| 89 | return c.controlStreamErr |
| 90 | default: |
| 91 | c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Lost connection with the edge") |
| 92 | return errEdgeConnectionClosed |
| 93 | } |
| 94 | } |
| 95 | |
| 96 | func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 97 | c.activeRequestsWG.Add(1) |
| 98 | defer c.activeRequestsWG.Done() |
| 99 | |
| 100 | connType := determineHTTP2Type(r) |
| 101 | handleMissingRequestParts(connType, r) |
| 102 | |
| 103 | respWriter, err := NewHTTP2RespWriter(r, w, connType) |
| 104 | if err != nil { |
| 105 | c.observer.log.Error().Msg(err.Error()) |
| 106 | return |
| 107 | } |
| 108 | |
| 109 | switch connType { |
| 110 | case TypeControlStream: |
| 111 | if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions, true); err != nil { |
| 112 | c.controlStreamErr = err |
| 113 | c.log.Error().Err(err) |
| 114 | respWriter.WriteErrorResponse() |
| 115 | } |
| 116 | |
| 117 | case TypeWebsocket, TypeHTTP: |
| 118 | stripWebsocketUpgradeHeader(r) |
| 119 | if err := c.config.OriginProxy.ProxyHTTP(respWriter, r, connType == TypeWebsocket); err != nil { |
| 120 | err := fmt.Errorf("Failed to proxy HTTP: %w", err) |
| 121 | c.log.Error().Err(err) |
| 122 | respWriter.WriteErrorResponse() |
| 123 | } |
| 124 | |
| 125 | case TypeTCP: |
| 126 | host, err := getRequestHost(r) |
| 127 | if err != nil { |
| 128 | err := fmt.Errorf(`cloudflared recieved a warp-routing request with an empty host value: %w`, err) |
| 129 | c.log.Error().Err(err) |
| 130 | respWriter.WriteErrorResponse() |
| 131 | } |
| 132 | |
| 133 | rws := NewHTTPResponseReadWriterAcker(respWriter, r) |
| 134 | if err := c.config.OriginProxy.ProxyTCP(r.Context(), rws, &TCPRequest{ |
| 135 | Dest: host, |
| 136 | CFRay: FindCfRayHeader(r), |
| 137 | LBProbe: IsLBProbeRequest(r), |
| 138 | }); err != nil { |
| 139 | respWriter.WriteErrorResponse() |
| 140 | } |
| 141 | |
| 142 | default: |
| 143 | err := fmt.Errorf("Received unknown connection type: %s", connType) |
| 144 | c.log.Error().Err(err) |
| 145 | respWriter.WriteErrorResponse() |
| 146 | } |
| 147 | } |
| 148 | |
| 149 | func (c *HTTP2Connection) close() { |
| 150 | // Wait for all serve HTTP handlers to return |
| 151 | c.activeRequestsWG.Wait() |
| 152 | c.conn.Close() |
| 153 | } |
| 154 | |
| 155 | type http2RespWriter struct { |
| 156 | r io.Reader |
| 157 | w http.ResponseWriter |
| 158 | flusher http.Flusher |
| 159 | shouldFlush bool |
| 160 | } |
| 161 | |
| 162 | func NewHTTP2RespWriter(r *http.Request, w http.ResponseWriter, connType Type) (*http2RespWriter, error) { |
| 163 | flusher, isFlusher := w.(http.Flusher) |
| 164 | if !isFlusher { |
| 165 | respWriter := &http2RespWriter{ |
| 166 | r: r.Body, |
| 167 | w: w, |
| 168 | } |
| 169 | respWriter.WriteErrorResponse() |
| 170 | return nil, fmt.Errorf("%T doesn't implement http.Flusher", w) |
| 171 | } |
| 172 | |
| 173 | return &http2RespWriter{ |
| 174 | r: r.Body, |
| 175 | w: w, |
| 176 | flusher: flusher, |
| 177 | shouldFlush: connType.shouldFlush(), |
| 178 | }, nil |
| 179 | } |
| 180 | |
| 181 | func (rp *http2RespWriter) WriteRespHeaders(status int, header http.Header) error { |
| 182 | dest := rp.w.Header() |
| 183 | userHeaders := make(http.Header, len(header)) |
| 184 | for name, values := range header { |
| 185 | // Since these are http2 headers, they're required to be lowercase |
| 186 | h2name := strings.ToLower(name) |
| 187 | if h2name == "content-length" { |
| 188 | // This header has meaning in HTTP/2 and will be used by the edge, |
| 189 | // so it should be sent as an HTTP/2 response header. |
| 190 | dest[name] = values |
| 191 | // Since these are http2 headers, they're required to be lowercase |
| 192 | } else if !IsControlResponseHeader(h2name) || IsWebsocketClientHeader(h2name) { |
| 193 | // User headers, on the other hand, must all be serialized so that |
| 194 | // HTTP/2 header validation won't be applied to HTTP/1 header values |
| 195 | userHeaders[name] = values |
| 196 | } |
| 197 | } |
| 198 | |
| 199 | // Perform user header serialization and set them in the single header |
| 200 | dest.Set(CanonicalResponseUserHeaders, SerializeHeaders(userHeaders)) |
| 201 | rp.setResponseMetaHeader(responseMetaHeaderOrigin) |
| 202 | // HTTP2 removes support for 101 Switching Protocols https://tools.ietf.org/html/rfc7540#section-8.1.1 |
| 203 | if status == http.StatusSwitchingProtocols { |
| 204 | status = http.StatusOK |
| 205 | } |
| 206 | rp.w.WriteHeader(status) |
| 207 | if IsServerSentEvent(header) { |
| 208 | rp.shouldFlush = true |
| 209 | } |
| 210 | if rp.shouldFlush { |
| 211 | rp.flusher.Flush() |
| 212 | } |
| 213 | return nil |
| 214 | } |
| 215 | |
| 216 | func (rp *http2RespWriter) WriteErrorResponse() { |
| 217 | rp.setResponseMetaHeader(responseMetaHeaderCfd) |
| 218 | rp.w.WriteHeader(http.StatusBadGateway) |
| 219 | } |
| 220 | |
| 221 | func (rp *http2RespWriter) setResponseMetaHeader(value string) { |
| 222 | rp.w.Header().Set(CanonicalResponseMetaHeader, value) |
| 223 | } |
| 224 | |
| 225 | func (rp *http2RespWriter) Read(p []byte) (n int, err error) { |
| 226 | return rp.r.Read(p) |
| 227 | } |
| 228 | |
| 229 | func (rp *http2RespWriter) Write(p []byte) (n int, err error) { |
| 230 | defer func() { |
| 231 | // Implementer of OriginClient should make sure it doesn't write to the connection after Proxy returns |
| 232 | // Register a recover routine just in case. |
| 233 | if r := recover(); r != nil { |
| 234 | println(fmt.Sprintf("Recover from http2 response writer panic, error %s", debug.Stack())) |
| 235 | } |
| 236 | }() |
| 237 | n, err = rp.w.Write(p) |
| 238 | if err == nil && rp.shouldFlush { |
| 239 | rp.flusher.Flush() |
| 240 | } |
| 241 | return n, err |
| 242 | } |
| 243 | |
| 244 | func (rp *http2RespWriter) Close() error { |
| 245 | return nil |
| 246 | } |
| 247 | |
| 248 | func determineHTTP2Type(r *http.Request) Type { |
| 249 | switch { |
| 250 | case isWebsocketUpgrade(r): |
| 251 | return TypeWebsocket |
| 252 | case IsTCPStream(r): |
| 253 | return TypeTCP |
| 254 | case isControlStreamUpgrade(r): |
| 255 | return TypeControlStream |
| 256 | default: |
| 257 | return TypeHTTP |
| 258 | } |
| 259 | } |
| 260 | |
| 261 | func handleMissingRequestParts(connType Type, r *http.Request) { |
| 262 | if connType == TypeHTTP { |
| 263 | // http library has no guarantees that we receive a filled URL. If not, then we fill it, as we reuse the request |
| 264 | // for proxying. We use the same values as we used to in h2mux. For proxying they should not matter since we |
| 265 | // control the dialer on every egress proxied. |
| 266 | if len(r.URL.Scheme) == 0 { |
| 267 | r.URL.Scheme = "http" |
| 268 | } |
| 269 | if len(r.URL.Host) == 0 { |
| 270 | r.URL.Host = "localhost:8080" |
| 271 | } |
| 272 | } |
| 273 | } |
| 274 | |
| 275 | func isControlStreamUpgrade(r *http.Request) bool { |
| 276 | return r.Header.Get(InternalUpgradeHeader) == ControlStreamUpgrade |
| 277 | } |
| 278 | |
| 279 | func isWebsocketUpgrade(r *http.Request) bool { |
| 280 | return r.Header.Get(InternalUpgradeHeader) == WebsocketUpgrade |
| 281 | } |
| 282 | |
| 283 | // IsTCPStream discerns if the connection request needs a tcp stream proxy. |
| 284 | func IsTCPStream(r *http.Request) bool { |
| 285 | return r.Header.Get(InternalTCPProxySrcHeader) != "" |
| 286 | } |
| 287 | |
| 288 | func stripWebsocketUpgradeHeader(r *http.Request) { |
| 289 | r.Header.Del(InternalUpgradeHeader) |
| 290 | } |
| 291 | |
| 292 | // getRequestHost returns the host of the http.Request. |
| 293 | func getRequestHost(r *http.Request) (string, error) { |
| 294 | if r.Host != "" { |
| 295 | return r.Host, nil |
| 296 | } |
| 297 | if r.URL != nil { |
| 298 | return r.URL.Host, nil |
| 299 | } |
| 300 | return "", errors.New("host not set in incoming request") |
| 301 | } |
| 302 | |