cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2019.10.3

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

origin/tunnel.go

748lines · modecode

1package origin
2
3import (
4 "bufio"
5 "context"
6 "crypto/tls"
7 "fmt"
8 "io"
9 "net"
10 "net/http"
11 "net/url"
12 "strconv"
13 "strings"
14 "sync"
15 "time"
16
17 "github.com/cloudflare/cloudflared/cmd/cloudflared/buildinfo"
18 "github.com/cloudflare/cloudflared/h2mux"
19 "github.com/cloudflare/cloudflared/signal"
20 "github.com/cloudflare/cloudflared/streamhandler"
21 "github.com/cloudflare/cloudflared/tunnelrpc"
22 tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
23 "github.com/cloudflare/cloudflared/validation"
24 "github.com/cloudflare/cloudflared/websocket"
25
26 raven "github.com/getsentry/raven-go"
27 "github.com/google/uuid"
28 "github.com/pkg/errors"
29 "github.com/prometheus/client_golang/prometheus"
30 _ "github.com/prometheus/client_golang/prometheus"
31 log "github.com/sirupsen/logrus"
32 "golang.org/x/sync/errgroup"
33 rpc "zombiezen.com/go/capnproto2/rpc"
34)
35
// Timeouts and protocol constants used by the tunnel connection code.
const (
	// dialTimeout bounds the TCP dial to the edge and, separately, the TLS
	// handshake that follows it.
	dialTimeout = 15 * time.Second
	// openStreamTimeout bounds how long opening an RPC stream on the muxer may take.
	openStreamTimeout = 30 * time.Second
	// lbProbeUserAgentPrefix is presumably the User-Agent prefix sent by load
	// balancer probes; it is not referenced by the code visible in this file.
	lbProbeUserAgentPrefix = "Mozilla/5.0 (compatible; Cloudflare-Traffic-Manager/1.0; +https://www.cloudflare.com/traffic-manager/;"
	// TagHeaderNamePrefix is prepended to a tag name to form the request
	// header that carries the tag value to the origin.
	TagHeaderNamePrefix = "Cf-Warp-Tag-"
	// DuplicateConnectionError is the registration error string that is
	// translated into dupConnRegisterTunnelError.
	DuplicateConnectionError = "EDUPCONN"
)
43
44type TunnelConfig struct {
45 BuildInfo *buildinfo.BuildInfo
46 ClientID string
47 ClientTlsConfig *tls.Config
48 CloseConnOnce *sync.Once // Used to close connectedSignal no more than once
49 CompressionQuality uint64
50 EdgeAddrs []string
51 GracePeriod time.Duration
52 HAConnections int
53 HTTPTransport http.RoundTripper
54 HeartbeatInterval time.Duration
55 Hostname string
56 HTTPHostHeader string
57 IncidentLookup IncidentLookup
58 IsAutoupdated bool
59 IsFreeTunnel bool
60 LBPool string
61 Logger *log.Logger
62 MaxHeartbeats uint64
63 Metrics *TunnelMetrics
64 MetricsUpdateFreq time.Duration
65 NoChunkedEncoding bool
66 OriginCert []byte
67 ReportedVersion string
68 Retries uint
69 RunFromTerminal bool
70 Tags []tunnelpogs.Tag
71 TlsConfig *tls.Config
72 TransportLogger *log.Logger
73 UseDeclarativeTunnel bool
74 WSGI bool
75 // OriginUrl may not be used if a user specifies a unix socket.
76 OriginUrl string
77}
78
// dialError marks a failure that happened while connecting to the edge, so
// callers can tell it apart from other tunnel creation failures by type.
type dialError struct {
	cause error
}

// Error reports the message of the wrapped cause unchanged.
func (de dialError) Error() string {
	msg := de.cause.Error()
	return msg
}
86
// dupConnRegisterTunnelError signals that registration was rejected because a
// connection to the same server already exists.
type dupConnRegisterTunnelError struct{}

// Error returns the fixed description of the duplicate-connection condition.
func (dupConnRegisterTunnelError) Error() string {
	const description = "already connected to this server"
	return description
}
92
// muxerShutdownError is returned in place of nil when the muxer stops serving
// without an error, so that the other goroutines serving the tunnel stop too.
type muxerShutdownError struct{}

