cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

Code · Commits · Issues · Pull requests · Actions · Insights · Security
2020.11.6

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/http2.go

221lines · modecode

1package connection
2
3import (
4 "context"
5 "errors"
6 "io"
7 "math"
8 "net"
9 "net/http"
10 "strings"
11 "sync"
12
13 "github.com/cloudflare/cloudflared/h2mux"
14 "github.com/cloudflare/cloudflared/logger"
15 tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
16
17 "golang.org/x/net/http2"
18)
19
// internalUpgradeHeader is the request header inspected by ServeHTTP to decide
// whether a stream is a control stream, a websocket stream, or a plain request.
const internalUpgradeHeader = "Cf-Cloudflared-Proxy-Connection-Upgrade"

// websocketUpgrade is the internalUpgradeHeader value that marks a websocket stream.
const websocketUpgrade = "websocket"

// controlStreamUpgrade is the internalUpgradeHeader value that marks the control stream.
const controlStreamUpgrade = "control-stream"
25
// errNotFlusher describes a ResponseWriter that cannot be used as an http.Flusher.
var errNotFlusher = errors.New("ResponseWriter doesn't implement http.Flusher")
29
30type http2Connection struct {
31 conn net.Conn
32 server *http2.Server
33 config *Config
34 namedTunnel *NamedTunnelConfig
35 connOptions *tunnelpogs.ConnectionOptions
36 observer *Observer
37 connIndexStr string
38 connIndex uint8
39 wg *sync.WaitGroup
40 // newRPCClientFunc allows us to mock RPCs during testing
41 newRPCClientFunc func(context.Context, io.ReadWriteCloser, logger.Service) NamedTunnelRPCClient
42 connectedFuse ConnectedFuse
43}
44
45func NewHTTP2Connection(
46 conn net.Conn,
47 config *Config,
48 namedTunnelConfig *NamedTunnelConfig,
49 connOptions *tunnelpogs.ConnectionOptions,
50 observer *Observer,
51 connIndex uint8,
52 connectedFuse ConnectedFuse,
53) *http2Connection {
54 return &http2Connection{
55 conn: conn,
56 server: &http2.Server{
57 MaxConcurrentStreams: math.MaxUint32,
58 },
59 config: config,
60 namedTunnel: namedTunnelConfig,
61 connOptions: connOptions,
62 observer: observer,
63 connIndexStr: uint8ToString(connIndex),
64 connIndex: connIndex,
65 wg: &sync.WaitGroup{},
66 newRPCClientFunc: newRegistrationRPCClient,
67 connectedFuse: connectedFuse,
68 }
69}
70
71func (c *http2Connection) Serve(ctx context.Context) {
72 go func() {
73 <-ctx.Done()
74 c.close()
75 }()
76 c.server.ServeConn(c.conn, &http2.ServeConnOpts{
77 Context: ctx,
78 Handler: c,
79 })
80}
81
82func (c *http2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
83 c.wg.Add(1)
84 defer c.wg.Done()
85
86 respWriter := &http2RespWriter{
87 r: r.Body,
88 w: w,
89 }
90 flusher, isFlusher := w.(http.Flusher)
91 if !isFlusher {
92 c.observer.Errorf("%T doesn't implement http.Flusher", w)
93 respWriter.WriteErrorResponse()
94 return
95 }
96 respWriter.flusher = flusher
97 var err error
98 if isControlStreamUpgrade(r) {
99 respWriter.shouldFlush = true
100 err = c.serveControlStream(r.Context(), respWriter)
101 } else if isWebsocketUpgrade(r) {
102 respWriter.shouldFlush = true
103 stripWebsocketUpgradeHeader(r)
104 err = c.config.OriginClient.Proxy(respWriter, r, true)
105 } else {
106 err = c.config.OriginClient.Proxy(respWriter, r, false)
107 }
108
109 if err != nil {
110 respWriter.WriteErrorResponse()
111 }
112}
113
114func (c *http2Connection) serveControlStream(ctx context.Context, respWriter *http2RespWriter) error {
115 rpcClient := c.newRPCClientFunc(ctx, respWriter, c.observer)
116 defer rpcClient.Close()
117
118 if err := rpcClient.RegisterConnection(ctx, c.namedTunnel, c.connOptions, c.connIndex, c.observer); err != nil {
119 return err
120 }
121 c.connectedFuse.Connected()
122
123 <-ctx.Done()
124 rpcClient.GracefulShutdown(ctx, c.config.GracePeriod)
125 return nil
126}
127
128func (c *http2Connection) close() {
129 // Wait for all serve HTTP handlers to return
130 c.wg.Wait()
131 c.conn.Close()
132}
133
// http2RespWriter pairs the request body of an HTTP/2 stream with its response
// writer so the stream can be used as a single read/write/close object.
type http2RespWriter struct {
	// r is the source for Read (ServeHTTP sets it to the request body).
	r io.Reader
	// w receives response headers, the status code, and written bytes.
	w http.ResponseWriter
	// flusher is used to flush w when shouldFlush is set.
	flusher http.Flusher
	// shouldFlush makes header writes and every successful Write flush at once.
	shouldFlush bool
}
140
141func (rp *http2RespWriter) WriteRespHeaders(resp *http.Response) error {
142 dest := rp.w.Header()
143 userHeaders := make(http.Header, len(resp.Header))
144 for header, values := range resp.Header {
145 // Since these are http2 headers, they're required to be lowercase
146 h2name := strings.ToLower(header)
147 for _, v := range values {
148 if h2name == "content-length" {
149 // This header has meaning in HTTP/2 and will be used by the edge,
150 // so it should be sent as an HTTP/2 response header.
151 dest.Add(h2name, v)
152 // Since these are http2 headers, they're required to be lowercase
153 } else if !h2mux.IsControlHeader(h2name) || h2mux.IsWebsocketClientHeader(h2name) {
154 // User headers, on the other hand, must all be serialized so that
155 // HTTP/2 header validation won't be applied to HTTP/1 header values
156 userHeaders.Add(h2name, v)
157 }
158 }
159 }
160
161 // Perform user header serialization and set them in the single header
162 dest.Set(canonicalResponseUserHeadersField, h2mux.SerializeHeaders(userHeaders))
163 rp.setResponseMetaHeader(responseMetaHeaderOrigin)
164 status := resp.StatusCode
165 // HTTP2 removes support for 101 Switching Protocols https://tools.ietf.org/html/rfc7540#section-8.1.1
166 if status == http.StatusSwitchingProtocols {
167 status = http.StatusOK
168 }
169 rp.w.WriteHeader(status)
170 if isServerSentEvent(resp.Header) {
171 rp.shouldFlush = true
172 }
173 if rp.shouldFlush {
174 rp.flusher.Flush()
175 }
176 return nil
177}
178
179func (rp *http2RespWriter) WriteErrorResponse() {
180 rp.setResponseMetaHeader(responseMetaHeaderCfd)
181 rp.w.WriteHeader(http.StatusBadGateway)
182}
183
184func (rp *http2RespWriter) setResponseMetaHeader(value string) {
185 rp.w.Header().Set(canonicalResponseMetaHeaderField, value)
186}
187
188func (rp *http2RespWriter) Read(p []byte) (n int, err error) {
189 return rp.r.Read(p)
190}
191
192func (rp *http2RespWriter) Write(p []byte) (n int, err error) {
193 defer func() {
194 // Implementer of OriginClient should make sure it doesn't write to the connection after Proxy returns
195 // Register a recover routine just in case.
196 if r := recover(); r != nil {
197 println("Recover from http2 response writer panic, error", r)
198 }
199 }()
200 n, err = rp.w.Write(p)
201 if err == nil && rp.shouldFlush {
202 rp.flusher.Flush()
203 }
204 return n, err
205}
206
207func (rp *http2RespWriter) Close() error {
208 return nil
209}
210
211func isControlStreamUpgrade(r *http.Request) bool {
212 return strings.ToLower(r.Header.Get(internalUpgradeHeader)) == controlStreamUpgrade
213}
214
215func isWebsocketUpgrade(r *http.Request) bool {
216 return strings.ToLower(r.Header.Get(internalUpgradeHeader)) == websocketUpgrade
217}
218
219func stripWebsocketUpgradeHeader(r *http.Request) {
220 r.Header.Del(internalUpgradeHeader)
221}