cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflared · Available

Code · Commits · Issues · Pull requests · Actions · Insights · Security
2020.11.6

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/h2mux_test.go

242lines · modecode

1package connection
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "net"
8 "net/http"
9 "strconv"
10 "sync"
11 "testing"
12 "time"
13
14 "github.com/cloudflare/cloudflared/h2mux"
15 "github.com/gobwas/ws/wsutil"
16 "github.com/stretchr/testify/assert"
17 "github.com/stretchr/testify/require"
18)
19
20var (
21 testMuxerConfig = &MuxerConfig{
22 HeartbeatInterval: time.Second * 5,
23 MaxHeartbeats: 5,
24 CompressionSetting: 0,
25 MetricsUpdateFreq: time.Second * 5,
26 }
27)
28
29func newH2MuxConnection(ctx context.Context, t require.TestingT) (*h2muxConnection, *h2mux.Muxer) {
30 edgeConn, originConn := net.Pipe()
31 edgeMuxChan := make(chan *h2mux.Muxer)
32 go func() {
33 edgeMuxConfig := h2mux.MuxerConfig{
34 Logger: testObserver,
35 }
36 edgeMux, err := h2mux.Handshake(edgeConn, edgeConn, edgeMuxConfig, h2mux.ActiveStreams)
37 require.NoError(t, err)
38 edgeMuxChan <- edgeMux
39 }()
40 var connIndex = uint8(0)
41 h2muxConn, err, _ := NewH2muxConnection(ctx, testConfig, testMuxerConfig, originConn, connIndex, testObserver)
42 require.NoError(t, err)
43 return h2muxConn, <-edgeMuxChan
44}
45
46func TestServeStreamHTTP(t *testing.T) {
47 tests := []testRequest{
48 {
49 name: "ok",
50 endpoint: "/ok",
51 expectedStatus: http.StatusOK,
52 expectedBody: []byte(http.StatusText(http.StatusOK)),
53 },
54 {
55 name: "large_file",
56 endpoint: "/large_file",
57 expectedStatus: http.StatusOK,
58 expectedBody: testLargeResp,
59 },
60 {
61 name: "Bad request",
62 endpoint: "/400",
63 expectedStatus: http.StatusBadRequest,
64 expectedBody: []byte(http.StatusText(http.StatusBadRequest)),
65 },
66 {
67 name: "Internal server error",
68 endpoint: "/500",
69 expectedStatus: http.StatusInternalServerError,
70 expectedBody: []byte(http.StatusText(http.StatusInternalServerError)),
71 },
72 {
73 name: "Proxy error",
74 endpoint: "/error",
75 expectedStatus: http.StatusBadGateway,
76 expectedBody: nil,
77 isProxyError: true,
78 },
79 }
80
81 ctx, cancel := context.WithCancel(context.Background())
82 h2muxConn, edgeMux := newH2MuxConnection(ctx, t)
83
84 var wg sync.WaitGroup
85 wg.Add(2)
86 go func() {
87 defer wg.Done()
88 edgeMux.Serve(ctx)
89 }()
90 go func() {
91 defer wg.Done()
92 err := h2muxConn.serveMuxer(ctx)
93 require.Error(t, err)
94 }()
95
96 for _, test := range tests {
97 headers := []h2mux.Header{
98 {
99 Name: ":path",
100 Value: test.endpoint,
101 },
102 }
103 stream, err := edgeMux.OpenStream(ctx, headers, nil)
104 require.NoError(t, err)
105 require.True(t, hasHeader(stream, ":status", strconv.Itoa(test.expectedStatus)))
106
107 if test.isProxyError {
108 assert.True(t, hasHeader(stream, responseMetaHeaderField, responseMetaHeaderCfd))
109 } else {
110 assert.True(t, hasHeader(stream, responseMetaHeaderField, responseMetaHeaderOrigin))
111 body := make([]byte, len(test.expectedBody))
112 _, err = stream.Read(body)
113 require.NoError(t, err)
114 require.Equal(t, test.expectedBody, body)
115 }
116 }
117 cancel()
118 wg.Wait()
119}
120
121func TestServeStreamWS(t *testing.T) {
122 ctx, cancel := context.WithCancel(context.Background())
123 h2muxConn, edgeMux := newH2MuxConnection(ctx, t)
124
125 var wg sync.WaitGroup
126 wg.Add(2)
127 go func() {
128 defer wg.Done()
129 edgeMux.Serve(ctx)
130 }()
131 go func() {
132 defer wg.Done()
133 err := h2muxConn.serveMuxer(ctx)
134 require.Error(t, err)
135 }()
136
137 headers := []h2mux.Header{
138 {
139 Name: ":path",
140 Value: "/ws",
141 },
142 {
143 Name: "connection",
144 Value: "upgrade",
145 },
146 {
147 Name: "upgrade",
148 Value: "websocket",
149 },
150 }
151
152 readPipe, writePipe := io.Pipe()
153 stream, err := edgeMux.OpenStream(ctx, headers, readPipe)
154 require.NoError(t, err)
155
156 require.True(t, hasHeader(stream, ":status", strconv.Itoa(http.StatusSwitchingProtocols)))
157 assert.True(t, hasHeader(stream, responseMetaHeaderField, responseMetaHeaderOrigin))
158
159 data := []byte("test websocket")
160 err = wsutil.WriteClientText(writePipe, data)
161 require.NoError(t, err)
162
163 respBody, err := wsutil.ReadServerText(stream)
164 require.NoError(t, err)
165 require.Equal(t, data, respBody, fmt.Sprintf("Expect %s, got %s", string(data), string(respBody)))
166
167 cancel()
168 wg.Wait()
169}
170
171func hasHeader(stream *h2mux.MuxedStream, name, val string) bool {
172 for _, header := range stream.Headers {
173 if header.Name == name && header.Value == val {
174 return true
175 }
176 }
177 return false
178}
179
180func benchmarkServeStreamHTTPSimple(b *testing.B, test testRequest) {
181 ctx, cancel := context.WithCancel(context.Background())
182 h2muxConn, edgeMux := newH2MuxConnection(ctx, b)
183
184 var wg sync.WaitGroup
185 wg.Add(2)
186 go func() {
187 defer wg.Done()
188 edgeMux.Serve(ctx)
189 }()
190 go func() {
191 defer wg.Done()
192 err := h2muxConn.serveMuxer(ctx)
193 require.Error(b, err)
194 }()
195
196 headers := []h2mux.Header{
197 {
198 Name: ":path",
199 Value: test.endpoint,
200 },
201 }
202
203 body := make([]byte, len(test.expectedBody))
204 b.ResetTimer()
205 for i := 0; i < b.N; i++ {
206 b.StartTimer()
207 stream, openstreamErr := edgeMux.OpenStream(ctx, headers, nil)
208 _, readBodyErr := stream.Read(body)
209 b.StopTimer()
210
211 require.NoError(b, openstreamErr)
212 assert.True(b, hasHeader(stream, responseMetaHeaderField, responseMetaHeaderOrigin))
213 require.True(b, hasHeader(stream, ":status", strconv.Itoa(http.StatusOK)))
214 require.NoError(b, readBodyErr)
215 require.Equal(b, test.expectedBody, body)
216 }
217
218 cancel()
219 wg.Wait()
220}
221
222func BenchmarkServeStreamHTTPSimple(b *testing.B) {
223 test := testRequest{
224 name: "ok",
225 endpoint: "/ok",
226 expectedStatus: http.StatusOK,
227 expectedBody: []byte(http.StatusText(http.StatusOK)),
228 }
229
230 benchmarkServeStreamHTTPSimple(b, test)
231}
232
233func BenchmarkServeStreamHTTPLargeFile(b *testing.B) {
234 test := testRequest{
235 name: "large_file",
236 endpoint: "/large_file",
237 expectedStatus: http.StatusOK,
238 expectedBody: testLargeResp,
239 }
240
241 benchmarkServeStreamHTTPSimple(b, test)
242}