// Error returns the fixed description of the shutdown condition.
func (muxerShutdownError) Error() string {
	const description = "muxer shutdown"
	return description
}
98
// serverRegisterTunnelError is a RegisterTunnel failure reported by the server.
type serverRegisterTunnelError struct {
	// cause is the error built from the server's message.
	cause error
	// permanent is true when the server flagged the failure as permanent;
	// such failures are not retried.
	permanent bool
}

// Error reports the message of the wrapped cause unchanged.
func (sre serverRegisterTunnelError) Error() string {
	msg := sre.cause.Error()
	return msg
}
108
109// RegisterTunnel error from client
110type clientRegisterTunnelError struct {
111 cause error
112}
113
114func newClientRegisterTunnelError(cause error, counter *prometheus.CounterVec) clientRegisterTunnelError {
115 counter.WithLabelValues(cause.Error()).Inc()
116 return clientRegisterTunnelError{cause: cause}
117}
118
119func (e clientRegisterTunnelError) Error() string {
120 return e.cause.Error()
121}
122
123func (c *TunnelConfig) RegistrationOptions(connectionID uint8, OriginLocalIP string, uuid uuid.UUID) *tunnelpogs.RegistrationOptions {
124 policy := tunnelrpc.ExistingTunnelPolicy_balance
125 if c.HAConnections <= 1 && c.LBPool == "" {
126 policy = tunnelrpc.ExistingTunnelPolicy_disconnect
127 }
128 return &tunnelpogs.RegistrationOptions{
129 ClientID: c.ClientID,
130 Version: c.ReportedVersion,
131 OS: fmt.Sprintf("%s_%s", c.BuildInfo.GoOS, c.BuildInfo.GoArch),
132 ExistingTunnelPolicy: policy,
133 PoolName: c.LBPool,
134 Tags: c.Tags,
135 ConnectionID: connectionID,
136 OriginLocalIP: OriginLocalIP,
137 IsAutoupdated: c.IsAutoupdated,
138 RunFromTerminal: c.RunFromTerminal,
139 CompressionQuality: c.CompressionQuality,
140 UUID: uuid.String(),
141 }
142}
143
144func StartTunnelDaemon(ctx context.Context, config *TunnelConfig, connectedSignal *signal.Signal, cloudflaredID uuid.UUID) error {
145 return NewSupervisor(config).Run(ctx, connectedSignal, cloudflaredID)
146}
147
148func ServeTunnelLoop(ctx context.Context,
149 config *TunnelConfig,
150 addr *net.TCPAddr,
151 connectionID uint8,
152 connectedSignal *signal.Signal,
153 u uuid.UUID,
154) error {
155 connectionLogger := config.Logger.WithField("connectionID", connectionID)
156 config.Metrics.incrementHaConnections()
157 defer config.Metrics.decrementHaConnections()
158 backoff := BackoffHandler{MaxRetries: config.Retries}
159 connectedFuse := h2mux.NewBooleanFuse()
160 go func() {
161 if connectedFuse.Await() {
162 connectedSignal.Notify()
163 }
164 }()
165 // Ensure the above goroutine will terminate if we return without connecting
166 defer connectedFuse.Fuse(false)
167 for {
168 err, recoverable := ServeTunnel(ctx, config, connectionLogger, addr, connectionID, connectedFuse, &backoff, u)
169 if recoverable {
170 if duration, ok := backoff.GetBackoffDuration(ctx); ok {
171 connectionLogger.Infof("Retrying in %s seconds", duration)
172 backoff.Backoff(ctx)
173 continue
174 }
175 }
176 return err
177 }
178}
179
180func ServeTunnel(
181 ctx context.Context,
182 config *TunnelConfig,
183 logger *log.Entry,
184 addr *net.TCPAddr,
185 connectionID uint8,
186 connectedFuse *h2mux.BooleanFuse,
187 backoff *BackoffHandler,
188 u uuid.UUID,
189) (err error, recoverable bool) {
190 // Treat panics as recoverable errors
191 defer func() {
192 if r := recover(); r != nil {
193 var ok bool
194 err, ok = r.(error)
195 if !ok {
196 err = fmt.Errorf("ServeTunnel: %v", r)
197 }
198 recoverable = true
199 }
200 }()
201
202 connectionTag := uint8ToString(connectionID)
203
204 // additional tags to send other than hostname which is set in cloudflared main package
205 tags := make(map[string]string)
206 tags["ha"] = connectionTag
207
208 // Returns error from parsing the origin URL or handshake errors
209 handler, originLocalIP, err := NewTunnelHandler(ctx, config, addr.String(), connectionID)
210 if err != nil {
211 errLog := logger.WithError(err)
212 switch err.(type) {
213 case dialError:
214 errLog.Error("Unable to dial edge")
215 case h2mux.MuxerHandshakeError:
216 errLog.Error("Handshake failed with edge server")
217 default:
218 errLog.Error("Tunnel creation failure")
219 return err, false
220 }
221 return err, true
222 }
223
224 errGroup, serveCtx := errgroup.WithContext(ctx)
225
226 errGroup.Go(func() error {
227 err := RegisterTunnel(serveCtx, handler.muxer, config, logger, connectionID, originLocalIP, u)
228 if err == nil {
229 connectedFuse.Fuse(true)
230 backoff.SetGracePeriod()
231 }
232 return err
233 })
234
235 errGroup.Go(func() error {
236 updateMetricsTickC := time.Tick(config.MetricsUpdateFreq)
237 for {
238 select {
239 case <-serveCtx.Done():
240 // UnregisterTunnel blocks until the RPC call returns
241 err := UnregisterTunnel(handler.muxer, config.GracePeriod, config.TransportLogger)
242 handler.muxer.Shutdown()
243 return err
244 case <-updateMetricsTickC:
245 handler.UpdateMetrics(connectionTag)
246 }
247 }
248 })
249
250 errGroup.Go(func() error {
251 // All routines should stop when muxer finish serving. When muxer is shutdown
252 // gracefully, it doesn't return an error, so we need to return errMuxerShutdown
253 // here to notify other routines to stop
254 err := handler.muxer.Serve(serveCtx)
255 if err == nil {
256 return muxerShutdownError{}
257 }
258 return err
259 })
260
261 err = errGroup.Wait()
262 if err != nil {
263 switch castedErr := err.(type) {
264 case dupConnRegisterTunnelError:
265 logger.Info("Already connected to this server, selecting a different one")
266 return err, true
267 case serverRegisterTunnelError:
268 logger.WithError(castedErr.cause).Error("Register tunnel error from server side")
269 // Don't send registration error return from server to Sentry. They are
270 // logged on server side
271 if incidents := config.IncidentLookup.ActiveIncidents(); len(incidents) > 0 {
272 logger.Error(activeIncidentsMsg(incidents))
273 }
274 return castedErr.cause, !castedErr.permanent
275 case clientRegisterTunnelError:
276 logger.WithError(castedErr.cause).Error("Register tunnel error on client side")
277 raven.CaptureError(castedErr.cause, tags)
278 return err, true
279 case muxerShutdownError:
280 logger.Infof("Muxer shutdown")
281 return err, true
282 default:
283 logger.WithError(err).Error("Serve tunnel error")
284 raven.CaptureError(err, tags)
285 return err, true
286 }
287 }
288 return nil, true
289}
290
291func IsRPCStreamResponse(headers []h2mux.Header) bool {
292 if len(headers) != 1 {
293 return false
294 }
295 if headers[0].Name != ":status" || headers[0].Value != "200" {
296 return false
297 }
298 return true
299}
300
301func RegisterTunnel(
302 ctx context.Context,
303 muxer *h2mux.Muxer,
304 config *TunnelConfig,
305 logger *log.Entry,
306 connectionID uint8,
307 originLocalIP string,
308 uuid uuid.UUID,
309) error {
310 config.TransportLogger.Debug("initiating RPC stream to register")
311 stream, err := openStream(ctx, muxer)
312 if err != nil {
313 // RPC stream open error
314 return newClientRegisterTunnelError(err, config.Metrics.rpcFail)
315 }
316 if !IsRPCStreamResponse(stream.Headers) {
317 // stream response error
318 return newClientRegisterTunnelError(err, config.Metrics.rpcFail)
319 }
320 conn := rpc.NewConn(
321 tunnelrpc.NewTransportLogger(config.TransportLogger.WithField("subsystem", "rpc-register"), rpc.StreamTransport(stream)),
322 tunnelrpc.ConnLog(config.TransportLogger.WithField("subsystem", "rpc-transport")),
323 )
324 defer conn.Close()
325 ts := tunnelpogs.TunnelServer_PogsClient{Client: conn.Bootstrap(ctx)}
326 // Request server info without blocking tunnel registration; must use capnp library directly.
327 tsClient := tunnelrpc.TunnelServer{Client: ts.Client}
328 serverInfoPromise := tsClient.GetServerInfo(ctx, func(tunnelrpc.TunnelServer_getServerInfo_Params) error {
329 return nil
330 })
331 registration, err := ts.RegisterTunnel(
332 ctx,
333 config.OriginCert,
334 config.Hostname,
335 config.RegistrationOptions(connectionID, originLocalIP, uuid),
336 )
337 LogServerInfo(serverInfoPromise.Result(), connectionID, config.Metrics, logger)
338 if err != nil {
339 // RegisterTunnel RPC failure
340 return newClientRegisterTunnelError(err, config.Metrics.regFail)
341 }
342 for _, logLine := range registration.LogLines {
343 logger.Info(logLine)
344 }
345
346 if regErr := processRegisterTunnelError(registration.Err, registration.PermanentFailure, config.Metrics); regErr != nil {
347 return regErr
348 }
349
350 if registration.TunnelID != "" {
351 config.Metrics.tunnelsHA.AddTunnelID(connectionID, registration.TunnelID)
352 logger.Infof("Each HA connection's tunnel IDs: %v", config.Metrics.tunnelsHA.String())
353 }
354
355 // Print out the user's trial zone URL in a nice box (if they requested and got one)
356 if isTrialTunnel := config.Hostname == ""; isTrialTunnel {
357 if url, err := url.Parse(registration.Url); err == nil {
358 for _, line := range asciiBox(trialZoneMsg(url.String()), 2) {
359 logger.Infoln(line)
360 }
361 } else {
362 logger.Errorln("Failed to connect tunnel, please try again.")
363 return fmt.Errorf("empty URL in response from Cloudflare edge")
364 }
365 }
366
367 config.Metrics.userHostnamesCounts.WithLabelValues(registration.Url).Inc()
368
369 logger.Infof("Route propagating, it may take up to 1 minute for your new route to become functional")
370 return nil
371}
372
373func processRegisterTunnelError(err string, permanentFailure bool, metrics *TunnelMetrics) error {
374 if err == "" {
375 metrics.regSuccess.Inc()
376 return nil
377 }
378
379 metrics.regFail.WithLabelValues(err).Inc()
380 if err == DuplicateConnectionError {
381 return dupConnRegisterTunnelError{}
382 }
383 return serverRegisterTunnelError{
384 cause: fmt.Errorf("Server error: %s", err),
385 permanent: permanentFailure,
386 }
387}
388
389func UnregisterTunnel(muxer *h2mux.Muxer, gracePeriod time.Duration, logger *log.Logger) error {
390 logger.Debug("initiating RPC stream to unregister")
391 ctx := context.Background()
392 stream, err := openStream(ctx, muxer)
393 if err != nil {
394 // RPC stream open error
395 return err
396 }
397 if !IsRPCStreamResponse(stream.Headers) {
398 // stream response error
399 return err
400 }
401 conn := rpc.NewConn(
402 tunnelrpc.NewTransportLogger(logger.WithField("subsystem", "rpc-unregister"), rpc.StreamTransport(stream)),
403 tunnelrpc.ConnLog(logger.WithField("subsystem", "rpc-transport")),
404 )
405 defer conn.Close()
406 ts := tunnelpogs.TunnelServer_PogsClient{Client: conn.Bootstrap(ctx)}
407 // gracePeriod is encoded in int64 using capnproto
408 return ts.UnregisterTunnel(ctx, gracePeriod.Nanoseconds())
409}
410
411func openStream(ctx context.Context, muxer *h2mux.Muxer) (*h2mux.MuxedStream, error) {
412 openStreamCtx, cancel := context.WithTimeout(ctx, openStreamTimeout)
413 defer cancel()
414 return muxer.OpenStream(openStreamCtx, []h2mux.Header{
415 {Name: ":method", Value: "RPC"},
416 {Name: ":scheme", Value: "capnp"},
417 {Name: ":path", Value: "*"},
418 }, nil)
419}
420
421func LogServerInfo(
422 promise tunnelrpc.ServerInfo_Promise,
423 connectionID uint8,
424 metrics *TunnelMetrics,
425 logger *log.Entry,
426) {
427 serverInfoMessage, err := promise.Struct()
428 if err != nil {
429 logger.WithError(err).Warn("Failed to retrieve server information")
430 return
431 }
432 serverInfo, err := tunnelpogs.UnmarshalServerInfo(serverInfoMessage)
433 if err != nil {
434 logger.WithError(err).Warn("Failed to retrieve server information")
435 return
436 }
437 logger.Infof("Connected to %s", serverInfo.LocationName)
438 metrics.registerServerLocation(uint8ToString(connectionID), serverInfo.LocationName)
439}
440
441func H1ResponseToH2Response(h1 *http.Response) (h2 []h2mux.Header) {
442 h2 = []h2mux.Header{{Name: ":status", Value: fmt.Sprintf("%d", h1.StatusCode)}}
443 for headerName, headerValues := range h1.Header {
444 for _, headerValue := range headerValues {
445 h2 = append(h2, h2mux.Header{Name: strings.ToLower(headerName), Value: headerValue})
446 }
447 }
448 return
449}
450
451type TunnelHandler struct {
452 originUrl string
453 httpHostHeader string
454 muxer *h2mux.Muxer
455 httpClient http.RoundTripper
456 tlsConfig *tls.Config
457 tags []tunnelpogs.Tag
458 metrics *TunnelMetrics
459 // connectionID is only used by metrics, and prometheus requires labels to be string
460 connectionID string
461 logger *log.Logger
462 noChunkedEncoding bool
463}
464
// dialer is the shared dialer used to open TCP connections to the edge.
var dialer = net.Dialer{
	DualStack: true,
}
466
467// NewTunnelHandler returns a TunnelHandler, origin LAN IP and error
468func NewTunnelHandler(ctx context.Context,
469 config *TunnelConfig,
470 addr string,
471 connectionID uint8,
472) (*TunnelHandler, string, error) {
473 originURL, err := validation.ValidateUrl(config.OriginUrl)
474 if err != nil {
475 return nil, "", fmt.Errorf("unable to parse origin URL %#v", originURL)
476 }
477 h := &TunnelHandler{
478 originUrl: originURL,
479 httpHostHeader: config.HTTPHostHeader,
480 httpClient: config.HTTPTransport,
481 tlsConfig: config.ClientTlsConfig,
482 tags: config.Tags,
483 metrics: config.Metrics,
484 connectionID: uint8ToString(connectionID),
485 logger: config.Logger,
486 noChunkedEncoding: config.NoChunkedEncoding,
487 }
488 if h.httpClient == nil {
489 h.httpClient = http.DefaultTransport
490 }
491 // Inherit from parent context so we can cancel (Ctrl-C) while dialing
492 dialCtx, dialCancel := context.WithTimeout(ctx, dialTimeout)
493 // TUN-92: enforce a timeout on dial and handshake (as tls.Dial does not support one)
494 plaintextEdgeConn, err := dialer.DialContext(dialCtx, "tcp", addr)
495 dialCancel()
496 if err != nil {
497 return nil, "", dialError{cause: errors.Wrap(err, "DialContext error")}
498 }
499 edgeConn := tls.Client(plaintextEdgeConn, config.TlsConfig)
500 edgeConn.SetDeadline(time.Now().Add(dialTimeout))
501 err = edgeConn.Handshake()
502 if err != nil {
503 return nil, "", dialError{cause: errors.Wrap(err, "Handshake with edge error")}
504 }
505 // clear the deadline on the conn; h2mux has its own timeouts
506 edgeConn.SetDeadline(time.Time{})
507 // Establish a muxed connection with the edge
508 // Client mux handshake with agent server
509 h.muxer, err = h2mux.Handshake(edgeConn, edgeConn, h2mux.MuxerConfig{
510 Timeout: 5 * time.Second,
511 Handler: h,
512 IsClient: true,
513 HeartbeatInterval: config.HeartbeatInterval,
514 MaxHeartbeats: config.MaxHeartbeats,
515 Logger: config.TransportLogger.WithFields(log.Fields{}),
516 CompressionQuality: h2mux.CompressionSetting(config.CompressionQuality),
517 })
518 if err != nil {
519 return h, "", errors.New("TLS handshake error")
520 }
521 return h, edgeConn.LocalAddr().String(), err
522}
523
524func (h *TunnelHandler) AppendTagHeaders(r *http.Request) {
525 for _, tag := range h.tags {
526 r.Header.Add(TagHeaderNamePrefix+tag.Name, tag.Value)
527 }
528}
529
530func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error {
531 h.metrics.incrementRequests(h.connectionID)
532 defer h.metrics.decrementConcurrentRequests(h.connectionID)
533
534 req, reqErr := h.createRequest(stream)
535 if reqErr != nil {
536 h.logError(stream, reqErr)
537 return reqErr
538 }
539
540 cfRay := streamhandler.FindCfRayHeader(req)
541 lbProbe := streamhandler.IsLBProbeRequest(req)
542 h.logRequest(req, cfRay, lbProbe)
543
544 var resp *http.Response
545 var respErr error
546 if websocket.IsWebSocketUpgrade(req) {
547 resp, respErr = h.serveWebsocket(stream, req)
548 } else {
549 resp, respErr = h.serveHTTP(stream, req)
550 }
551 if respErr != nil {
552 h.logError(stream, respErr)
553 return respErr
554 }
555 h.logResponseOk(resp, cfRay, lbProbe)
556 return nil
557}
558
559func (h *TunnelHandler) createRequest(stream *h2mux.MuxedStream) (*http.Request, error) {
560 req, err := http.NewRequest("GET", h.originUrl, h2mux.MuxedStreamReader{MuxedStream: stream})
561 if err != nil {
562 return nil, errors.Wrap(err, "Unexpected error from http.NewRequest")
563 }
564 err = streamhandler.H2RequestHeadersToH1Request(stream.Headers, req)
565 if err != nil {
566 return nil, errors.Wrap(err, "invalid request received")
567 }
568 h.AppendTagHeaders(req)
569 return req, nil
570}
571
572func (h *TunnelHandler) serveWebsocket(stream *h2mux.MuxedStream, req *http.Request) (*http.Response, error) {
573 if h.httpHostHeader != "" {
574 req.Header.Set("Host", h.httpHostHeader)
575 req.Host = h.httpHostHeader
576 }
577
578 conn, response, err := websocket.ClientConnect(req, h.tlsConfig)
579 if err != nil {
580 return nil, err
581 }
582 defer conn.Close()
583 err = stream.WriteHeaders(H1ResponseToH2Response(response))
584 if err != nil {
585 return nil, errors.Wrap(err, "Error writing response header")
586 }
587 // Copy to/from stream to the undelying connection. Use the underlying
588 // connection because cloudflared doesn't operate on the message themselves
589 websocket.Stream(conn.UnderlyingConn(), stream)
590 return response, nil
591}
592
593func (h *TunnelHandler) serveHTTP(stream *h2mux.MuxedStream, req *http.Request) (*http.Response, error) {
594 // Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
595 if h.noChunkedEncoding {
596 req.TransferEncoding = []string{"gzip", "deflate"}
597 cLength, err := strconv.Atoi(req.Header.Get("Content-Length"))
598 if err == nil {
599 req.ContentLength = int64(cLength)
600 }
601 }
602
603 // Request origin to keep connection alive to improve performance
604 req.Header.Set("Connection", "keep-alive")
605
606 if h.httpHostHeader != "" {
607 req.Header.Set("Host", h.httpHostHeader)
608 req.Host = h.httpHostHeader
609 }
610
611 response, err := h.httpClient.RoundTrip(req)
612 if err != nil {
613 return nil, errors.Wrap(err, "Error proxying request to origin")
614 }
615 defer response.Body.Close()
616
617 err = stream.WriteHeaders(H1ResponseToH2Response(response))
618 if err != nil {
619 return nil, errors.Wrap(err, "Error writing response header")
620 }
621 if h.isEventStream(response) {
622 h.writeEventStream(stream, response.Body)
623 } else {
624 // Use CopyBuffer, because Copy only allocates a 32KiB buffer, and cross-stream
625 // compression generates dictionary on first write
626 io.CopyBuffer(stream, response.Body, make([]byte, 512*1024))
627 }
628 return response, nil
629}
630
631func (h *TunnelHandler) writeEventStream(stream *h2mux.MuxedStream, responseBody io.ReadCloser) {
632 reader := bufio.NewReader(responseBody)
633 for {
634 line, err := reader.ReadBytes('\n')
635 if err != nil {
636 break
637 }
638 stream.Write(line)
639 }
640}
641
642func (h *TunnelHandler) isEventStream(response *http.Response) bool {
643 if response.Header.Get("content-type") == "text/event-stream" {
644 h.logger.Debug("Detected Server-Side Events from Origin")
645 return true
646 }
647 return false
648}
649
650func (h *TunnelHandler) logError(stream *h2mux.MuxedStream, err error) {
651 h.logger.WithError(err).Error("HTTP request error")
652 stream.WriteHeaders([]h2mux.Header{{Name: ":status", Value: "502"}})
653 stream.Write([]byte("502 Bad Gateway"))
654 h.metrics.incrementResponses(h.connectionID, "502")
655}
656
657func (h *TunnelHandler) logRequest(req *http.Request, cfRay string, lbProbe bool) {
658 logger := log.NewEntry(h.logger)
659 if cfRay != "" {
660 logger = logger.WithField("CF-RAY", cfRay)
661 logger.Debugf("%s %s %s", req.Method, req.URL, req.Proto)
662 } else if lbProbe {
663 logger.Debugf("Load Balancer health check %s %s %s", req.Method, req.URL, req.Proto)
664 } else {
665 logger.Warnf("All requests should have a CF-RAY header. Please open a support ticket with Cloudflare. %s %s %s ", req.Method, req.URL, req.Proto)
666 }
667 logger.Debugf("Request Headers %+v", req.Header)
668
669 if contentLen := req.ContentLength; contentLen == -1 {
670 logger.Debugf("Request Content length unknown")
671 } else {
672 logger.Debugf("Request content length %d", contentLen)
673 }
674}
675
676func (h *TunnelHandler) logResponseOk(r *http.Response, cfRay string, lbProbe bool) {
677 h.metrics.incrementResponses(h.connectionID, "200")
678 logger := log.NewEntry(h.logger)
679 if cfRay != "" {
680 logger = logger.WithField("CF-RAY", cfRay)
681 logger.Debugf("%s", r.Status)
682 } else if lbProbe {
683 logger.Debugf("Response to Load Balancer health check %s", r.Status)
684 } else {
685 logger.Infof("%s", r.Status)
686 }
687 logger.Debugf("Response Headers %+v", r.Header)
688
689 if contentLen := r.ContentLength; contentLen == -1 {
690 logger.Debugf("Response content length unknown")
691 } else {
692 logger.Debugf("Response content length %d", contentLen)
693 }
694}
695
696func (h *TunnelHandler) UpdateMetrics(connectionID string) {
697 h.metrics.updateMuxerMetrics(connectionID, h.muxer.Metrics())
698}
699
// uint8ToString renders input as a base-10 string without padding.
func uint8ToString(input uint8) string {
	// Every uint8 value fits in an int, so the conversion is lossless.
	return strconv.Itoa(int(input))
}
703
704// Print out the given lines in a nice ASCII box.
705func asciiBox(lines []string, padding int) (box []string) {
706 maxLen := maxLen(lines)
707 spacer := strings.Repeat(" ", padding)
708
709 border := "+" + strings.Repeat("-", maxLen+(padding*2)) + "+"
710
711 box = append(box, border)
712 for _, line := range lines {
713 box = append(box, "|"+spacer+line+strings.Repeat(" ", maxLen-len(line))+spacer+"|")
714 }
715 box = append(box, border)
716 return
717}
718
// maxLen returns the byte length of the longest string in lines, or 0 when
// lines is empty.
func maxLen(lines []string) int {
	longest := 0
	for i := range lines {
		if n := len(lines[i]); n > longest {
			longest = n
		}
	}
	return longest
}
728
// trialZoneMsg returns the two-line announcement shown for a trial tunnel: a
// fixed headline followed by the tunnel URL indented by one space.
func trialZoneMsg(url string) []string {
	msg := make([]string, 0, 2)
	msg = append(msg, "Your free tunnel has started! Visit it:")
	msg = append(msg, " "+url)
	return msg
}
735
736func activeIncidentsMsg(incidents []Incident) string {
737 preamble := "There is an active Cloudflare incident that may be related:"
738 if len(incidents) > 1 {
739 preamble = "There are active Cloudflare incidents that may be related:"
740 }
741 incidentStrings := []string{}
742 for _, incident := range incidents {
743 incidentString := fmt.Sprintf("%s (%s)", incident.Name, incident.URL())
744 incidentStrings = append(incidentStrings, incidentString)
745 }
746 return preamble + " " + strings.Join(incidentStrings, "; ")
747
748}
749