cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2018.10.0

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

h2mux/muxreader.go

497lines · modecode

1package h2mux
2
3import (
4 "bytes"
5 "encoding/binary"
6 "io"
7 "net/url"
8 "time"
9
10 log "github.com/sirupsen/logrus"
11 "golang.org/x/net/http2"
12)
13
14type MuxReader struct {
15 // f is used to read HTTP2 frames.
16 f *http2.Framer
17 // handler provides a callback to receive new streams. if nil, new streams cannot be accepted.
18 handler MuxedStreamHandler
19 // streams tracks currently-open streams.
20 streams *activeStreamMap
21 // readyList is used to signal writable streams.
22 readyList *ReadyList
23 // streamErrors lets us report stream errors to the MuxWriter.
24 streamErrors *StreamErrorMap
25 // goAwayChan is used to tell the writer to send a GOAWAY message.
26 goAwayChan chan<- http2.ErrCode
27 // abortChan is used when shutting down ungracefully. When this becomes readable, all activity should stop.
28 abortChan <-chan struct{}
29 // pingTimestamp is an atomic value containing the latest received ping timestamp.
30 pingTimestamp *PingTimestamp
31 // connActive is used to signal to the writer that something happened on the connection.
32 // This is used to clear idle timeout disconnection deadlines.
33 connActive Signal
34 // The initial value for the send and receive window of a new stream.
35 initialStreamWindow uint32
36 // The max value for the send window of a stream.
37 streamWindowMax uint32
38 // r is a reference to the underlying connection used when shutting down.
39 r io.Closer
40 // updateRTTChan is the channel to send new RTT measurement to muxerMetricsUpdater
41 updateRTTChan chan<- *roundTripMeasurement
42 // updateReceiveWindowChan is the channel to update receiveWindow size to muxerMetricsUpdater
43 updateReceiveWindowChan chan<- uint32
44 // updateSendWindowChan is the channel to update sendWindow size to muxerMetricsUpdater
45 updateSendWindowChan chan<- uint32
46 // bytesRead is the amount of bytes read from data frame since the last time we send bytes read to metrics
47 bytesRead *AtomicCounter
48 // updateOutBoundBytesChan is the channel to send bytesWrote to muxerMetricsUpdater
49 updateInBoundBytesChan chan<- uint64
50 // dictionaries holds the h2 cross-stream compression dictionaries
51 dictionaries h2Dictionaries
52}
53
54func (r *MuxReader) Shutdown() {
55 done := r.streams.Shutdown()
56 if done == nil {
57 return
58 }
59 r.sendGoAway(http2.ErrCodeNo)
60 go func() {
61 // close reader side when last stream ends; this will cause the writer to abort
62 <-done
63 r.r.Close()
64 }()
65}
66
67func (r *MuxReader) run(parentLogger *log.Entry) error {
68 logger := parentLogger.WithFields(log.Fields{
69 "subsystem": "mux",
70 "dir": "read",
71 })
72 defer logger.Debug("event loop finished")
73
74 // routine to periodically update bytesRead
75 go func() {
76 tickC := time.Tick(updateFreq)
77 for {
78 select {
79 case <-r.abortChan:
80 return
81 case <-tickC:
82 r.updateInBoundBytesChan <- r.bytesRead.Count()
83 }
84 }
85 }()
86
87 for {
88 frame, err := r.f.ReadFrame()
89 if err != nil {
90 switch e := err.(type) {
91 case http2.StreamError:
92 logger.WithError(err).Warn("stream error")
93 r.streamError(e.StreamID, e.Code)
94 case http2.ConnectionError:
95 logger.WithError(err).Warn("connection error")
96 return r.connectionError(err)
97 default:
98 if isConnectionClosedError(err) {
99 if r.streams.Len() == 0 {
100 logger.Debug("shutting down")
101 return nil
102 }
103 logger.Warn("connection closed unexpectedly")
104 return err
105 } else {
106 logger.WithError(err).Warn("frame read error")
107 return r.connectionError(err)
108 }
109 }
110 }
111 r.connActive.Signal()
112 logger.WithField("data", frame).Debug("read frame")
113 switch f := frame.(type) {
114 case *http2.DataFrame:
115 err = r.receiveFrameData(f, logger)
116 case *http2.MetaHeadersFrame:
117 err = r.receiveHeaderData(f)
118 case *http2.RSTStreamFrame:
119 streamID := f.Header().StreamID
120 if streamID == 0 {
121 return ErrInvalidStream
122 }
123 r.streams.Delete(streamID)
124 case *http2.PingFrame:
125 r.receivePingData(f)
126 case *http2.GoAwayFrame:
127 err = r.receiveGoAway(f)
128 // The receiver of a flow-controlled frame sends a WINDOW_UPDATE frame as it
129 // consumes data and frees up space in flow-control windows
130 case *http2.WindowUpdateFrame:
131 err = r.updateStreamWindow(f)
132 case *http2.UnknownFrame:
133 switch f.Header().Type {
134 case FrameUseDictionary:
135 err = r.receiveUseDictionary(f)
136 case FrameSetDictionary:
137 err = r.receiveSetDictionary(f)
138 default:
139 err = ErrUnexpectedFrameType
140 }
141 default:
142 err = ErrUnexpectedFrameType
143 }
144 if err != nil {
145 logger.WithField("data", frame).WithError(err).Debug("frame error")
146 return r.connectionError(err)
147 }
148 }
149}
150
151func (r *MuxReader) newMuxedStream(streamID uint32) *MuxedStream {
152 return &MuxedStream{
153 streamID: streamID,
154 readBuffer: NewSharedBuffer(),
155 writeBuffer: &bytes.Buffer{},
156 receiveWindow: r.initialStreamWindow,
157 receiveWindowCurrentMax: r.initialStreamWindow,
158 receiveWindowMax: r.streamWindowMax,
159 sendWindow: r.initialStreamWindow,
160 readyList: r.readyList,
161 dictionaries: r.dictionaries,
162 }
163}
164
165// getStreamForFrame returns a stream if valid, or an error describing why the stream could not be returned.
166func (r *MuxReader) getStreamForFrame(frame http2.Frame) (*MuxedStream, error) {
167 sid := frame.Header().StreamID
168 if sid == 0 {
169 return nil, ErrUnexpectedFrameType
170 }
171 if stream, ok := r.streams.Get(sid); ok {
172 return stream, nil
173 }
174 if r.streams.IsLocalStreamID(sid) {
175 // no stream available, but no error
176 return nil, ErrClosedStream
177 }
178 if sid < r.streams.LastPeerStreamID() {
179 // no stream available, stream closed error
180 return nil, ErrClosedStream
181 }
182 return nil, ErrUnknownStream
183}
184
185func (r *MuxReader) defaultStreamErrorHandler(err error, header http2.FrameHeader) error {
186 if header.Flags.Has(http2.FlagHeadersEndStream) {
187 return nil
188 } else if err == ErrUnknownStream || err == ErrClosedStream {
189 return r.streamError(header.StreamID, http2.ErrCodeStreamClosed)
190 } else {
191 return err
192 }
193}
194
195// Receives header frames from a stream. A non-nil error is a connection error.
196func (r *MuxReader) receiveHeaderData(frame *http2.MetaHeadersFrame) error {
197 var stream *MuxedStream
198 sid := frame.Header().StreamID
199 if sid == 0 {
200 return ErrUnexpectedFrameType
201 }
202 newStream := r.streams.IsPeerStreamID(sid)
203 if newStream {
204 // header request
205 // TODO support trailers (if stream exists)
206 ok, err := r.streams.AcquirePeerID(sid)
207 if !ok {
208 // ignore new streams while shutting down
209 return r.streamError(sid, err)
210 }
211 stream = r.newMuxedStream(sid)
212 // Set stream. Returns false if a stream already existed with that ID or we are shutting down, return false.
213 if !r.streams.Set(stream) {
214 // got HEADERS frame for an existing stream
215 // TODO support trailers
216 return r.streamError(sid, http2.ErrCodeInternal)
217 }
218 } else {
219 // header response
220 var err error
221 if stream, err = r.getStreamForFrame(frame); err != nil {
222 return r.defaultStreamErrorHandler(err, frame.Header())
223 }
224 }
225 headers := make([]Header, 0, len(frame.Fields))
226 for _, header := range frame.Fields {
227 switch header.Name {
228 case ":method":
229 stream.method = header.Value
230 case ":path":
231 u, err := url.Parse(header.Value)
232 if err == nil {
233 stream.path = u.Path
234 }
235 case "accept-encoding":
236 // remove accept-encoding if dictionaries are enabled
237 if r.dictionaries.write != nil {
238 continue
239 }
240 }
241 headers = append(headers, Header{Name: header.Name, Value: header.Value})
242 }
243 stream.Headers = headers
244 if frame.Header().Flags.Has(http2.FlagHeadersEndStream) {
245 stream.receiveEOF()
246 return nil
247 }
248 if newStream {
249 go r.handleStream(stream)
250 } else {
251 close(stream.responseHeadersReceived)
252 }
253 return nil
254}
255
256func (r *MuxReader) handleStream(stream *MuxedStream) {
257 defer stream.Close()
258 r.handler.ServeStream(stream)
259}
260
261// Receives a data frame from a stream. A non-nil error is a connection error.
262func (r *MuxReader) receiveFrameData(frame *http2.DataFrame, parentLogger *log.Entry) error {
263 logger := parentLogger.WithField("stream", frame.Header().StreamID)
264 stream, err := r.getStreamForFrame(frame)
265 if err != nil {
266 return r.defaultStreamErrorHandler(err, frame.Header())
267 }
268 data := frame.Data()
269 if len(data) > 0 {
270 n, err := stream.readBuffer.Write(data)
271 if err != nil {
272 return r.streamError(stream.streamID, http2.ErrCodeInternal)
273 }
274 r.bytesRead.IncrementBy(uint64(n))
275 }
276 if frame.Header().Flags.Has(http2.FlagDataEndStream) {
277 if stream.receiveEOF() {
278 r.streams.Delete(stream.streamID)
279 logger.Debug("stream closed")
280 } else {
281 logger.Debug("shutdown receive side")
282 }
283 return nil
284 }
285 if !stream.consumeReceiveWindow(uint32(len(data))) {
286 return r.streamError(stream.streamID, http2.ErrCodeFlowControl)
287 }
288 r.updateReceiveWindowChan <- stream.getReceiveWindow()
289 return nil
290}
291
292// Receive a PING from the peer. Update RTT and send/receive window metrics if it's an ACK.
293func (r *MuxReader) receivePingData(frame *http2.PingFrame) {
294 ts := int64(binary.LittleEndian.Uint64(frame.Data[:]))
295 if !frame.IsAck() {
296 r.pingTimestamp.Set(ts)
297 return
298 }
299
300 // Update updates the computed values with a new measurement.
301 // outgoingTime is the time that the probe was sent.
302 // We assume that time.Now() is the time we received that probe.
303 r.updateRTTChan <- &roundTripMeasurement{
304 receiveTime: time.Now(),
305 sendTime: time.Unix(0, ts),
306 }
307}
308
309// Receive a GOAWAY from the peer. Gracefully shut down our connection.
310func (r *MuxReader) receiveGoAway(frame *http2.GoAwayFrame) error {
311 r.Shutdown()
312 // Close all streams above the last processed stream
313 lastStream := r.streams.LastLocalStreamID()
314 for i := frame.LastStreamID + 2; i <= lastStream; i++ {
315 if stream, ok := r.streams.Get(i); ok {
316 stream.Close()
317 }
318 }
319 return nil
320}
321
322// Receive a USE_DICTIONARY from the peer. Setup dictionary for stream.
323func (r *MuxReader) receiveUseDictionary(frame *http2.UnknownFrame) error {
324 payload := frame.Payload()
325 streamID := frame.StreamID
326
327 // Check frame is formatted properly
328 if len(payload) != 1 {
329 return r.streamError(streamID, http2.ErrCodeProtocol)
330 }
331
332 stream, err := r.getStreamForFrame(frame)
333 if err != nil {
334 return err
335 }
336
337 if stream.receivedUseDict == true || stream.dictionaries.read == nil {
338 return r.streamError(streamID, http2.ErrCodeInternal)
339 }
340
341 stream.receivedUseDict = true
342 dictID := payload[0]
343
344 dictReader := stream.dictionaries.read.newReader(stream.readBuffer.(*SharedBuffer), dictID)
345 if dictReader == nil {
346 return r.streamError(streamID, http2.ErrCodeInternal)
347 }
348
349 stream.readBufferLock.Lock()
350 stream.readBuffer = dictReader
351 stream.readBufferLock.Unlock()
352
353 return nil
354}
355
356// Receive a SET_DICTIONARY from the peer. Update dictionaries accordingly.
357func (r *MuxReader) receiveSetDictionary(frame *http2.UnknownFrame) (err error) {
358
359 payload := frame.Payload()
360 flags := frame.Flags
361
362 stream, err := r.getStreamForFrame(frame)
363 if err != nil && err != ErrClosedStream {
364 return err
365 }
366 reader, ok := stream.readBuffer.(*h2DictionaryReader)
367 if !ok {
368 return r.streamError(frame.StreamID, http2.ErrCodeProtocol)
369 }
370
371 // A SetDictionary frame consists of several
372 // Dictionary-Entries that specify how existing dictionaries
373 // are to be updated using the current stream data
374 // +---------------+---------------+
375 // | Dictionary-Entry (+) ...
376 // +---------------+---------------+
377
378 for {
379 // Each Dictionary-Entry is formatted as follows:
380 // +-------------------------------+
381 // | Dictionary-ID (8) |
382 // +---+---------------------------+
383 // | P | Size (7+) |
384 // +---+---------------------------+
385 // | E?| D?| Truncate? (6+) |
386 // +---+---------------------------+
387 // | Offset? (8+) |
388 // +-------------------------------+
389
390 var size, truncate, offset uint64
391 var p, e, d bool
392
393 // Parse a single Dictionary-Entry
394 if len(payload) < 2 { // Must have at least id and size
395 return MuxerStreamError{"unexpected EOF", http2.ErrCodeProtocol}
396 }
397
398 dictID := uint8(payload[0])
399 p = (uint8(payload[1]) >> 7) == 1
400 payload, size, err = http2ReadVarInt(7, payload[1:])
401 if err != nil {
402 return
403 }
404
405 if flags.Has(FlagSetDictionaryAppend) {
406 // Presence of FlagSetDictionaryAppend means we expect e, d and truncate
407 if len(payload) < 1 {
408 return MuxerStreamError{"unexpected EOF", http2.ErrCodeProtocol}
409 }
410 e = (uint8(payload[0]) >> 7) == 1
411 d = (uint8((payload[0])>>6) & 1) == 1
412 payload, truncate, err = http2ReadVarInt(6, payload)
413 if err != nil {
414 return
415 }
416 }
417
418 if flags.Has(FlagSetDictionaryOffset) {
419 // Presence of FlagSetDictionaryOffset means we expect offset
420 if len(payload) < 1 {
421 return MuxerStreamError{"unexpected EOF", http2.ErrCodeProtocol}
422 }
423 payload, offset, err = http2ReadVarInt(8, payload)
424 if err != nil {
425 return
426 }
427 }
428
429 setdict := setDictRequest{streamID: stream.streamID,
430 dictID: dictID,
431 dictSZ: size,
432 truncate: truncate,
433 offset: offset,
434 P: p,
435 E: e,
436 D: d}
437
438 // Find the right dictionary
439 dict, err := r.dictionaries.read.getDictByID(dictID)
440 if err != nil {
441 return err
442 }
443
444 // Register a dictionary update order for the dictionary and reader
445 updateEntry := &dictUpdate{reader: reader, dictionary: dict, s: setdict}
446 dict.queue = append(dict.queue, updateEntry)
447 reader.queue = append(reader.queue, updateEntry)
448 // End of frame
449 if len(payload) == 0 {
450 break
451 }
452 }
453 return nil
454}
455
456// Receives header frames from a stream. A non-nil error is a connection error.
457func (r *MuxReader) updateStreamWindow(frame *http2.WindowUpdateFrame) error {
458 stream, err := r.getStreamForFrame(frame)
459 if err != nil && err != ErrUnknownStream && err != ErrClosedStream {
460 return err
461 }
462 if stream == nil {
463 // ignore window updates on closed streams
464 return nil
465 }
466 stream.replenishSendWindow(frame.Increment)
467 r.updateSendWindowChan <- stream.getSendWindow()
468 return nil
469}
470
471// Raise a stream processing error, closing the stream. Runs on the write thread.
472func (r *MuxReader) streamError(streamID uint32, e http2.ErrCode) error {
473 r.streamErrors.RaiseError(streamID, e)
474 return nil
475}
476
477func (r *MuxReader) connectionError(err error) error {
478 http2Code := http2.ErrCodeInternal
479 switch e := err.(type) {
480 case http2.ConnectionError:
481 http2Code = http2.ErrCode(e)
482 case MuxerProtocolError:
483 http2Code = e.h2code
484 }
485 r.sendGoAway(http2Code)
486 return err
487}
488
489// Instruct the writer to send a GOAWAY message if possible. This may fail in
490// the case where an existing GOAWAY message is in flight or the writer event
491// loop already ended.
492func (r *MuxReader) sendGoAway(errCode http2.ErrCode) {
493 select {
494 case r.goAwayChan <- errCode:
495 default:
496 }
497}
498