cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2019.11.0

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

h2mux/activestreammap.go

165lines · modecode

1package h2mux
2
3import (
4 "sync"
5
6 "golang.org/x/net/http2"
7)
8
9// activeStreamMap is used to moderate access to active streams between the read and write
10// threads, and deny access to new peer streams while shutting down.
11type activeStreamMap struct {
12 sync.RWMutex
13 // streams tracks open streams.
14 streams map[uint32]*MuxedStream
15 // streamsEmpty is a chan that should be closed when no more streams are open.
16 streamsEmpty chan struct{}
17 // nextStreamID is the next ID to use on our side of the connection.
18 // This is odd for clients, even for servers.
19 nextStreamID uint32
20 // maxPeerStreamID is the ID of the most recent stream opened by the peer.
21 maxPeerStreamID uint32
22 // ignoreNewStreams is true when the connection is being shut down. New streams
23 // cannot be registered.
24 ignoreNewStreams bool
25}
26
27func newActiveStreamMap(useClientStreamNumbers bool) *activeStreamMap {
28 m := &activeStreamMap{
29 streams: make(map[uint32]*MuxedStream),
30 streamsEmpty: make(chan struct{}),
31 nextStreamID: 1,
32 }
33 // Client initiated stream uses odd stream ID, server initiated stream uses even stream ID
34 if !useClientStreamNumbers {
35 m.nextStreamID = 2
36 }
37 return m
38}
39
40// Len returns the number of active streams.
41func (m *activeStreamMap) Len() int {
42 m.RLock()
43 defer m.RUnlock()
44 return len(m.streams)
45}
46
47func (m *activeStreamMap) Get(streamID uint32) (*MuxedStream, bool) {
48 m.RLock()
49 defer m.RUnlock()
50 stream, ok := m.streams[streamID]
51 return stream, ok
52}
53
54// Set returns true if the stream was assigned successfully. If a stream
55// already existed with that ID or we are shutting down, return false.
56func (m *activeStreamMap) Set(newStream *MuxedStream) bool {
57 m.Lock()
58 defer m.Unlock()
59 if _, ok := m.streams[newStream.streamID]; ok {
60 return false
61 }
62 if m.ignoreNewStreams {
63 return false
64 }
65 m.streams[newStream.streamID] = newStream
66 return true
67}
68
69// Delete stops tracking the stream. It should be called only after it is closed and resetted.
70func (m *activeStreamMap) Delete(streamID uint32) {
71 m.Lock()
72 defer m.Unlock()
73 delete(m.streams, streamID)
74 if len(m.streams) == 0 && m.streamsEmpty != nil {
75 close(m.streamsEmpty)
76 m.streamsEmpty = nil
77 }
78}
79
80// Shutdown blocks new streams from being created. It returns a channel that receives an event
81// once the last stream has closed, or nil if a shutdown is in progress.
82func (m *activeStreamMap) Shutdown() <-chan struct{} {
83 m.Lock()
84 defer m.Unlock()
85 if m.ignoreNewStreams {
86 // already shutting down
87 return nil
88 }
89 m.ignoreNewStreams = true
90 done := make(chan struct{})
91 if len(m.streams) == 0 {
92 // nothing to shut down
93 close(done)
94 return done
95 }
96 m.streamsEmpty = done
97 return done
98}
99
100// AcquireLocalID acquires a new stream ID for a stream you're opening.
101func (m *activeStreamMap) AcquireLocalID() uint32 {
102 m.Lock()
103 defer m.Unlock()
104 x := m.nextStreamID
105 m.nextStreamID += 2
106 return x
107}
108
109// ObservePeerID observes the ID of a stream opened by the peer. It returns true if we should accept
110// the new stream, or false to reject it. The ErrCode gives the reason why.
111func (m *activeStreamMap) AcquirePeerID(streamID uint32) (bool, http2.ErrCode) {
112 m.Lock()
113 defer m.Unlock()
114 switch {
115 case m.ignoreNewStreams:
116 return false, http2.ErrCodeStreamClosed
117 case streamID > m.maxPeerStreamID:
118 m.maxPeerStreamID = streamID
119 return true, http2.ErrCodeNo
120 default:
121 return false, http2.ErrCodeStreamClosed
122 }
123}
124
125// IsPeerStreamID is true if the stream ID belongs to the peer.
126func (m *activeStreamMap) IsPeerStreamID(streamID uint32) bool {
127 m.RLock()
128 defer m.RUnlock()
129 return (streamID % 2) != (m.nextStreamID % 2)
130}
131
132// IsLocalStreamID is true if it is a stream we have opened, even if it is now closed.
133func (m *activeStreamMap) IsLocalStreamID(streamID uint32) bool {
134 m.RLock()
135 defer m.RUnlock()
136 return (streamID%2) == (m.nextStreamID%2) && streamID < m.nextStreamID
137}
138
139// LastPeerStreamID returns the most recently opened peer stream ID.
140func (m *activeStreamMap) LastPeerStreamID() uint32 {
141 m.RLock()
142 defer m.RUnlock()
143 return m.maxPeerStreamID
144}
145
146// LastLocalStreamID returns the most recently opened local stream ID.
147func (m *activeStreamMap) LastLocalStreamID() uint32 {
148 m.RLock()
149 defer m.RUnlock()
150 if m.nextStreamID > 1 {
151 return m.nextStreamID - 2
152 }
153 return 0
154}
155
156// Abort closes every active stream and prevents new ones being created. This should be used to
157// return errors in pending read/writes when the underlying connection goes away.
158func (m *activeStreamMap) Abort() {
159 m.Lock()
160 defer m.Unlock()
161 for _, stream := range m.streams {
162 stream.Close()
163 }
164 m.ignoreNewStreams = true
165}
166