cloudflare/cloudflared
Publicmirrored from https://github.com/cloudflare/cloudflaredAvailable
h2mux/muxreader.go
497lines · modecode
| 1 | package h2mux |
| 2 | |
| 3 | import ( |
| 4 | "bytes" |
| 5 | "encoding/binary" |
| 6 | "io" |
| 7 | "net/url" |
| 8 | "time" |
| 9 | |
| 10 | log "github.com/sirupsen/logrus" |
| 11 | "golang.org/x/net/http2" |
| 12 | ) |
| 13 | |
| 14 | type MuxReader struct { |
| 15 | // f is used to read HTTP2 frames. |
| 16 | f *http2.Framer |
| 17 | // handler provides a callback to receive new streams. if nil, new streams cannot be accepted. |
| 18 | handler MuxedStreamHandler |
| 19 | // streams tracks currently-open streams. |
| 20 | streams *activeStreamMap |
| 21 | // readyList is used to signal writable streams. |
| 22 | readyList *ReadyList |
| 23 | // streamErrors lets us report stream errors to the MuxWriter. |
| 24 | streamErrors *StreamErrorMap |
| 25 | // goAwayChan is used to tell the writer to send a GOAWAY message. |
| 26 | goAwayChan chan<- http2.ErrCode |
| 27 | // abortChan is used when shutting down ungracefully. When this becomes readable, all activity should stop. |
| 28 | abortChan <-chan struct{} |
| 29 | // pingTimestamp is an atomic value containing the latest received ping timestamp. |
| 30 | pingTimestamp *PingTimestamp |
| 31 | // connActive is used to signal to the writer that something happened on the connection. |
| 32 | // This is used to clear idle timeout disconnection deadlines. |
| 33 | connActive Signal |
| 34 | // The initial value for the send and receive window of a new stream. |
| 35 | initialStreamWindow uint32 |
| 36 | // The max value for the send window of a stream. |
| 37 | streamWindowMax uint32 |
| 38 | // r is a reference to the underlying connection used when shutting down. |
| 39 | r io.Closer |
| 40 | // updateRTTChan is the channel to send new RTT measurement to muxerMetricsUpdater |
| 41 | updateRTTChan chan<- *roundTripMeasurement |
| 42 | // updateReceiveWindowChan is the channel to update receiveWindow size to muxerMetricsUpdater |
| 43 | updateReceiveWindowChan chan<- uint32 |
| 44 | // updateSendWindowChan is the channel to update sendWindow size to muxerMetricsUpdater |
| 45 | updateSendWindowChan chan<- uint32 |
| 46 | // bytesRead is the amount of bytes read from data frame since the last time we send bytes read to metrics |
| 47 | bytesRead *AtomicCounter |
| 48 | // updateOutBoundBytesChan is the channel to send bytesWrote to muxerMetricsUpdater |
| 49 | updateInBoundBytesChan chan<- uint64 |
| 50 | // dictionaries holds the h2 cross-stream compression dictionaries |
| 51 | dictionaries h2Dictionaries |
| 52 | } |
| 53 | |
| 54 | func (r *MuxReader) Shutdown() { |
| 55 | done := r.streams.Shutdown() |
| 56 | if done == nil { |
| 57 | return |
| 58 | } |
| 59 | r.sendGoAway(http2.ErrCodeNo) |
| 60 | go func() { |
| 61 | // close reader side when last stream ends; this will cause the writer to abort |
| 62 | <-done |
| 63 | r.r.Close() |
| 64 | }() |
| 65 | } |
| 66 | |
| 67 | func (r *MuxReader) run(parentLogger *log.Entry) error { |
| 68 | logger := parentLogger.WithFields(log.Fields{ |
| 69 | "subsystem": "mux", |
| 70 | "dir": "read", |
| 71 | }) |
| 72 | defer logger.Debug("event loop finished") |
| 73 | |
| 74 | // routine to periodically update bytesRead |
| 75 | go func() { |
| 76 | tickC := time.Tick(updateFreq) |
| 77 | for { |
| 78 | select { |
| 79 | case <-r.abortChan: |
| 80 | return |
| 81 | case <-tickC: |
| 82 | r.updateInBoundBytesChan <- r.bytesRead.Count() |
| 83 | } |
| 84 | } |
| 85 | }() |
| 86 | |
| 87 | for { |
| 88 | frame, err := r.f.ReadFrame() |
| 89 | if err != nil { |
| 90 | switch e := err.(type) { |
| 91 | case http2.StreamError: |
| 92 | logger.WithError(err).Warn("stream error") |
| 93 | r.streamError(e.StreamID, e.Code) |
| 94 | case http2.ConnectionError: |
| 95 | logger.WithError(err).Warn("connection error") |
| 96 | return r.connectionError(err) |
| 97 | default: |
| 98 | if isConnectionClosedError(err) { |
| 99 | if r.streams.Len() == 0 { |
| 100 | logger.Debug("shutting down") |
| 101 | return nil |
| 102 | } |
| 103 | logger.Warn("connection closed unexpectedly") |
| 104 | return err |
| 105 | } else { |
| 106 | logger.WithError(err).Warn("frame read error") |
| 107 | return r.connectionError(err) |
| 108 | } |
| 109 | } |
| 110 | } |
| 111 | r.connActive.Signal() |
| 112 | logger.WithField("data", frame).Debug("read frame") |
| 113 | switch f := frame.(type) { |
| 114 | case *http2.DataFrame: |
| 115 | err = r.receiveFrameData(f, logger) |
| 116 | case *http2.MetaHeadersFrame: |
| 117 | err = r.receiveHeaderData(f) |
| 118 | case *http2.RSTStreamFrame: |
| 119 | streamID := f.Header().StreamID |
| 120 | if streamID == 0 { |
| 121 | return ErrInvalidStream |
| 122 | } |
| 123 | r.streams.Delete(streamID) |
| 124 | case *http2.PingFrame: |
| 125 | r.receivePingData(f) |
| 126 | case *http2.GoAwayFrame: |
| 127 | err = r.receiveGoAway(f) |
| 128 | // The receiver of a flow-controlled frame sends a WINDOW_UPDATE frame as it |
| 129 | // consumes data and frees up space in flow-control windows |
| 130 | case *http2.WindowUpdateFrame: |
| 131 | err = r.updateStreamWindow(f) |
| 132 | case *http2.UnknownFrame: |
| 133 | switch f.Header().Type { |
| 134 | case FrameUseDictionary: |
| 135 | err = r.receiveUseDictionary(f) |
| 136 | case FrameSetDictionary: |
| 137 | err = r.receiveSetDictionary(f) |
| 138 | default: |
| 139 | err = ErrUnexpectedFrameType |
| 140 | } |
| 141 | default: |
| 142 | err = ErrUnexpectedFrameType |
| 143 | } |
| 144 | if err != nil { |
| 145 | logger.WithField("data", frame).WithError(err).Debug("frame error") |
| 146 | return r.connectionError(err) |
| 147 | } |
| 148 | } |
| 149 | } |
| 150 | |
| 151 | func (r *MuxReader) newMuxedStream(streamID uint32) *MuxedStream { |
| 152 | return &MuxedStream{ |
| 153 | streamID: streamID, |
| 154 | readBuffer: NewSharedBuffer(), |
| 155 | writeBuffer: &bytes.Buffer{}, |
| 156 | receiveWindow: r.initialStreamWindow, |
| 157 | receiveWindowCurrentMax: r.initialStreamWindow, |
| 158 | receiveWindowMax: r.streamWindowMax, |
| 159 | sendWindow: r.initialStreamWindow, |
| 160 | readyList: r.readyList, |
| 161 | dictionaries: r.dictionaries, |
| 162 | } |
| 163 | } |
| 164 | |
| 165 | // getStreamForFrame returns a stream if valid, or an error describing why the stream could not be returned. |
| 166 | func (r *MuxReader) getStreamForFrame(frame http2.Frame) (*MuxedStream, error) { |
| 167 | sid := frame.Header().StreamID |
| 168 | if sid == 0 { |
| 169 | return nil, ErrUnexpectedFrameType |
| 170 | } |
| 171 | if stream, ok := r.streams.Get(sid); ok { |
| 172 | return stream, nil |
| 173 | } |
| 174 | if r.streams.IsLocalStreamID(sid) { |
| 175 | // no stream available, but no error |
| 176 | return nil, ErrClosedStream |
| 177 | } |
| 178 | if sid < r.streams.LastPeerStreamID() { |
| 179 | // no stream available, stream closed error |
| 180 | return nil, ErrClosedStream |
| 181 | } |
| 182 | return nil, ErrUnknownStream |
| 183 | } |
| 184 | |
| 185 | func (r *MuxReader) defaultStreamErrorHandler(err error, header http2.FrameHeader) error { |
| 186 | if header.Flags.Has(http2.FlagHeadersEndStream) { |
| 187 | return nil |
| 188 | } else if err == ErrUnknownStream || err == ErrClosedStream { |
| 189 | return r.streamError(header.StreamID, http2.ErrCodeStreamClosed) |
| 190 | } else { |
| 191 | return err |
| 192 | } |
| 193 | } |
| 194 | |
| 195 | // Receives header frames from a stream. A non-nil error is a connection error. |
| 196 | func (r *MuxReader) receiveHeaderData(frame *http2.MetaHeadersFrame) error { |
| 197 | var stream *MuxedStream |
| 198 | sid := frame.Header().StreamID |
| 199 | if sid == 0 { |
| 200 | return ErrUnexpectedFrameType |
| 201 | } |
| 202 | newStream := r.streams.IsPeerStreamID(sid) |
| 203 | if newStream { |
| 204 | // header request |
| 205 | // TODO support trailers (if stream exists) |
| 206 | ok, err := r.streams.AcquirePeerID(sid) |
| 207 | if !ok { |
| 208 | // ignore new streams while shutting down |
| 209 | return r.streamError(sid, err) |
| 210 | } |
| 211 | stream = r.newMuxedStream(sid) |
| 212 | // Set stream. Returns false if a stream already existed with that ID or we are shutting down, return false. |
| 213 | if !r.streams.Set(stream) { |
| 214 | // got HEADERS frame for an existing stream |
| 215 | // TODO support trailers |
| 216 | return r.streamError(sid, http2.ErrCodeInternal) |
| 217 | } |
| 218 | } else { |
| 219 | // header response |
| 220 | var err error |
| 221 | if stream, err = r.getStreamForFrame(frame); err != nil { |
| 222 | return r.defaultStreamErrorHandler(err, frame.Header()) |
| 223 | } |
| 224 | } |
| 225 | headers := make([]Header, 0, len(frame.Fields)) |
| 226 | for _, header := range frame.Fields { |
| 227 | switch header.Name { |
| 228 | case ":method": |
| 229 | stream.method = header.Value |
| 230 | case ":path": |
| 231 | u, err := url.Parse(header.Value) |
| 232 | if err == nil { |
| 233 | stream.path = u.Path |
| 234 | } |
| 235 | case "accept-encoding": |
| 236 | // remove accept-encoding if dictionaries are enabled |
| 237 | if r.dictionaries.write != nil { |
| 238 | continue |
| 239 | } |
| 240 | } |
| 241 | headers = append(headers, Header{Name: header.Name, Value: header.Value}) |
| 242 | } |
| 243 | stream.Headers = headers |
| 244 | if frame.Header().Flags.Has(http2.FlagHeadersEndStream) { |
| 245 | stream.receiveEOF() |
| 246 | return nil |
| 247 | } |
| 248 | if newStream { |
| 249 | go r.handleStream(stream) |
| 250 | } else { |
| 251 | close(stream.responseHeadersReceived) |
| 252 | } |
| 253 | return nil |
| 254 | } |
| 255 | |
| 256 | func (r *MuxReader) handleStream(stream *MuxedStream) { |
| 257 | defer stream.Close() |
| 258 | r.handler.ServeStream(stream) |
| 259 | } |
| 260 | |
| 261 | // Receives a data frame from a stream. A non-nil error is a connection error. |
| 262 | func (r *MuxReader) receiveFrameData(frame *http2.DataFrame, parentLogger *log.Entry) error { |
| 263 | logger := parentLogger.WithField("stream", frame.Header().StreamID) |
| 264 | stream, err := r.getStreamForFrame(frame) |
| 265 | if err != nil { |
| 266 | return r.defaultStreamErrorHandler(err, frame.Header()) |
| 267 | } |
| 268 | data := frame.Data() |
| 269 | if len(data) > 0 { |
| 270 | n, err := stream.readBuffer.Write(data) |
| 271 | if err != nil { |
| 272 | return r.streamError(stream.streamID, http2.ErrCodeInternal) |
| 273 | } |
| 274 | r.bytesRead.IncrementBy(uint64(n)) |
| 275 | } |
| 276 | if frame.Header().Flags.Has(http2.FlagDataEndStream) { |
| 277 | if stream.receiveEOF() { |
| 278 | r.streams.Delete(stream.streamID) |
| 279 | logger.Debug("stream closed") |
| 280 | } else { |
| 281 | logger.Debug("shutdown receive side") |
| 282 | } |
| 283 | return nil |
| 284 | } |
| 285 | if !stream.consumeReceiveWindow(uint32(len(data))) { |
| 286 | return r.streamError(stream.streamID, http2.ErrCodeFlowControl) |
| 287 | } |
| 288 | r.updateReceiveWindowChan <- stream.getReceiveWindow() |
| 289 | return nil |
| 290 | } |
| 291 | |
| 292 | // Receive a PING from the peer. Update RTT and send/receive window metrics if it's an ACK. |
| 293 | func (r *MuxReader) receivePingData(frame *http2.PingFrame) { |
| 294 | ts := int64(binary.LittleEndian.Uint64(frame.Data[:])) |
| 295 | if !frame.IsAck() { |
| 296 | r.pingTimestamp.Set(ts) |
| 297 | return |
| 298 | } |
| 299 | |
| 300 | // Update updates the computed values with a new measurement. |
| 301 | // outgoingTime is the time that the probe was sent. |
| 302 | // We assume that time.Now() is the time we received that probe. |
| 303 | r.updateRTTChan <- &roundTripMeasurement{ |
| 304 | receiveTime: time.Now(), |
| 305 | sendTime: time.Unix(0, ts), |
| 306 | } |
| 307 | } |
| 308 | |
| 309 | // Receive a GOAWAY from the peer. Gracefully shut down our connection. |
| 310 | func (r *MuxReader) receiveGoAway(frame *http2.GoAwayFrame) error { |
| 311 | r.Shutdown() |
| 312 | // Close all streams above the last processed stream |
| 313 | lastStream := r.streams.LastLocalStreamID() |
| 314 | for i := frame.LastStreamID + 2; i <= lastStream; i++ { |
| 315 | if stream, ok := r.streams.Get(i); ok { |
| 316 | stream.Close() |
| 317 | } |
| 318 | } |
| 319 | return nil |
| 320 | } |
| 321 | |
| 322 | // Receive a USE_DICTIONARY from the peer. Setup dictionary for stream. |
| 323 | func (r *MuxReader) receiveUseDictionary(frame *http2.UnknownFrame) error { |
| 324 | payload := frame.Payload() |
| 325 | streamID := frame.StreamID |
| 326 | |
| 327 | // Check frame is formatted properly |
| 328 | if len(payload) != 1 { |
| 329 | return r.streamError(streamID, http2.ErrCodeProtocol) |
| 330 | } |
| 331 | |
| 332 | stream, err := r.getStreamForFrame(frame) |
| 333 | if err != nil { |
| 334 | return err |
| 335 | } |
| 336 | |
| 337 | if stream.receivedUseDict == true || stream.dictionaries.read == nil { |
| 338 | return r.streamError(streamID, http2.ErrCodeInternal) |
| 339 | } |
| 340 | |
| 341 | stream.receivedUseDict = true |
| 342 | dictID := payload[0] |
| 343 | |
| 344 | dictReader := stream.dictionaries.read.newReader(stream.readBuffer.(*SharedBuffer), dictID) |
| 345 | if dictReader == nil { |
| 346 | return r.streamError(streamID, http2.ErrCodeInternal) |
| 347 | } |
| 348 | |
| 349 | stream.readBufferLock.Lock() |
| 350 | stream.readBuffer = dictReader |
| 351 | stream.readBufferLock.Unlock() |
| 352 | |
| 353 | return nil |
| 354 | } |
| 355 | |
| 356 | // Receive a SET_DICTIONARY from the peer. Update dictionaries accordingly. |
| 357 | func (r *MuxReader) receiveSetDictionary(frame *http2.UnknownFrame) (err error) { |
| 358 | |
| 359 | payload := frame.Payload() |
| 360 | flags := frame.Flags |
| 361 | |
| 362 | stream, err := r.getStreamForFrame(frame) |
| 363 | if err != nil && err != ErrClosedStream { |
| 364 | return err |
| 365 | } |
| 366 | reader, ok := stream.readBuffer.(*h2DictionaryReader) |
| 367 | if !ok { |
| 368 | return r.streamError(frame.StreamID, http2.ErrCodeProtocol) |
| 369 | } |
| 370 | |
| 371 | // A SetDictionary frame consists of several |
| 372 | // Dictionary-Entries that specify how existing dictionaries |
| 373 | // are to be updated using the current stream data |
| 374 | // +---------------+---------------+ |
| 375 | // | Dictionary-Entry (+) ... |
| 376 | // +---------------+---------------+ |
| 377 | |
| 378 | for { |
| 379 | // Each Dictionary-Entry is formatted as follows: |
| 380 | // +-------------------------------+ |
| 381 | // | Dictionary-ID (8) | |
| 382 | // +---+---------------------------+ |
| 383 | // | P | Size (7+) | |
| 384 | // +---+---------------------------+ |
| 385 | // | E?| D?| Truncate? (6+) | |
| 386 | // +---+---------------------------+ |
| 387 | // | Offset? (8+) | |
| 388 | // +-------------------------------+ |
| 389 | |
| 390 | var size, truncate, offset uint64 |
| 391 | var p, e, d bool |
| 392 | |
| 393 | // Parse a single Dictionary-Entry |
| 394 | if len(payload) < 2 { // Must have at least id and size |
| 395 | return MuxerStreamError{"unexpected EOF", http2.ErrCodeProtocol} |
| 396 | } |
| 397 | |
| 398 | dictID := uint8(payload[0]) |
| 399 | p = (uint8(payload[1]) >> 7) == 1 |
| 400 | payload, size, err = http2ReadVarInt(7, payload[1:]) |
| 401 | if err != nil { |
| 402 | return |
| 403 | } |
| 404 | |
| 405 | if flags.Has(FlagSetDictionaryAppend) { |
| 406 | // Presence of FlagSetDictionaryAppend means we expect e, d and truncate |
| 407 | if len(payload) < 1 { |
| 408 | return MuxerStreamError{"unexpected EOF", http2.ErrCodeProtocol} |
| 409 | } |
| 410 | e = (uint8(payload[0]) >> 7) == 1 |
| 411 | d = (uint8((payload[0])>>6) & 1) == 1 |
| 412 | payload, truncate, err = http2ReadVarInt(6, payload) |
| 413 | if err != nil { |
| 414 | return |
| 415 | } |
| 416 | } |
| 417 | |
| 418 | if flags.Has(FlagSetDictionaryOffset) { |
| 419 | // Presence of FlagSetDictionaryOffset means we expect offset |
| 420 | if len(payload) < 1 { |
| 421 | return MuxerStreamError{"unexpected EOF", http2.ErrCodeProtocol} |
| 422 | } |
| 423 | payload, offset, err = http2ReadVarInt(8, payload) |
| 424 | if err != nil { |
| 425 | return |
| 426 | } |
| 427 | } |
| 428 | |
| 429 | setdict := setDictRequest{streamID: stream.streamID, |
| 430 | dictID: dictID, |
| 431 | dictSZ: size, |
| 432 | truncate: truncate, |
| 433 | offset: offset, |
| 434 | P: p, |
| 435 | E: e, |
| 436 | D: d} |
| 437 | |
| 438 | // Find the right dictionary |
| 439 | dict, err := r.dictionaries.read.getDictByID(dictID) |
| 440 | if err != nil { |
| 441 | return err |
| 442 | } |
| 443 | |
| 444 | // Register a dictionary update order for the dictionary and reader |
| 445 | updateEntry := &dictUpdate{reader: reader, dictionary: dict, s: setdict} |
| 446 | dict.queue = append(dict.queue, updateEntry) |
| 447 | reader.queue = append(reader.queue, updateEntry) |
| 448 | // End of frame |
| 449 | if len(payload) == 0 { |
| 450 | break |
| 451 | } |
| 452 | } |
| 453 | return nil |
| 454 | } |
| 455 | |
| 456 | // Receives header frames from a stream. A non-nil error is a connection error. |
| 457 | func (r *MuxReader) updateStreamWindow(frame *http2.WindowUpdateFrame) error { |
| 458 | stream, err := r.getStreamForFrame(frame) |
| 459 | if err != nil && err != ErrUnknownStream && err != ErrClosedStream { |
| 460 | return err |
| 461 | } |
| 462 | if stream == nil { |
| 463 | // ignore window updates on closed streams |
| 464 | return nil |
| 465 | } |
| 466 | stream.replenishSendWindow(frame.Increment) |
| 467 | r.updateSendWindowChan <- stream.getSendWindow() |
| 468 | return nil |
| 469 | } |
| 470 | |
| 471 | // Raise a stream processing error, closing the stream. Runs on the write thread. |
| 472 | func (r *MuxReader) streamError(streamID uint32, e http2.ErrCode) error { |
| 473 | r.streamErrors.RaiseError(streamID, e) |
| 474 | return nil |
| 475 | } |
| 476 | |
| 477 | func (r *MuxReader) connectionError(err error) error { |
| 478 | http2Code := http2.ErrCodeInternal |
| 479 | switch e := err.(type) { |
| 480 | case http2.ConnectionError: |
| 481 | http2Code = http2.ErrCode(e) |
| 482 | case MuxerProtocolError: |
| 483 | http2Code = e.h2code |
| 484 | } |
| 485 | r.sendGoAway(http2Code) |
| 486 | return err |
| 487 | } |
| 488 | |
| 489 | // Instruct the writer to send a GOAWAY message if possible. This may fail in |
| 490 | // the case where an existing GOAWAY message is in flight or the writer event |
| 491 | // loop already ended. |
| 492 | func (r *MuxReader) sendGoAway(errCode http2.ErrCode) { |
| 493 | select { |
| 494 | case r.goAwayChan <- errCode: |
| 495 | default: |
| 496 | } |
| 497 | } |
| 498 | |