cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2020.3.2

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

h2mux/activestreammap.go

182lines · modecode

1package h2mux
2
3import (
4 "sync"
5
6 "github.com/prometheus/client_golang/prometheus"
7 "golang.org/x/net/http2"
8)
9
10// activeStreamMap is used to moderate access to active streams between the read and write
11// threads, and deny access to new peer streams while shutting down.
12type activeStreamMap struct {
13 sync.RWMutex
14 // streams tracks open streams.
15 streams map[uint32]*MuxedStream
16 // nextStreamID is the next ID to use on our side of the connection.
17 // This is odd for clients, even for servers.
18 nextStreamID uint32
19 // maxPeerStreamID is the ID of the most recent stream opened by the peer.
20 maxPeerStreamID uint32
21 // activeStreams is a gauge shared by all muxers of this process to expose the total number of active streams
22 activeStreams prometheus.Gauge
23
24 // ignoreNewStreams is true when the connection is being shut down. New streams
25 // cannot be registered.
26 ignoreNewStreams bool
27 // streamsEmpty is a chan that will be closed when no more streams are open.
28 streamsEmptyChan chan struct{}
29 closeOnce sync.Once
30}
31
32func newActiveStreamMap(useClientStreamNumbers bool, activeStreams prometheus.Gauge) *activeStreamMap {
33 m := &activeStreamMap{
34 streams: make(map[uint32]*MuxedStream),
35 streamsEmptyChan: make(chan struct{}),
36 nextStreamID: 1,
37 activeStreams: activeStreams,
38 }
39 // Client initiated stream uses odd stream ID, server initiated stream uses even stream ID
40 if !useClientStreamNumbers {
41 m.nextStreamID = 2
42 }
43 return m
44}
45
46// This function should be called while `m` is locked.
47func (m *activeStreamMap) notifyStreamsEmpty() {
48 m.closeOnce.Do(func() {
49 close(m.streamsEmptyChan)
50 })
51}
52
53// Len returns the number of active streams.
54func (m *activeStreamMap) Len() int {
55 m.RLock()
56 defer m.RUnlock()
57 return len(m.streams)
58}
59
60func (m *activeStreamMap) Get(streamID uint32) (*MuxedStream, bool) {
61 m.RLock()
62 defer m.RUnlock()
63 stream, ok := m.streams[streamID]
64 return stream, ok
65}
66
67// Set returns true if the stream was assigned successfully. If a stream
68// already existed with that ID or we are shutting down, return false.
69func (m *activeStreamMap) Set(newStream *MuxedStream) bool {
70 m.Lock()
71 defer m.Unlock()
72 if _, ok := m.streams[newStream.streamID]; ok {
73 return false
74 }
75 if m.ignoreNewStreams {
76 return false
77 }
78 m.streams[newStream.streamID] = newStream
79 m.activeStreams.Inc()
80 return true
81}
82
83// Delete stops tracking the stream. It should be called only after it is closed and resetted.
84func (m *activeStreamMap) Delete(streamID uint32) {
85 m.Lock()
86 defer m.Unlock()
87 if _, ok := m.streams[streamID]; ok {
88 delete(m.streams, streamID)
89 m.activeStreams.Dec()
90 }
91
92 // shutting down, and now the map is empty
93 if m.ignoreNewStreams && len(m.streams) == 0 {
94 m.notifyStreamsEmpty()
95 }
96}
97
98// Shutdown blocks new streams from being created.
99// It returns `done`, a channel that is closed once the last stream has closed
100// and `progress`, whether a shutdown was already in progress
101func (m *activeStreamMap) Shutdown() (done <-chan struct{}, alreadyInProgress bool) {
102 m.Lock()
103 defer m.Unlock()
104 if m.ignoreNewStreams {
105 // already shutting down
106 return m.streamsEmptyChan, true
107 }
108 m.ignoreNewStreams = true
109 if len(m.streams) == 0 {
110 // there are no streams to wait for
111 m.notifyStreamsEmpty()
112 }
113 return m.streamsEmptyChan, false
114}
115
116// AcquireLocalID acquires a new stream ID for a stream you're opening.
117func (m *activeStreamMap) AcquireLocalID() uint32 {
118 m.Lock()
119 defer m.Unlock()
120 x := m.nextStreamID
121 m.nextStreamID += 2
122 return x
123}
124
125// ObservePeerID observes the ID of a stream opened by the peer. It returns true if we should accept
126// the new stream, or false to reject it. The ErrCode gives the reason why.
127func (m *activeStreamMap) AcquirePeerID(streamID uint32) (bool, http2.ErrCode) {
128 m.Lock()
129 defer m.Unlock()
130 switch {
131 case m.ignoreNewStreams:
132 return false, http2.ErrCodeStreamClosed
133 case streamID > m.maxPeerStreamID:
134 m.maxPeerStreamID = streamID
135 return true, http2.ErrCodeNo
136 default:
137 return false, http2.ErrCodeStreamClosed
138 }
139}
140
141// IsPeerStreamID is true if the stream ID belongs to the peer.
142func (m *activeStreamMap) IsPeerStreamID(streamID uint32) bool {
143 m.RLock()
144 defer m.RUnlock()
145 return (streamID % 2) != (m.nextStreamID % 2)
146}
147
148// IsLocalStreamID is true if it is a stream we have opened, even if it is now closed.
149func (m *activeStreamMap) IsLocalStreamID(streamID uint32) bool {
150 m.RLock()
151 defer m.RUnlock()
152 return (streamID%2) == (m.nextStreamID%2) && streamID < m.nextStreamID
153}
154
155// LastPeerStreamID returns the most recently opened peer stream ID.
156func (m *activeStreamMap) LastPeerStreamID() uint32 {
157 m.RLock()
158 defer m.RUnlock()
159 return m.maxPeerStreamID
160}
161
162// LastLocalStreamID returns the most recently opened local stream ID.
163func (m *activeStreamMap) LastLocalStreamID() uint32 {
164 m.RLock()
165 defer m.RUnlock()
166 if m.nextStreamID > 1 {
167 return m.nextStreamID - 2
168 }
169 return 0
170}
171
172// Abort closes every active stream and prevents new ones being created. This should be used to
173// return errors in pending read/writes when the underlying connection goes away.
174func (m *activeStreamMap) Abort() {
175 m.Lock()
176 defer m.Unlock()
177 for _, stream := range m.streams {
178 stream.Close()
179 }
180 m.ignoreNewStreams = true
181 m.notifyStreamsEmpty()
182}
183