cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2021.12.1

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/h2mux_test.go

301lines · modecode

1package connection
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "net"
8 "net/http"
9 "strconv"
10 "sync"
11 "testing"
12 "time"
13
14 "github.com/gobwas/ws/wsutil"
15 "github.com/rs/zerolog"
16 "github.com/stretchr/testify/assert"
17 "github.com/stretchr/testify/require"
18
19 "github.com/cloudflare/cloudflared/h2mux"
20)
21
22var (
23 testMuxerConfig = &MuxerConfig{
24 HeartbeatInterval: time.Second * 5,
25 MaxHeartbeats: 5,
26 CompressionSetting: 0,
27 MetricsUpdateFreq: time.Second * 5,
28 }
29)
30
31func newH2MuxConnection(t require.TestingT) (*h2muxConnection, *h2mux.Muxer) {
32 edgeConn, originConn := net.Pipe()
33 edgeMuxChan := make(chan *h2mux.Muxer)
34 go func() {
35 edgeMuxConfig := h2mux.MuxerConfig{
36 Log: &log,
37 Handler: h2mux.MuxedStreamFunc(func(stream *h2mux.MuxedStream) error {
38 // we only expect RPC traffic in client->edge direction, provide minimal support for mocking
39 require.True(t, stream.IsRPCStream())
40 return stream.WriteHeaders([]h2mux.Header{
41 {Name: ":status", Value: "200"},
42 })
43 }),
44 }
45 edgeMux, err := h2mux.Handshake(edgeConn, edgeConn, edgeMuxConfig, h2mux.ActiveStreams)
46 require.NoError(t, err)
47 edgeMuxChan <- edgeMux
48 }()
49 var connIndex = uint8(0)
50 testObserver := NewObserver(&log, &log, false)
51 h2muxConn, err, _ := NewH2muxConnection(testConfig, testMuxerConfig, originConn, connIndex, testObserver, nil)
52 require.NoError(t, err)
53 return h2muxConn, <-edgeMuxChan
54}
55
56func TestServeStreamHTTP(t *testing.T) {
57 tests := []testRequest{
58 {
59 name: "ok",
60 endpoint: "/ok",
61 expectedStatus: http.StatusOK,
62 expectedBody: []byte(http.StatusText(http.StatusOK)),
63 },
64 {
65 name: "large_file",
66 endpoint: "/large_file",
67 expectedStatus: http.StatusOK,
68 expectedBody: testLargeResp,
69 },
70 {
71 name: "Bad request",
72 endpoint: "/400",
73 expectedStatus: http.StatusBadRequest,
74 expectedBody: []byte(http.StatusText(http.StatusBadRequest)),
75 },
76 {
77 name: "Internal server error",
78 endpoint: "/500",
79 expectedStatus: http.StatusInternalServerError,
80 expectedBody: []byte(http.StatusText(http.StatusInternalServerError)),
81 },
82 {
83 name: "Proxy error",
84 endpoint: "/error",
85 expectedStatus: http.StatusBadGateway,
86 expectedBody: nil,
87 isProxyError: true,
88 },
89 }
90
91 ctx, cancel := context.WithCancel(context.Background())
92 h2muxConn, edgeMux := newH2MuxConnection(t)
93
94 var wg sync.WaitGroup
95 wg.Add(2)
96 go func() {
97 defer wg.Done()
98 _ = edgeMux.Serve(ctx)
99 }()
100 go func() {
101 defer wg.Done()
102 err := h2muxConn.serveMuxer(ctx)
103 require.Error(t, err)
104 }()
105
106 for _, test := range tests {
107 headers := []h2mux.Header{
108 {
109 Name: ":path",
110 Value: test.endpoint,
111 },
112 }
113 stream, err := edgeMux.OpenStream(ctx, headers, nil)
114 require.NoError(t, err)
115 require.True(t, hasHeader(stream, ":status", strconv.Itoa(test.expectedStatus)))
116
117 if test.isProxyError {
118 assert.True(t, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderCfd))
119 } else {
120 assert.True(t, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderOrigin))
121 body := make([]byte, len(test.expectedBody))
122 _, err = stream.Read(body)
123 require.NoError(t, err)
124 require.Equal(t, test.expectedBody, body)
125 }
126 }
127 cancel()
128 wg.Wait()
129}
130
131func TestServeStreamWS(t *testing.T) {
132 ctx, cancel := context.WithCancel(context.Background())
133 h2muxConn, edgeMux := newH2MuxConnection(t)
134
135 var wg sync.WaitGroup
136 wg.Add(2)
137 go func() {
138 defer wg.Done()
139 edgeMux.Serve(ctx)
140 }()
141 go func() {
142 defer wg.Done()
143 err := h2muxConn.serveMuxer(ctx)
144 require.Error(t, err)
145 }()
146
147 headers := []h2mux.Header{
148 {
149 Name: ":path",
150 Value: "/ws",
151 },
152 {
153 Name: "connection",
154 Value: "upgrade",
155 },
156 {
157 Name: "upgrade",
158 Value: "websocket",
159 },
160 }
161
162 readPipe, writePipe := io.Pipe()
163 stream, err := edgeMux.OpenStream(ctx, headers, readPipe)
164 require.NoError(t, err)
165
166 require.True(t, hasHeader(stream, ":status", strconv.Itoa(http.StatusSwitchingProtocols)))
167 assert.True(t, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderOrigin))
168
169 data := []byte("test websocket")
170 err = wsutil.WriteClientText(writePipe, data)
171 require.NoError(t, err)
172
173 respBody, err := wsutil.ReadServerText(stream)
174 require.NoError(t, err)
175 require.Equal(t, data, respBody, fmt.Sprintf("Expect %s, got %s", string(data), string(respBody)))
176
177 cancel()
178 wg.Wait()
179}
180
181func TestGracefulShutdownH2Mux(t *testing.T) {
182 ctx, cancel := context.WithCancel(context.Background())
183 defer cancel()
184
185 h2muxConn, edgeMux := newH2MuxConnection(t)
186
187 shutdownC := make(chan struct{})
188 unregisteredC := make(chan struct{})
189 h2muxConn.gracefulShutdownC = shutdownC
190 h2muxConn.newRPCClientFunc = func(_ context.Context, _ io.ReadWriteCloser, _ *zerolog.Logger) NamedTunnelRPCClient {
191 return &mockNamedTunnelRPCClient{
192 registered: nil,
193 unregistered: unregisteredC,
194 }
195 }
196
197 var wg sync.WaitGroup
198 wg.Add(3)
199 go func() {
200 defer wg.Done()
201 _ = edgeMux.Serve(ctx)
202 }()
203 go func() {
204 defer wg.Done()
205 _ = h2muxConn.serveMuxer(ctx)
206 }()
207
208 go func() {
209 defer wg.Done()
210 h2muxConn.controlLoop(ctx, &mockConnectedFuse{}, true)
211 }()
212
213 time.Sleep(100 * time.Millisecond)
214 close(shutdownC)
215
216 select {
217 case <-unregisteredC:
218 break // ok
219 case <-time.Tick(time.Second):
220 assert.Fail(t, "timed out waiting for control loop to unregister")
221 }
222
223 cancel()
224 wg.Wait()
225
226 assert.True(t, h2muxConn.stoppedGracefully)
227 assert.Nil(t, h2muxConn.gracefulShutdownC)
228}
229
230func hasHeader(stream *h2mux.MuxedStream, name, val string) bool {
231 for _, header := range stream.Headers {
232 if header.Name == name && header.Value == val {
233 return true
234 }
235 }
236 return false
237}
238
239func benchmarkServeStreamHTTPSimple(b *testing.B, test testRequest) {
240 ctx, cancel := context.WithCancel(context.Background())
241 h2muxConn, edgeMux := newH2MuxConnection(b)
242
243 var wg sync.WaitGroup
244 wg.Add(2)
245 go func() {
246 defer wg.Done()
247 edgeMux.Serve(ctx)
248 }()
249 go func() {
250 defer wg.Done()
251 err := h2muxConn.serveMuxer(ctx)
252 require.Error(b, err)
253 }()
254
255 headers := []h2mux.Header{
256 {
257 Name: ":path",
258 Value: test.endpoint,
259 },
260 }
261
262 body := make([]byte, len(test.expectedBody))
263 b.ResetTimer()
264 for i := 0; i < b.N; i++ {
265 b.StartTimer()
266 stream, openstreamErr := edgeMux.OpenStream(ctx, headers, nil)
267 _, readBodyErr := stream.Read(body)
268 b.StopTimer()
269
270 require.NoError(b, openstreamErr)
271 assert.True(b, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderOrigin))
272 require.True(b, hasHeader(stream, ":status", strconv.Itoa(http.StatusOK)))
273 require.NoError(b, readBodyErr)
274 require.Equal(b, test.expectedBody, body)
275 }
276
277 cancel()
278 wg.Wait()
279}
280
281func BenchmarkServeStreamHTTPSimple(b *testing.B) {
282 test := testRequest{
283 name: "ok",
284 endpoint: "/ok",
285 expectedStatus: http.StatusOK,
286 expectedBody: []byte(http.StatusText(http.StatusOK)),
287 }
288
289 benchmarkServeStreamHTTPSimple(b, test)
290}
291
292func BenchmarkServeStreamHTTPLargeFile(b *testing.B) {
293 test := testRequest{
294 name: "large_file",
295 endpoint: "/large_file",
296 expectedStatus: http.StatusOK,
297 expectedBody: testLargeResp,
298 }
299
300 benchmarkServeStreamHTTPSimple(b, test)
301}
302