cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2019.7.0

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

origin/tunnel.go

734lines · modecode

1package origin
2
3import (
4 "bufio"
5 "context"
6 "crypto/tls"
7 "fmt"
8 "io"
9 "net"
10 "net/http"
11 "net/url"
12 "strconv"
13 "strings"
14 "sync"
15 "time"
16
17 "github.com/cloudflare/cloudflared/cmd/cloudflared/buildinfo"
18 "github.com/cloudflare/cloudflared/h2mux"
19 "github.com/cloudflare/cloudflared/signal"
20 "github.com/cloudflare/cloudflared/streamhandler"
21 "github.com/cloudflare/cloudflared/tunnelrpc"
22 tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
23 "github.com/cloudflare/cloudflared/validation"
24 "github.com/cloudflare/cloudflared/websocket"
25
26 raven "github.com/getsentry/raven-go"
27 "github.com/google/uuid"
28 "github.com/pkg/errors"
29 "github.com/prometheus/client_golang/prometheus"
30 _ "github.com/prometheus/client_golang/prometheus"
31 log "github.com/sirupsen/logrus"
32 "golang.org/x/sync/errgroup"
33 rpc "zombiezen.com/go/capnproto2/rpc"
34)
35
36const (
37 dialTimeout = 15 * time.Second
38 openStreamTimeout = 30 * time.Second
39 lbProbeUserAgentPrefix = "Mozilla/5.0 (compatible; Cloudflare-Traffic-Manager/1.0; +https://www.cloudflare.com/traffic-manager/;"
40 TagHeaderNamePrefix = "Cf-Warp-Tag-"
41 DuplicateConnectionError = "EDUPCONN"
42)
43
44type TunnelConfig struct {
45 BuildInfo *buildinfo.BuildInfo
46 ClientID string
47 ClientTlsConfig *tls.Config
48 CloseConnOnce *sync.Once // Used to close connectedSignal no more than once
49 CompressionQuality uint64
50 EdgeAddrs []string
51 GracePeriod time.Duration
52 HAConnections int
53 HTTPTransport http.RoundTripper
54 HeartbeatInterval time.Duration
55 Hostname string
56 IncidentLookup IncidentLookup
57 IsAutoupdated bool
58 IsFreeTunnel bool
59 LBPool string
60 Logger *log.Logger
61 MaxHeartbeats uint64
62 Metrics *TunnelMetrics
63 MetricsUpdateFreq time.Duration
64 NoChunkedEncoding bool
65 OriginCert []byte
66 ReportedVersion string
67 Retries uint
68 RunFromTerminal bool
69 Tags []tunnelpogs.Tag
70 TlsConfig *tls.Config
71 TransportLogger *log.Logger
72 UseDeclarativeTunnel bool
73 WSGI bool
74 // OriginUrl may not be used if a user specifies a unix socket.
75 OriginUrl string
76}
77
78type dialError struct {
79 cause error
80}
81
82func (e dialError) Error() string {
83 return e.cause.Error()
84}
85
86type dupConnRegisterTunnelError struct{}
87
88func (e dupConnRegisterTunnelError) Error() string {
89 return "already connected to this server"
90}
91
92type muxerShutdownError struct{}
93
94func (e muxerShutdownError) Error() string {
95 return "muxer shutdown"
96}
97
98// RegisterTunnel error from server
99type serverRegisterTunnelError struct {
100 cause error
101 permanent bool
102}
103
104func (e serverRegisterTunnelError) Error() string {
105 return e.cause.Error()
106}
107
108// RegisterTunnel error from client
109type clientRegisterTunnelError struct {
110 cause error
111}
112
113func newClientRegisterTunnelError(cause error, counter *prometheus.CounterVec) clientRegisterTunnelError {
114 counter.WithLabelValues(cause.Error()).Inc()
115 return clientRegisterTunnelError{cause: cause}
116}
117
118func (e clientRegisterTunnelError) Error() string {
119 return e.cause.Error()
120}
121
122func (c *TunnelConfig) RegistrationOptions(connectionID uint8, OriginLocalIP string, uuid uuid.UUID) *tunnelpogs.RegistrationOptions {
123 policy := tunnelrpc.ExistingTunnelPolicy_balance
124 if c.HAConnections <= 1 && c.LBPool == "" {
125 policy = tunnelrpc.ExistingTunnelPolicy_disconnect
126 }
127 return &tunnelpogs.RegistrationOptions{
128 ClientID: c.ClientID,
129 Version: c.ReportedVersion,
130 OS: fmt.Sprintf("%s_%s", c.BuildInfo.GoOS, c.BuildInfo.GoArch),
131 ExistingTunnelPolicy: policy,
132 PoolName: c.LBPool,
133 Tags: c.Tags,
134 ConnectionID: connectionID,
135 OriginLocalIP: OriginLocalIP,
136 IsAutoupdated: c.IsAutoupdated,
137 RunFromTerminal: c.RunFromTerminal,
138 CompressionQuality: c.CompressionQuality,
139 UUID: uuid.String(),
140 }
141}
142
143func StartTunnelDaemon(ctx context.Context, config *TunnelConfig, connectedSignal *signal.Signal, cloudflaredID uuid.UUID) error {
144 return NewSupervisor(config).Run(ctx, connectedSignal, cloudflaredID)
145}
146
147func ServeTunnelLoop(ctx context.Context,
148 config *TunnelConfig,
149 addr *net.TCPAddr,
150 connectionID uint8,
151 connectedSignal *signal.Signal,
152 u uuid.UUID,
153) error {
154 logger := config.Logger
155 config.Metrics.incrementHaConnections()
156 defer config.Metrics.decrementHaConnections()
157 backoff := BackoffHandler{MaxRetries: config.Retries}
158 connectedFuse := h2mux.NewBooleanFuse()
159 go func() {
160 if connectedFuse.Await() {
161 connectedSignal.Notify()
162 }
163 }()
164 // Ensure the above goroutine will terminate if we return without connecting
165 defer connectedFuse.Fuse(false)
166 for {
167 err, recoverable := ServeTunnel(ctx, config, addr, connectionID, connectedFuse, &backoff, u)
168 if recoverable {
169 if duration, ok := backoff.GetBackoffDuration(ctx); ok {
170 logger.Infof("Retrying in %s seconds", duration)
171 backoff.Backoff(ctx)
172 continue
173 }
174 }
175 return err
176 }
177}
178
179func ServeTunnel(
180 ctx context.Context,
181 config *TunnelConfig,
182 addr *net.TCPAddr,
183 connectionID uint8,
184 connectedFuse *h2mux.BooleanFuse,
185 backoff *BackoffHandler,
186 u uuid.UUID,
187) (err error, recoverable bool) {
188 // Treat panics as recoverable errors
189 defer func() {
190 if r := recover(); r != nil {
191 var ok bool
192 err, ok = r.(error)
193 if !ok {
194 err = fmt.Errorf("ServeTunnel: %v", r)
195 }
196 recoverable = true
197 }
198 }()
199
200 connectionTag := uint8ToString(connectionID)
201 logger := config.Logger.WithField("connectionID", connectionTag)
202
203 // additional tags to send other than hostname which is set in cloudflared main package
204 tags := make(map[string]string)
205 tags["ha"] = connectionTag
206
207 // Returns error from parsing the origin URL or handshake errors
208 handler, originLocalIP, err := NewTunnelHandler(ctx, config, addr.String(), connectionID)
209 if err != nil {
210 errLog := config.Logger.WithError(err)
211 switch err.(type) {
212 case dialError:
213 errLog.Error("Unable to dial edge")
214 case h2mux.MuxerHandshakeError:
215 errLog.Error("Handshake failed with edge server")
216 default:
217 errLog.Error("Tunnel creation failure")
218 return err, false
219 }
220 return err, true
221 }
222
223 errGroup, serveCtx := errgroup.WithContext(ctx)
224
225 errGroup.Go(func() error {
226 err := RegisterTunnel(serveCtx, handler.muxer, config, connectionID, originLocalIP, u)
227 if err == nil {
228 connectedFuse.Fuse(true)
229 backoff.SetGracePeriod()
230 }
231 return err
232 })
233
234 errGroup.Go(func() error {
235 updateMetricsTickC := time.Tick(config.MetricsUpdateFreq)
236 for {
237 select {
238 case <-serveCtx.Done():
239 // UnregisterTunnel blocks until the RPC call returns
240 err := UnregisterTunnel(handler.muxer, config.GracePeriod, config.TransportLogger)
241 handler.muxer.Shutdown()
242 return err
243 case <-updateMetricsTickC:
244 handler.UpdateMetrics(connectionTag)
245 }
246 }
247 })
248
249 errGroup.Go(func() error {
250 // All routines should stop when muxer finish serving. When muxer is shutdown
251 // gracefully, it doesn't return an error, so we need to return errMuxerShutdown
252 // here to notify other routines to stop
253 err := handler.muxer.Serve(serveCtx)
254 if err == nil {
255 return muxerShutdownError{}
256 }
257 return err
258 })
259
260 err = errGroup.Wait()
261 if err != nil {
262 switch castedErr := err.(type) {
263 case dupConnRegisterTunnelError:
264 logger.Info("Already connected to this server, selecting a different one")
265 return err, true
266 case serverRegisterTunnelError:
267 logger.WithError(castedErr.cause).Error("Register tunnel error from server side")
268 // Don't send registration error return from server to Sentry. They are
269 // logged on server side
270 if incidents := config.IncidentLookup.ActiveIncidents(); len(incidents) > 0 {
271 logger.Error(activeIncidentsMsg(incidents))
272 }
273 return castedErr.cause, !castedErr.permanent
274 case clientRegisterTunnelError:
275 logger.WithError(castedErr.cause).Error("Register tunnel error on client side")
276 raven.CaptureError(castedErr.cause, tags)
277 return err, true
278 case muxerShutdownError:
279 logger.Infof("Muxer shutdown")
280 return err, true
281 default:
282 logger.WithError(err).Error("Serve tunnel error")
283 raven.CaptureError(err, tags)
284 return err, true
285 }
286 }
287 return nil, true
288}
289
290func IsRPCStreamResponse(headers []h2mux.Header) bool {
291 if len(headers) != 1 {
292 return false
293 }
294 if headers[0].Name != ":status" || headers[0].Value != "200" {
295 return false
296 }
297 return true
298}
299
300func RegisterTunnel(
301 ctx context.Context,
302 muxer *h2mux.Muxer,
303 config *TunnelConfig,
304 connectionID uint8,
305 originLocalIP string,
306 uuid uuid.UUID,
307) error {
308 config.TransportLogger.Debug("initiating RPC stream to register")
309 stream, err := openStream(ctx, muxer)
310 if err != nil {
311 // RPC stream open error
312 return newClientRegisterTunnelError(err, config.Metrics.rpcFail)
313 }
314 if !IsRPCStreamResponse(stream.Headers) {
315 // stream response error
316 return newClientRegisterTunnelError(err, config.Metrics.rpcFail)
317 }
318 conn := rpc.NewConn(
319 tunnelrpc.NewTransportLogger(config.TransportLogger.WithField("subsystem", "rpc-register"), rpc.StreamTransport(stream)),
320 tunnelrpc.ConnLog(config.TransportLogger.WithField("subsystem", "rpc-transport")),
321 )
322 defer conn.Close()
323 ts := tunnelpogs.TunnelServer_PogsClient{Client: conn.Bootstrap(ctx)}
324 // Request server info without blocking tunnel registration; must use capnp library directly.
325 tsClient := tunnelrpc.TunnelServer{Client: ts.Client}
326 serverInfoPromise := tsClient.GetServerInfo(ctx, func(tunnelrpc.TunnelServer_getServerInfo_Params) error {
327 return nil
328 })
329 registration, err := ts.RegisterTunnel(
330 ctx,
331 config.OriginCert,
332 config.Hostname,
333 config.RegistrationOptions(connectionID, originLocalIP, uuid),
334 )
335 LogServerInfo(serverInfoPromise.Result(), connectionID, config.Metrics, config.Logger)
336 if err != nil {
337 // RegisterTunnel RPC failure
338 return newClientRegisterTunnelError(err, config.Metrics.regFail)
339 }
340 for _, logLine := range registration.LogLines {
341 config.Logger.Info(logLine)
342 }
343
344 if regErr := processRegisterTunnelError(registration.Err, registration.PermanentFailure, config.Metrics); regErr != nil {
345 return regErr
346 }
347
348 if registration.TunnelID != "" {
349 config.Metrics.tunnelsHA.AddTunnelID(connectionID, registration.TunnelID)
350 config.Logger.Infof("Each HA connection's tunnel IDs: %v", config.Metrics.tunnelsHA.String())
351 }
352
353 // Print out the user's trial zone URL in a nice box (if they requested and got one)
354 if isTrialTunnel := config.Hostname == ""; isTrialTunnel {
355 if url, err := url.Parse(registration.Url); err == nil {
356 for _, line := range asciiBox(trialZoneMsg(url.String()), 2) {
357 config.Logger.Infoln(line)
358 }
359 } else {
360 config.Logger.Errorln("Failed to connect tunnel, please try again.")
361 return fmt.Errorf("empty URL in response from Cloudflare edge")
362 }
363 }
364
365 config.Metrics.userHostnamesCounts.WithLabelValues(registration.Url).Inc()
366
367 config.Logger.Infof("Route propagating, it may take up to 1 minute for your new route to become functional")
368 return nil
369}
370
371func processRegisterTunnelError(err string, permanentFailure bool, metrics *TunnelMetrics) error {
372 if err == "" {
373 metrics.regSuccess.Inc()
374 return nil
375 }
376
377 metrics.regFail.WithLabelValues(err).Inc()
378 if err == DuplicateConnectionError {
379 return dupConnRegisterTunnelError{}
380 }
381 return serverRegisterTunnelError{
382 cause: fmt.Errorf("Server error: %s", err),
383 permanent: permanentFailure,
384 }
385}
386
387func UnregisterTunnel(muxer *h2mux.Muxer, gracePeriod time.Duration, logger *log.Logger) error {
388 logger.Debug("initiating RPC stream to unregister")
389 ctx := context.Background()
390 stream, err := openStream(ctx, muxer)
391 if err != nil {
392 // RPC stream open error
393 return err
394 }
395 if !IsRPCStreamResponse(stream.Headers) {
396 // stream response error
397 return err
398 }
399 conn := rpc.NewConn(
400 tunnelrpc.NewTransportLogger(logger.WithField("subsystem", "rpc-unregister"), rpc.StreamTransport(stream)),
401 tunnelrpc.ConnLog(logger.WithField("subsystem", "rpc-transport")),
402 )
403 defer conn.Close()
404 ts := tunnelpogs.TunnelServer_PogsClient{Client: conn.Bootstrap(ctx)}
405 // gracePeriod is encoded in int64 using capnproto
406 return ts.UnregisterTunnel(ctx, gracePeriod.Nanoseconds())
407}
408
409func openStream(ctx context.Context, muxer *h2mux.Muxer) (*h2mux.MuxedStream, error) {
410 openStreamCtx, cancel := context.WithTimeout(ctx, openStreamTimeout)
411 defer cancel()
412 return muxer.OpenStream(openStreamCtx, []h2mux.Header{
413 {Name: ":method", Value: "RPC"},
414 {Name: ":scheme", Value: "capnp"},
415 {Name: ":path", Value: "*"},
416 }, nil)
417}
418
419func LogServerInfo(
420 promise tunnelrpc.ServerInfo_Promise,
421 connectionID uint8,
422 metrics *TunnelMetrics,
423 logger *log.Logger,
424) {
425 serverInfoMessage, err := promise.Struct()
426 if err != nil {
427 logger.WithError(err).Warn("Failed to retrieve server information")
428 return
429 }
430 serverInfo, err := tunnelpogs.UnmarshalServerInfo(serverInfoMessage)
431 if err != nil {
432 logger.WithError(err).Warn("Failed to retrieve server information")
433 return
434 }
435 logger.Infof("Connected to %s", serverInfo.LocationName)
436 metrics.registerServerLocation(uint8ToString(connectionID), serverInfo.LocationName)
437}
438
439func H1ResponseToH2Response(h1 *http.Response) (h2 []h2mux.Header) {
440 h2 = []h2mux.Header{{Name: ":status", Value: fmt.Sprintf("%d", h1.StatusCode)}}
441 for headerName, headerValues := range h1.Header {
442 for _, headerValue := range headerValues {
443 h2 = append(h2, h2mux.Header{Name: strings.ToLower(headerName), Value: headerValue})
444 }
445 }
446 return
447}
448
449type TunnelHandler struct {
450 originUrl string
451 muxer *h2mux.Muxer
452 httpClient http.RoundTripper
453 tlsConfig *tls.Config
454 tags []tunnelpogs.Tag
455 metrics *TunnelMetrics
456 // connectionID is only used by metrics, and prometheus requires labels to be string
457 connectionID string
458 logger *log.Logger
459 noChunkedEncoding bool
460}
461
462var dialer = net.Dialer{DualStack: true}
463
464// NewTunnelHandler returns a TunnelHandler, origin LAN IP and error
465func NewTunnelHandler(ctx context.Context,
466 config *TunnelConfig,
467 addr string,
468 connectionID uint8,
469) (*TunnelHandler, string, error) {
470 originURL, err := validation.ValidateUrl(config.OriginUrl)
471 if err != nil {
472 return nil, "", fmt.Errorf("unable to parse origin URL %#v", originURL)
473 }
474 h := &TunnelHandler{
475 originUrl: originURL,
476 httpClient: config.HTTPTransport,
477 tlsConfig: config.ClientTlsConfig,
478 tags: config.Tags,
479 metrics: config.Metrics,
480 connectionID: uint8ToString(connectionID),
481 logger: config.Logger,
482 noChunkedEncoding: config.NoChunkedEncoding,
483 }
484 if h.httpClient == nil {
485 h.httpClient = http.DefaultTransport
486 }
487 // Inherit from parent context so we can cancel (Ctrl-C) while dialing
488 dialCtx, dialCancel := context.WithTimeout(ctx, dialTimeout)
489 // TUN-92: enforce a timeout on dial and handshake (as tls.Dial does not support one)
490 plaintextEdgeConn, err := dialer.DialContext(dialCtx, "tcp", addr)
491 dialCancel()
492 if err != nil {
493 return nil, "", dialError{cause: errors.Wrap(err, "DialContext error")}
494 }
495 edgeConn := tls.Client(plaintextEdgeConn, config.TlsConfig)
496 edgeConn.SetDeadline(time.Now().Add(dialTimeout))
497 err = edgeConn.Handshake()
498 if err != nil {
499 return nil, "", dialError{cause: errors.Wrap(err, "Handshake with edge error")}
500 }
501 // clear the deadline on the conn; h2mux has its own timeouts
502 edgeConn.SetDeadline(time.Time{})
503 // Establish a muxed connection with the edge
504 // Client mux handshake with agent server
505 h.muxer, err = h2mux.Handshake(edgeConn, edgeConn, h2mux.MuxerConfig{
506 Timeout: 5 * time.Second,
507 Handler: h,
508 IsClient: true,
509 HeartbeatInterval: config.HeartbeatInterval,
510 MaxHeartbeats: config.MaxHeartbeats,
511 Logger: config.TransportLogger.WithFields(log.Fields{}),
512 CompressionQuality: h2mux.CompressionSetting(config.CompressionQuality),
513 })
514 if err != nil {
515 return h, "", errors.New("TLS handshake error")
516 }
517 return h, edgeConn.LocalAddr().String(), err
518}
519
520func (h *TunnelHandler) AppendTagHeaders(r *http.Request) {
521 for _, tag := range h.tags {
522 r.Header.Add(TagHeaderNamePrefix+tag.Name, tag.Value)
523 }
524}
525
526func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error {
527 h.metrics.incrementRequests(h.connectionID)
528 defer h.metrics.decrementConcurrentRequests(h.connectionID)
529
530 req, reqErr := h.createRequest(stream)
531 if reqErr != nil {
532 h.logError(stream, reqErr)
533 return reqErr
534 }
535
536 cfRay := streamhandler.FindCfRayHeader(req)
537 lbProbe := streamhandler.IsLBProbeRequest(req)
538 h.logRequest(req, cfRay, lbProbe)
539
540 var resp *http.Response
541 var respErr error
542 if websocket.IsWebSocketUpgrade(req) {
543 resp, respErr = h.serveWebsocket(stream, req)
544 } else {
545 resp, respErr = h.serveHTTP(stream, req)
546 }
547 if respErr != nil {
548 h.logError(stream, respErr)
549 return respErr
550 }
551 h.logResponseOk(resp, cfRay, lbProbe)
552 return nil
553}
554
555func (h *TunnelHandler) createRequest(stream *h2mux.MuxedStream) (*http.Request, error) {
556 req, err := http.NewRequest("GET", h.originUrl, h2mux.MuxedStreamReader{MuxedStream: stream})
557 if err != nil {
558 return nil, errors.Wrap(err, "Unexpected error from http.NewRequest")
559 }
560 err = streamhandler.H2RequestHeadersToH1Request(stream.Headers, req)
561 if err != nil {
562 return nil, errors.Wrap(err, "invalid request received")
563 }
564 h.AppendTagHeaders(req)
565 return req, nil
566}
567
568func (h *TunnelHandler) serveWebsocket(stream *h2mux.MuxedStream, req *http.Request) (*http.Response, error) {
569 conn, response, err := websocket.ClientConnect(req, h.tlsConfig)
570 if err != nil {
571 return nil, err
572 }
573 defer conn.Close()
574 err = stream.WriteHeaders(H1ResponseToH2Response(response))
575 if err != nil {
576 return nil, errors.Wrap(err, "Error writing response header")
577 }
578 // Copy to/from stream to the undelying connection. Use the underlying
579 // connection because cloudflared doesn't operate on the message themselves
580 websocket.Stream(conn.UnderlyingConn(), stream)
581 return response, nil
582}
583
584func (h *TunnelHandler) serveHTTP(stream *h2mux.MuxedStream, req *http.Request) (*http.Response, error) {
585 // Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
586 if h.noChunkedEncoding {
587 req.TransferEncoding = []string{"gzip", "deflate"}
588 cLength, err := strconv.Atoi(req.Header.Get("Content-Length"))
589 if err == nil {
590 req.ContentLength = int64(cLength)
591 }
592 }
593
594 // Request origin to keep connection alive to improve performance
595 req.Header.Set("Connection", "keep-alive")
596
597 response, err := h.httpClient.RoundTrip(req)
598 if err != nil {
599 return nil, errors.Wrap(err, "Error proxying request to origin")
600 }
601 defer response.Body.Close()
602
603 err = stream.WriteHeaders(H1ResponseToH2Response(response))
604 if err != nil {
605 return nil, errors.Wrap(err, "Error writing response header")
606 }
607 if h.isEventStream(response) {
608 h.writeEventStream(stream, response.Body)
609 } else {
610 // Use CopyBuffer, because Copy only allocates a 32KiB buffer, and cross-stream
611 // compression generates dictionary on first write
612 io.CopyBuffer(stream, response.Body, make([]byte, 512*1024))
613 }
614 return response, nil
615}
616
617func (h *TunnelHandler) writeEventStream(stream *h2mux.MuxedStream, responseBody io.ReadCloser) {
618 reader := bufio.NewReader(responseBody)
619 for {
620 line, err := reader.ReadBytes('\n')
621 if err != nil {
622 break
623 }
624 stream.Write(line)
625 }
626}
627
628func (h *TunnelHandler) isEventStream(response *http.Response) bool {
629 if response.Header.Get("content-type") == "text/event-stream" {
630 h.logger.Debug("Detected Server-Side Events from Origin")
631 return true
632 }
633 return false
634}
635
636func (h *TunnelHandler) logError(stream *h2mux.MuxedStream, err error) {
637 h.logger.WithError(err).Error("HTTP request error")
638 stream.WriteHeaders([]h2mux.Header{{Name: ":status", Value: "502"}})
639 stream.Write([]byte("502 Bad Gateway"))
640 h.metrics.incrementResponses(h.connectionID, "502")
641}
642
643func (h *TunnelHandler) logRequest(req *http.Request, cfRay string, lbProbe bool) {
644 logger := log.NewEntry(h.logger)
645 if cfRay != "" {
646 logger = logger.WithField("CF-RAY", cfRay)
647 logger.Debugf("%s %s %s", req.Method, req.URL, req.Proto)
648 } else if lbProbe {
649 logger.Debugf("Load Balancer health check %s %s %s", req.Method, req.URL, req.Proto)
650 } else {
651 logger.Warnf("All requests should have a CF-RAY header. Please open a support ticket with Cloudflare. %s %s %s ", req.Method, req.URL, req.Proto)
652 }
653 logger.Debugf("Request Headers %+v", req.Header)
654
655 if contentLen := req.ContentLength; contentLen == -1 {
656 logger.Debugf("Request Content length unknown")
657 } else {
658 logger.Debugf("Request content length %d", contentLen)
659 }
660}
661
662func (h *TunnelHandler) logResponseOk(r *http.Response, cfRay string, lbProbe bool) {
663 h.metrics.incrementResponses(h.connectionID, "200")
664 logger := log.NewEntry(h.logger)
665 if cfRay != "" {
666 logger = logger.WithField("CF-RAY", cfRay)
667 logger.Debugf("%s", r.Status)
668 } else if lbProbe {
669 logger.Debugf("Response to Load Balancer health check %s", r.Status)
670 } else {
671 logger.Infof("%s", r.Status)
672 }
673 logger.Debugf("Response Headers %+v", r.Header)
674
675 if contentLen := r.ContentLength; contentLen == -1 {
676 logger.Debugf("Response content length unknown")
677 } else {
678 logger.Debugf("Response content length %d", contentLen)
679 }
680}
681
682func (h *TunnelHandler) UpdateMetrics(connectionID string) {
683 h.metrics.updateMuxerMetrics(connectionID, h.muxer.Metrics())
684}
685
686func uint8ToString(input uint8) string {
687 return strconv.FormatUint(uint64(input), 10)
688}
689
690// Print out the given lines in a nice ASCII box.
691func asciiBox(lines []string, padding int) (box []string) {
692 maxLen := maxLen(lines)
693 spacer := strings.Repeat(" ", padding)
694
695 border := "+" + strings.Repeat("-", maxLen+(padding*2)) + "+"
696
697 box = append(box, border)
698 for _, line := range lines {
699 box = append(box, "|"+spacer+line+strings.Repeat(" ", maxLen-len(line))+spacer+"|")
700 }
701 box = append(box, border)
702 return
703}
704
705func maxLen(lines []string) int {
706 max := 0
707 for _, line := range lines {
708 if len(line) > max {
709 max = len(line)
710 }
711 }
712 return max
713}
714
715func trialZoneMsg(url string) []string {
716 return []string{
717 "Your free tunnel has started! Visit it:",
718 " " + url,
719 }
720}
721
722func activeIncidentsMsg(incidents []Incident) string {
723 preamble := "There is an active Cloudflare incident that may be related:"
724 if len(incidents) > 1 {
725 preamble = "There are active Cloudflare incidents that may be related:"
726 }
727 incidentStrings := []string{}
728 for _, incident := range incidents {
729 incidentString := fmt.Sprintf("%s (%s)", incident.Name, incident.URL())
730 incidentStrings = append(incidentStrings, incidentString)
731 }
732 return preamble + " " + strings.Join(incidentStrings, "; ")
733
734}