cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

Code · Commits · Issues · Pull requests · Actions · Insights · Security
2020.3.0

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

h2mux/activestreammap_test.go

195lines · modecode

1package h2mux
2
3import (
4 "sync"
5 "testing"
6
7 "github.com/stretchr/testify/assert"
8)
9
10func TestShutdown(t *testing.T) {
11 const numStreams = 1000
12 m := newActiveStreamMap(true, NewActiveStreamsMetrics("test", t.Name()))
13
14 // Add all the streams
15 {
16 var wg sync.WaitGroup
17 wg.Add(numStreams)
18 for i := 0; i < numStreams; i++ {
19 go func(streamID int) {
20 defer wg.Done()
21 stream := &MuxedStream{streamID: uint32(streamID)}
22 ok := m.Set(stream)
23 assert.True(t, ok)
24 }(i)
25 }
26 wg.Wait()
27 }
28 assert.Equal(t, numStreams, m.Len(), "All the streams should have been added")
29
30 shutdownChan, alreadyInProgress := m.Shutdown()
31 select {
32 case <-shutdownChan:
33 assert.Fail(t, "before Shutdown(), shutdownChan shouldn't be closed")
34 default:
35 }
36 assert.False(t, alreadyInProgress)
37
38 shutdownChan2, alreadyInProgress2 := m.Shutdown()
39 assert.Equal(t, shutdownChan, shutdownChan2, "repeated calls to Shutdown() should return the same channel")
40 assert.True(t, alreadyInProgress2, "repeated calls to Shutdown() should return true for 'in progress'")
41
42 // Delete all the streams
43 {
44 var wg sync.WaitGroup
45 wg.Add(numStreams)
46 for i := 0; i < numStreams; i++ {
47 go func(streamID int) {
48 defer wg.Done()
49 m.Delete(uint32(streamID))
50 }(i)
51 }
52 wg.Wait()
53 }
54 assert.Equal(t, 0, m.Len(), "All the streams should have been deleted")
55
56 select {
57 case <-shutdownChan:
58 default:
59 assert.Fail(t, "After all the streams are deleted, shutdownChan should have been closed")
60 }
61}
62
63func TestEmptyBeforeShutdown(t *testing.T) {
64 const numStreams = 1000
65 m := newActiveStreamMap(true, NewActiveStreamsMetrics("test", t.Name()))
66
67 // Add all the streams
68 {
69 var wg sync.WaitGroup
70 wg.Add(numStreams)
71 for i := 0; i < numStreams; i++ {
72 go func(streamID int) {
73 defer wg.Done()
74 stream := &MuxedStream{streamID: uint32(streamID)}
75 ok := m.Set(stream)
76 assert.True(t, ok)
77 }(i)
78 }
79 wg.Wait()
80 }
81 assert.Equal(t, numStreams, m.Len(), "All the streams should have been added")
82
83 // Delete all the streams, bringing m to size 0
84 {
85 var wg sync.WaitGroup
86 wg.Add(numStreams)
87 for i := 0; i < numStreams; i++ {
88 go func(streamID int) {
89 defer wg.Done()
90 m.Delete(uint32(streamID))
91 }(i)
92 }
93 wg.Wait()
94 }
95 assert.Equal(t, 0, m.Len(), "All the streams should have been deleted")
96
97 // Add one stream back
98 const soloStreamID = uint32(0)
99 ok := m.Set(&MuxedStream{streamID: soloStreamID})
100 assert.True(t, ok)
101
102 shutdownChan, alreadyInProgress := m.Shutdown()
103 select {
104 case <-shutdownChan:
105 assert.Fail(t, "before Shutdown(), shutdownChan shouldn't be closed")
106 default:
107 }
108 assert.False(t, alreadyInProgress)
109
110 shutdownChan2, alreadyInProgress2 := m.Shutdown()
111 assert.Equal(t, shutdownChan, shutdownChan2, "repeated calls to Shutdown() should return the same channel")
112 assert.True(t, alreadyInProgress2, "repeated calls to Shutdown() should return true for 'in progress'")
113
114 // Remove the remaining stream
115 m.Delete(soloStreamID)
116
117 select {
118 case <-shutdownChan:
119 default:
120 assert.Fail(t, "After all the streams are deleted, shutdownChan should have been closed")
121 }
122}
123
// noopBuffer is a test double used as the readBuffer and writeBuffer of a
// MuxedStream in TestAbort. It stores no data; the only state it keeps is
// whether Close has been called.
type noopBuffer struct {
	isClosed bool
}

// Read reports len(p) bytes read without modifying p. It never fails.
func (b *noopBuffer) Read(p []byte) (n int, err error) { return len(p), nil }

// Write reports len(p) bytes written and discards them. It never fails.
func (b *noopBuffer) Write(p []byte) (n int, err error) { return len(p), nil }

// Reset does nothing; there is no buffered data to drop.
func (b *noopBuffer) Reset() {}

// Len always returns 0 because nothing is ever buffered.
func (b *noopBuffer) Len() int { return 0 }

// Close records that the buffer was closed and always returns nil.
func (b *noopBuffer) Close() error {
	b.isClosed = true
	return nil
}

// Closed reports whether Close has been called.
func (b *noopBuffer) Closed() bool { return b.isClosed }
134
// noopReadyList is a test double used as the readyList of a MuxedStream in
// TestAbort. It carries no state.
type noopReadyList struct{}

// Signal ignores streamID and does nothing.
func (*noopReadyList) Signal(streamID uint32) {}
138
139func TestAbort(t *testing.T) {
140 const numStreams = 1000
141 m := newActiveStreamMap(true, NewActiveStreamsMetrics("test", t.Name()))
142
143 var openedStreams sync.Map
144
145 // Add all the streams
146 {
147 var wg sync.WaitGroup
148 wg.Add(numStreams)
149 for i := 0; i < numStreams; i++ {
150 go func(streamID int) {
151 defer wg.Done()
152 stream := &MuxedStream{
153 streamID: uint32(streamID),
154 readBuffer: &noopBuffer{},
155 writeBuffer: &noopBuffer{},
156 readyList: &noopReadyList{},
157 }
158 ok := m.Set(stream)
159 assert.True(t, ok)
160
161 openedStreams.Store(stream.streamID, stream)
162 }(i)
163 }
164 wg.Wait()
165 }
166 assert.Equal(t, numStreams, m.Len(), "All the streams should have been added")
167
168 shutdownChan, alreadyInProgress := m.Shutdown()
169 select {
170 case <-shutdownChan:
171 assert.Fail(t, "before Abort(), shutdownChan shouldn't be closed")
172 default:
173 }
174 assert.False(t, alreadyInProgress)
175
176 m.Abort()
177 assert.Equal(t, numStreams, m.Len(), "Abort() shouldn't delete any streams")
178 openedStreams.Range(func(key interface{}, value interface{}) bool {
179 stream := value.(*MuxedStream)
180 readBuffer := stream.readBuffer.(*noopBuffer)
181 writeBuffer := stream.writeBuffer.(*noopBuffer)
182 return assert.True(t, readBuffer.isClosed && writeBuffer.isClosed, "Abort() should have closed all the streams")
183 })
184
185 select {
186 case <-shutdownChan:
187 default:
188 assert.Fail(t, "after Abort(), shutdownChan should have been closed")
189 }
190
191 // multiple aborts shouldn't cause any issues
192 m.Abort()
193 m.Abort()
194 m.Abort()
195}
196