cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflared · Available

Code · Commits · Issues · Pull requests · Actions · Insights · Security
2020.11.6

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/http2_test.go

303 lines · mode: code

1package connection
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "io/ioutil"
8 "net"
9 "net/http"
10 "net/http/httptest"
11 "sync"
12 "testing"
13 "time"
14
15 "github.com/cloudflare/cloudflared/logger"
16 "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
17 tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
18 "github.com/gobwas/ws/wsutil"
19 "github.com/stretchr/testify/require"
20 "golang.org/x/net/http2"
21)
22
23var (
24 testTransport = http2.Transport{}
25)
26
27func newTestHTTP2Connection() (*http2Connection, net.Conn) {
28 edgeConn, originConn := net.Pipe()
29 var connIndex = uint8(0)
30 return NewHTTP2Connection(
31 originConn,
32 testConfig,
33 &NamedTunnelConfig{},
34 &pogs.ConnectionOptions{},
35 testObserver,
36 connIndex,
37 mockConnectedFuse{},
38 ), edgeConn
39}
40
41func TestServeHTTP(t *testing.T) {
42 tests := []testRequest{
43 {
44 name: "ok",
45 endpoint: "ok",
46 expectedStatus: http.StatusOK,
47 expectedBody: []byte(http.StatusText(http.StatusOK)),
48 },
49 {
50 name: "large_file",
51 endpoint: "large_file",
52 expectedStatus: http.StatusOK,
53 expectedBody: testLargeResp,
54 },
55 {
56 name: "Bad request",
57 endpoint: "400",
58 expectedStatus: http.StatusBadRequest,
59 expectedBody: []byte(http.StatusText(http.StatusBadRequest)),
60 },
61 {
62 name: "Internal server error",
63 endpoint: "500",
64 expectedStatus: http.StatusInternalServerError,
65 expectedBody: []byte(http.StatusText(http.StatusInternalServerError)),
66 },
67 {
68 name: "Proxy error",
69 endpoint: "error",
70 expectedStatus: http.StatusBadGateway,
71 expectedBody: nil,
72 isProxyError: true,
73 },
74 }
75
76 http2Conn, edgeConn := newTestHTTP2Connection()
77
78 ctx, cancel := context.WithCancel(context.Background())
79 var wg sync.WaitGroup
80 wg.Add(1)
81 go func() {
82 defer wg.Done()
83 http2Conn.Serve(ctx)
84 }()
85
86 edgeHTTP2Conn, err := testTransport.NewClientConn(edgeConn)
87 require.NoError(t, err)
88
89 for _, test := range tests {
90 endpoint := fmt.Sprintf("http://localhost:8080/%s", test.endpoint)
91 req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
92 require.NoError(t, err)
93
94 resp, err := edgeHTTP2Conn.RoundTrip(req)
95 require.NoError(t, err)
96 require.Equal(t, test.expectedStatus, resp.StatusCode)
97 if test.expectedBody != nil {
98 respBody, err := ioutil.ReadAll(resp.Body)
99 require.NoError(t, err)
100 require.Equal(t, test.expectedBody, respBody)
101 }
102 if test.isProxyError {
103 require.Equal(t, responseMetaHeaderCfd, resp.Header.Get(responseMetaHeaderField))
104 } else {
105 require.Equal(t, responseMetaHeaderOrigin, resp.Header.Get(responseMetaHeaderField))
106 }
107 }
108 cancel()
109 wg.Wait()
110}
111
// mockNamedTunnelRPCClient is a test stand-in for NamedTunnelRPCClient that
// reports calls by closing channels.
type mockNamedTunnelRPCClient struct {
	// registered is closed by RegisterConnection and unregistered is closed
	// by GracefulShutdown.
	registered, unregistered chan struct{}
}
116
117func (mc mockNamedTunnelRPCClient) RegisterConnection(
118 c context.Context,
119 config *NamedTunnelConfig,
120 options *tunnelpogs.ConnectionOptions,
121 connIndex uint8,
122 observer *Observer,
123) error {
124 close(mc.registered)
125 return nil
126}
127
128func (mc mockNamedTunnelRPCClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) {
129 close(mc.unregistered)
130}
131
132func (mockNamedTunnelRPCClient) Close() {}
133
// mockRPCClientFactory hands out mockNamedTunnelRPCClient values that share
// the factory's signalling channels.
type mockRPCClientFactory struct {
	// Both channels are passed on unchanged to every client the factory creates.
	registered, unregistered chan struct{}
}
138
139func (mf *mockRPCClientFactory) newMockRPCClient(context.Context, io.ReadWriteCloser, logger.Service) NamedTunnelRPCClient {
140 return mockNamedTunnelRPCClient{
141 registered: mf.registered,
142 unregistered: mf.unregistered,
143 }
144}
145
// wsRespWriter is a response writer for the websocket test. It embeds an
// httptest.ResponseRecorder and adds an io.Pipe: Write feeds the pipe and
// RespBody exposes its read side.
type wsRespWriter struct {
	// Embedded recorder; its Write method is shadowed by wsRespWriter.Write.
	*httptest.ResponseRecorder
	// readPipe is the read side of the pipe, exposed through RespBody.
	readPipe *io.PipeReader
	// writePipe is the write side of the pipe, fed by Write.
	writePipe *io.PipeWriter
}
151
152func newWSRespWriter() *wsRespWriter {
153 readPipe, writePipe := io.Pipe()
154 return &wsRespWriter{
155 httptest.NewRecorder(),
156 readPipe,
157 writePipe,
158 }
159}
160
161func (w *wsRespWriter) RespBody() io.ReadWriter {
162 return nowriter{w.readPipe}
163}
164
165func (w *wsRespWriter) Write(data []byte) (n int, err error) {
166 return w.writePipe.Write(data)
167}
168
169func TestServeWS(t *testing.T) {
170 http2Conn, _ := newTestHTTP2Connection()
171
172 ctx, cancel := context.WithCancel(context.Background())
173 var wg sync.WaitGroup
174 wg.Add(1)
175 go func() {
176 defer wg.Done()
177 http2Conn.Serve(ctx)
178 }()
179
180 respWriter := newWSRespWriter()
181 readPipe, writePipe := io.Pipe()
182
183 req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:8080/ws", readPipe)
184 require.NoError(t, err)
185 req.Header.Set(internalUpgradeHeader, websocketUpgrade)
186
187 wg.Add(1)
188 go func() {
189 defer wg.Done()
190 http2Conn.ServeHTTP(respWriter, req)
191 }()
192
193 data := []byte("test websocket")
194 err = wsutil.WriteClientText(writePipe, data)
195 require.NoError(t, err)
196
197 respBody, err := wsutil.ReadServerText(respWriter.RespBody())
198 require.NoError(t, err)
199 require.Equal(t, data, respBody, fmt.Sprintf("Expect %s, got %s", string(data), string(respBody)))
200
201 cancel()
202 resp := respWriter.Result()
203 // http2RespWriter should rewrite status 101 to 200
204 require.Equal(t, http.StatusOK, resp.StatusCode)
205 require.Equal(t, responseMetaHeaderOrigin, resp.Header.Get(responseMetaHeaderField))
206
207 wg.Wait()
208}
209
210func TestServeControlStream(t *testing.T) {
211 http2Conn, edgeConn := newTestHTTP2Connection()
212
213 rpcClientFactory := mockRPCClientFactory{
214 registered: make(chan struct{}),
215 unregistered: make(chan struct{}),
216 }
217 http2Conn.newRPCClientFunc = rpcClientFactory.newMockRPCClient
218
219 ctx, cancel := context.WithCancel(context.Background())
220 var wg sync.WaitGroup
221 wg.Add(1)
222 go func() {
223 defer wg.Done()
224 http2Conn.Serve(ctx)
225 }()
226
227 req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:8080/", nil)
228 require.NoError(t, err)
229 req.Header.Set(internalUpgradeHeader, controlStreamUpgrade)
230
231 edgeHTTP2Conn, err := testTransport.NewClientConn(edgeConn)
232 require.NoError(t, err)
233
234 wg.Add(1)
235 go func() {
236 defer wg.Done()
237 edgeHTTP2Conn.RoundTrip(req)
238 }()
239
240 <-rpcClientFactory.registered
241 cancel()
242 <-rpcClientFactory.unregistered
243
244 wg.Wait()
245}
246
247func benchmarkServeHTTP(b *testing.B, test testRequest) {
248 http2Conn, edgeConn := newTestHTTP2Connection()
249
250 ctx, cancel := context.WithCancel(context.Background())
251 var wg sync.WaitGroup
252 wg.Add(1)
253 go func() {
254 defer wg.Done()
255 http2Conn.Serve(ctx)
256 }()
257
258 endpoint := fmt.Sprintf("http://localhost:8080/%s", test.endpoint)
259 req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
260 require.NoError(b, err)
261
262 edgeHTTP2Conn, err := testTransport.NewClientConn(edgeConn)
263 require.NoError(b, err)
264
265 b.ResetTimer()
266 for i := 0; i < b.N; i++ {
267 b.StartTimer()
268 resp, err := edgeHTTP2Conn.RoundTrip(req)
269 b.StopTimer()
270 require.NoError(b, err)
271 require.Equal(b, test.expectedStatus, resp.StatusCode)
272 if test.expectedBody != nil {
273 respBody, err := ioutil.ReadAll(resp.Body)
274 require.NoError(b, err)
275 require.Equal(b, test.expectedBody, respBody)
276 }
277 resp.Body.Close()
278 }
279
280 cancel()
281 wg.Wait()
282}
283func BenchmarkServeHTTPSimple(b *testing.B) {
284 test := testRequest{
285 name: "ok",
286 endpoint: "ok",
287 expectedStatus: http.StatusOK,
288 expectedBody: []byte(http.StatusText(http.StatusOK)),
289 }
290
291 benchmarkServeHTTP(b, test)
292}
293
294func BenchmarkServeHTTPLargeFile(b *testing.B) {
295 test := testRequest{
296 name: "large_file",
297 endpoint: "large_file",
298 expectedStatus: http.StatusOK,
299 expectedBody: testLargeResp,
300 }
301
302 benchmarkServeHTTP(b, test)
303}