cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2019.8.4

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

origin/tunnel.go

747lines · modecode

1package origin
2
3import (
4 "bufio"
5 "context"
6 "crypto/tls"
7 "fmt"
8 "io"
9 "net"
10 "net/http"
11 "net/url"
12 "strconv"
13 "strings"
14 "sync"
15 "time"
16
17 "github.com/cloudflare/cloudflared/cmd/cloudflared/buildinfo"
18 "github.com/cloudflare/cloudflared/h2mux"
19 "github.com/cloudflare/cloudflared/signal"
20 "github.com/cloudflare/cloudflared/streamhandler"
21 "github.com/cloudflare/cloudflared/tunnelrpc"
22 tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
23 "github.com/cloudflare/cloudflared/validation"
24 "github.com/cloudflare/cloudflared/websocket"
25
26 raven "github.com/getsentry/raven-go"
27 "github.com/google/uuid"
28 "github.com/pkg/errors"
29 "github.com/prometheus/client_golang/prometheus"
30 _ "github.com/prometheus/client_golang/prometheus"
31 log "github.com/sirupsen/logrus"
32 "golang.org/x/sync/errgroup"
33 rpc "zombiezen.com/go/capnproto2/rpc"
34)
35
// Timeouts applied while talking to the edge.
const (
	// dialTimeout bounds both the TCP dial and the TLS handshake with the edge.
	dialTimeout = 15 * time.Second
	// openStreamTimeout bounds opening an RPC stream on the muxer.
	openStreamTimeout = 30 * time.Second
)

// Protocol-level string constants.
const (
	// lbProbeUserAgentPrefix is presumably the User-Agent prefix sent by
	// Cloudflare load balancer health checks — TODO confirm against its users.
	lbProbeUserAgentPrefix = "Mozilla/5.0 (compatible; Cloudflare-Traffic-Manager/1.0; +https://www.cloudflare.com/traffic-manager/;"
	// TagHeaderNamePrefix is prepended to each tag name to build the request
	// header that carries the tag value to the origin.
	TagHeaderNamePrefix = "Cf-Warp-Tag-"
	// DuplicateConnectionError is the registration error string that signals
	// this client is already connected to the selected server.
	DuplicateConnectionError = "EDUPCONN"
)
43
44type TunnelConfig struct {
45 BuildInfo *buildinfo.BuildInfo
46 ClientID string
47 ClientTlsConfig *tls.Config
48 CloseConnOnce *sync.Once // Used to close connectedSignal no more than once
49 CompressionQuality uint64
50 EdgeAddrs []string
51 GracePeriod time.Duration
52 HAConnections int
53 HTTPTransport http.RoundTripper
54 HeartbeatInterval time.Duration
55 Hostname string
56 HTTPHostHeader string
57 IncidentLookup IncidentLookup
58 IsAutoupdated bool
59 IsFreeTunnel bool
60 LBPool string
61 Logger *log.Logger
62 MaxHeartbeats uint64
63 Metrics *TunnelMetrics
64 MetricsUpdateFreq time.Duration
65 NoChunkedEncoding bool
66 OriginCert []byte
67 ReportedVersion string
68 Retries uint
69 RunFromTerminal bool
70 Tags []tunnelpogs.Tag
71 TlsConfig *tls.Config
72 TransportLogger *log.Logger
73 UseDeclarativeTunnel bool
74 WSGI bool
75 // OriginUrl may not be used if a user specifies a unix socket.
76 OriginUrl string
77}
78
// dialError marks a failure to reach the edge (TCP dial or TLS handshake) so
// callers can tell it apart from other tunnel creation errors.
type dialError struct {
	cause error
}

// Error reports the message of the underlying cause.
func (de dialError) Error() string {
	msg := de.cause.Error()
	return msg
}
86
// dupConnRegisterTunnelError is returned when the edge reports that this
// client is already connected to the selected server.
type dupConnRegisterTunnelError struct{}

// Error returns a fixed description of the duplicate-connection condition.
func (dupConnRegisterTunnelError) Error() string {
	const msg = "already connected to this server"
	return msg
}
92
// muxerShutdownError signals that the muxer stopped serving without an error
// of its own, so the remaining tunnel goroutines know to stop.
type muxerShutdownError struct{}

// Error returns a fixed description of the shutdown.
func (muxerShutdownError) Error() string {
	const msg = "muxer shutdown"
	return msg
}
98
// serverRegisterTunnelError is a registration failure reported by the server.
// permanent tells the caller that retrying will not help.
type serverRegisterTunnelError struct {
	cause     error
	permanent bool
}

// Error reports the message of the underlying cause.
func (se serverRegisterTunnelError) Error() string {
	msg := se.cause.Error()
	return msg
}
108
109// RegisterTunnel error from client
110type clientRegisterTunnelError struct {
111 cause error
112}
113
114func newClientRegisterTunnelError(cause error, counter *prometheus.CounterVec) clientRegisterTunnelError {
115 counter.WithLabelValues(cause.Error()).Inc()
116 return clientRegisterTunnelError{cause: cause}
117}
118
119func (e clientRegisterTunnelError) Error() string {
120 return e.cause.Error()
121}
122
123func (c *TunnelConfig) RegistrationOptions(connectionID uint8, OriginLocalIP string, uuid uuid.UUID) *tunnelpogs.RegistrationOptions {
124 policy := tunnelrpc.ExistingTunnelPolicy_balance
125 if c.HAConnections <= 1 && c.LBPool == "" {
126 policy = tunnelrpc.ExistingTunnelPolicy_disconnect
127 }
128 return &tunnelpogs.RegistrationOptions{
129 ClientID: c.ClientID,
130 Version: c.ReportedVersion,
131 OS: fmt.Sprintf("%s_%s", c.BuildInfo.GoOS, c.BuildInfo.GoArch),
132 ExistingTunnelPolicy: policy,
133 PoolName: c.LBPool,
134 Tags: c.Tags,
135 ConnectionID: connectionID,
136 OriginLocalIP: OriginLocalIP,
137 IsAutoupdated: c.IsAutoupdated,
138 RunFromTerminal: c.RunFromTerminal,
139 CompressionQuality: c.CompressionQuality,
140 UUID: uuid.String(),
141 }
142}
143
144func StartTunnelDaemon(ctx context.Context, config *TunnelConfig, connectedSignal *signal.Signal, cloudflaredID uuid.UUID) error {
145 return NewSupervisor(config).Run(ctx, connectedSignal, cloudflaredID)
146}
147
148func ServeTunnelLoop(ctx context.Context,
149 config *TunnelConfig,
150 addr *net.TCPAddr,
151 connectionID uint8,
152 connectedSignal *signal.Signal,
153 u uuid.UUID,
154) error {
155 logger := config.Logger
156 config.Metrics.incrementHaConnections()
157 defer config.Metrics.decrementHaConnections()
158 backoff := BackoffHandler{MaxRetries: config.Retries}
159 connectedFuse := h2mux.NewBooleanFuse()
160 go func() {
161 if connectedFuse.Await() {
162 connectedSignal.Notify()
163 }
164 }()
165 // Ensure the above goroutine will terminate if we return without connecting
166 defer connectedFuse.Fuse(false)
167 for {
168 err, recoverable := ServeTunnel(ctx, config, addr, connectionID, connectedFuse, &backoff, u)
169 if recoverable {
170 if duration, ok := backoff.GetBackoffDuration(ctx); ok {
171 logger.Infof("Retrying in %s seconds", duration)
172 backoff.Backoff(ctx)
173 continue
174 }
175 }
176 return err
177 }
178}
179
180func ServeTunnel(
181 ctx context.Context,
182 config *TunnelConfig,
183 addr *net.TCPAddr,
184 connectionID uint8,
185 connectedFuse *h2mux.BooleanFuse,
186 backoff *BackoffHandler,
187 u uuid.UUID,
188) (err error, recoverable bool) {
189 // Treat panics as recoverable errors
190 defer func() {
191 if r := recover(); r != nil {
192 var ok bool
193 err, ok = r.(error)
194 if !ok {
195 err = fmt.Errorf("ServeTunnel: %v", r)
196 }
197 recoverable = true
198 }
199 }()
200
201 connectionTag := uint8ToString(connectionID)
202 logger := config.Logger.WithField("connectionID", connectionTag)
203
204 // additional tags to send other than hostname which is set in cloudflared main package
205 tags := make(map[string]string)
206 tags["ha"] = connectionTag
207
208 // Returns error from parsing the origin URL or handshake errors
209 handler, originLocalIP, err := NewTunnelHandler(ctx, config, addr.String(), connectionID)
210 if err != nil {
211 errLog := config.Logger.WithError(err)
212 switch err.(type) {
213 case dialError:
214 errLog.Error("Unable to dial edge")
215 case h2mux.MuxerHandshakeError:
216 errLog.Error("Handshake failed with edge server")
217 default:
218 errLog.Error("Tunnel creation failure")
219 return err, false
220 }
221 return err, true
222 }
223
224 errGroup, serveCtx := errgroup.WithContext(ctx)
225
226 errGroup.Go(func() error {
227 err := RegisterTunnel(serveCtx, handler.muxer, config, connectionID, originLocalIP, u)
228 if err == nil {
229 connectedFuse.Fuse(true)
230 backoff.SetGracePeriod()
231 }
232 return err
233 })
234
235 errGroup.Go(func() error {
236 updateMetricsTickC := time.Tick(config.MetricsUpdateFreq)
237 for {
238 select {
239 case <-serveCtx.Done():
240 // UnregisterTunnel blocks until the RPC call returns
241 err := UnregisterTunnel(handler.muxer, config.GracePeriod, config.TransportLogger)
242 handler.muxer.Shutdown()
243 return err
244 case <-updateMetricsTickC:
245 handler.UpdateMetrics(connectionTag)
246 }
247 }
248 })
249
250 errGroup.Go(func() error {
251 // All routines should stop when muxer finish serving. When muxer is shutdown
252 // gracefully, it doesn't return an error, so we need to return errMuxerShutdown
253 // here to notify other routines to stop
254 err := handler.muxer.Serve(serveCtx)
255 if err == nil {
256 return muxerShutdownError{}
257 }
258 return err
259 })
260
261 err = errGroup.Wait()
262 if err != nil {
263 switch castedErr := err.(type) {
264 case dupConnRegisterTunnelError:
265 logger.Info("Already connected to this server, selecting a different one")
266 return err, true
267 case serverRegisterTunnelError:
268 logger.WithError(castedErr.cause).Error("Register tunnel error from server side")
269 // Don't send registration error return from server to Sentry. They are
270 // logged on server side
271 if incidents := config.IncidentLookup.ActiveIncidents(); len(incidents) > 0 {
272 logger.Error(activeIncidentsMsg(incidents))
273 }
274 return castedErr.cause, !castedErr.permanent
275 case clientRegisterTunnelError:
276 logger.WithError(castedErr.cause).Error("Register tunnel error on client side")
277 raven.CaptureError(castedErr.cause, tags)
278 return err, true
279 case muxerShutdownError:
280 logger.Infof("Muxer shutdown")
281 return err, true
282 default:
283 logger.WithError(err).Error("Serve tunnel error")
284 raven.CaptureError(err, tags)
285 return err, true
286 }
287 }
288 return nil, true
289}
290
291func IsRPCStreamResponse(headers []h2mux.Header) bool {
292 if len(headers) != 1 {
293 return false
294 }
295 if headers[0].Name != ":status" || headers[0].Value != "200" {
296 return false
297 }
298 return true
299}
300
301func RegisterTunnel(
302 ctx context.Context,
303 muxer *h2mux.Muxer,
304 config *TunnelConfig,
305 connectionID uint8,
306 originLocalIP string,
307 uuid uuid.UUID,
308) error {
309 config.TransportLogger.Debug("initiating RPC stream to register")
310 stream, err := openStream(ctx, muxer)
311 if err != nil {
312 // RPC stream open error
313 return newClientRegisterTunnelError(err, config.Metrics.rpcFail)
314 }
315 if !IsRPCStreamResponse(stream.Headers) {
316 // stream response error
317 return newClientRegisterTunnelError(err, config.Metrics.rpcFail)
318 }
319 conn := rpc.NewConn(
320 tunnelrpc.NewTransportLogger(config.TransportLogger.WithField("subsystem", "rpc-register"), rpc.StreamTransport(stream)),
321 tunnelrpc.ConnLog(config.TransportLogger.WithField("subsystem", "rpc-transport")),
322 )
323 defer conn.Close()
324 ts := tunnelpogs.TunnelServer_PogsClient{Client: conn.Bootstrap(ctx)}
325 // Request server info without blocking tunnel registration; must use capnp library directly.
326 tsClient := tunnelrpc.TunnelServer{Client: ts.Client}
327 serverInfoPromise := tsClient.GetServerInfo(ctx, func(tunnelrpc.TunnelServer_getServerInfo_Params) error {
328 return nil
329 })
330 registration, err := ts.RegisterTunnel(
331 ctx,
332 config.OriginCert,
333 config.Hostname,
334 config.RegistrationOptions(connectionID, originLocalIP, uuid),
335 )
336 LogServerInfo(serverInfoPromise.Result(), connectionID, config.Metrics, config.Logger)
337 if err != nil {
338 // RegisterTunnel RPC failure
339 return newClientRegisterTunnelError(err, config.Metrics.regFail)
340 }
341 for _, logLine := range registration.LogLines {
342 config.Logger.Info(logLine)
343 }
344
345 if regErr := processRegisterTunnelError(registration.Err, registration.PermanentFailure, config.Metrics); regErr != nil {
346 return regErr
347 }
348
349 if registration.TunnelID != "" {
350 config.Metrics.tunnelsHA.AddTunnelID(connectionID, registration.TunnelID)
351 config.Logger.Infof("Each HA connection's tunnel IDs: %v", config.Metrics.tunnelsHA.String())
352 }
353
354 // Print out the user's trial zone URL in a nice box (if they requested and got one)
355 if isTrialTunnel := config.Hostname == ""; isTrialTunnel {
356 if url, err := url.Parse(registration.Url); err == nil {
357 for _, line := range asciiBox(trialZoneMsg(url.String()), 2) {
358 config.Logger.Infoln(line)
359 }
360 } else {
361 config.Logger.Errorln("Failed to connect tunnel, please try again.")
362 return fmt.Errorf("empty URL in response from Cloudflare edge")
363 }
364 }
365
366 config.Metrics.userHostnamesCounts.WithLabelValues(registration.Url).Inc()
367
368 config.Logger.Infof("Route propagating, it may take up to 1 minute for your new route to become functional")
369 return nil
370}
371
372func processRegisterTunnelError(err string, permanentFailure bool, metrics *TunnelMetrics) error {
373 if err == "" {
374 metrics.regSuccess.Inc()
375 return nil
376 }
377
378 metrics.regFail.WithLabelValues(err).Inc()
379 if err == DuplicateConnectionError {
380 return dupConnRegisterTunnelError{}
381 }
382 return serverRegisterTunnelError{
383 cause: fmt.Errorf("Server error: %s", err),
384 permanent: permanentFailure,
385 }
386}
387
388func UnregisterTunnel(muxer *h2mux.Muxer, gracePeriod time.Duration, logger *log.Logger) error {
389 logger.Debug("initiating RPC stream to unregister")
390 ctx := context.Background()
391 stream, err := openStream(ctx, muxer)
392 if err != nil {
393 // RPC stream open error
394 return err
395 }
396 if !IsRPCStreamResponse(stream.Headers) {
397 // stream response error
398 return err
399 }
400 conn := rpc.NewConn(
401 tunnelrpc.NewTransportLogger(logger.WithField("subsystem", "rpc-unregister"), rpc.StreamTransport(stream)),
402 tunnelrpc.ConnLog(logger.WithField("subsystem", "rpc-transport")),
403 )
404 defer conn.Close()
405 ts := tunnelpogs.TunnelServer_PogsClient{Client: conn.Bootstrap(ctx)}
406 // gracePeriod is encoded in int64 using capnproto
407 return ts.UnregisterTunnel(ctx, gracePeriod.Nanoseconds())
408}
409
410func openStream(ctx context.Context, muxer *h2mux.Muxer) (*h2mux.MuxedStream, error) {
411 openStreamCtx, cancel := context.WithTimeout(ctx, openStreamTimeout)
412 defer cancel()
413 return muxer.OpenStream(openStreamCtx, []h2mux.Header{
414 {Name: ":method", Value: "RPC"},
415 {Name: ":scheme", Value: "capnp"},
416 {Name: ":path", Value: "*"},
417 }, nil)
418}
419
420func LogServerInfo(
421 promise tunnelrpc.ServerInfo_Promise,
422 connectionID uint8,
423 metrics *TunnelMetrics,
424 logger *log.Logger,
425) {
426 serverInfoMessage, err := promise.Struct()
427 if err != nil {
428 logger.WithError(err).Warn("Failed to retrieve server information")
429 return
430 }
431 serverInfo, err := tunnelpogs.UnmarshalServerInfo(serverInfoMessage)
432 if err != nil {
433 logger.WithError(err).Warn("Failed to retrieve server information")
434 return
435 }
436 logger.Infof("Connected to %s", serverInfo.LocationName)
437 metrics.registerServerLocation(uint8ToString(connectionID), serverInfo.LocationName)
438}
439
440func H1ResponseToH2Response(h1 *http.Response) (h2 []h2mux.Header) {
441 h2 = []h2mux.Header{{Name: ":status", Value: fmt.Sprintf("%d", h1.StatusCode)}}
442 for headerName, headerValues := range h1.Header {
443 for _, headerValue := range headerValues {
444 h2 = append(h2, h2mux.Header{Name: strings.ToLower(headerName), Value: headerValue})
445 }
446 }
447 return
448}
449
450type TunnelHandler struct {
451 originUrl string
452 httpHostHeader string
453 muxer *h2mux.Muxer
454 httpClient http.RoundTripper
455 tlsConfig *tls.Config
456 tags []tunnelpogs.Tag
457 metrics *TunnelMetrics
458 // connectionID is only used by metrics, and prometheus requires labels to be string
459 connectionID string
460 logger *log.Logger
461 noChunkedEncoding bool
462}
463
464var dialer = net.Dialer{DualStack: true}
465
466// NewTunnelHandler returns a TunnelHandler, origin LAN IP and error
467func NewTunnelHandler(ctx context.Context,
468 config *TunnelConfig,
469 addr string,
470 connectionID uint8,
471) (*TunnelHandler, string, error) {
472 originURL, err := validation.ValidateUrl(config.OriginUrl)
473 if err != nil {
474 return nil, "", fmt.Errorf("unable to parse origin URL %#v", originURL)
475 }
476 h := &TunnelHandler{
477 originUrl: originURL,
478 httpHostHeader: config.HTTPHostHeader,
479 httpClient: config.HTTPTransport,
480 tlsConfig: config.ClientTlsConfig,
481 tags: config.Tags,
482 metrics: config.Metrics,
483 connectionID: uint8ToString(connectionID),
484 logger: config.Logger,
485 noChunkedEncoding: config.NoChunkedEncoding,
486 }
487 if h.httpClient == nil {
488 h.httpClient = http.DefaultTransport
489 }
490 // Inherit from parent context so we can cancel (Ctrl-C) while dialing
491 dialCtx, dialCancel := context.WithTimeout(ctx, dialTimeout)
492 // TUN-92: enforce a timeout on dial and handshake (as tls.Dial does not support one)
493 plaintextEdgeConn, err := dialer.DialContext(dialCtx, "tcp", addr)
494 dialCancel()
495 if err != nil {
496 return nil, "", dialError{cause: errors.Wrap(err, "DialContext error")}
497 }
498 edgeConn := tls.Client(plaintextEdgeConn, config.TlsConfig)
499 edgeConn.SetDeadline(time.Now().Add(dialTimeout))
500 err = edgeConn.Handshake()
501 if err != nil {
502 return nil, "", dialError{cause: errors.Wrap(err, "Handshake with edge error")}
503 }
504 // clear the deadline on the conn; h2mux has its own timeouts
505 edgeConn.SetDeadline(time.Time{})
506 // Establish a muxed connection with the edge
507 // Client mux handshake with agent server
508 h.muxer, err = h2mux.Handshake(edgeConn, edgeConn, h2mux.MuxerConfig{
509 Timeout: 5 * time.Second,
510 Handler: h,
511 IsClient: true,
512 HeartbeatInterval: config.HeartbeatInterval,
513 MaxHeartbeats: config.MaxHeartbeats,
514 Logger: config.TransportLogger.WithFields(log.Fields{}),
515 CompressionQuality: h2mux.CompressionSetting(config.CompressionQuality),
516 })
517 if err != nil {
518 return h, "", errors.New("TLS handshake error")
519 }
520 return h, edgeConn.LocalAddr().String(), err
521}
522
523func (h *TunnelHandler) AppendTagHeaders(r *http.Request) {
524 for _, tag := range h.tags {
525 r.Header.Add(TagHeaderNamePrefix+tag.Name, tag.Value)
526 }
527}
528
529func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error {
530 h.metrics.incrementRequests(h.connectionID)
531 defer h.metrics.decrementConcurrentRequests(h.connectionID)
532
533 req, reqErr := h.createRequest(stream)
534 if reqErr != nil {
535 h.logError(stream, reqErr)
536 return reqErr
537 }
538
539 cfRay := streamhandler.FindCfRayHeader(req)
540 lbProbe := streamhandler.IsLBProbeRequest(req)
541 h.logRequest(req, cfRay, lbProbe)
542
543 var resp *http.Response
544 var respErr error
545 if websocket.IsWebSocketUpgrade(req) {
546 resp, respErr = h.serveWebsocket(stream, req)
547 } else {
548 resp, respErr = h.serveHTTP(stream, req)
549 }
550 if respErr != nil {
551 h.logError(stream, respErr)
552 return respErr
553 }
554 h.logResponseOk(resp, cfRay, lbProbe)
555 return nil
556}
557
558func (h *TunnelHandler) createRequest(stream *h2mux.MuxedStream) (*http.Request, error) {
559 req, err := http.NewRequest("GET", h.originUrl, h2mux.MuxedStreamReader{MuxedStream: stream})
560 if err != nil {
561 return nil, errors.Wrap(err, "Unexpected error from http.NewRequest")
562 }
563 err = streamhandler.H2RequestHeadersToH1Request(stream.Headers, req)
564 if err != nil {
565 return nil, errors.Wrap(err, "invalid request received")
566 }
567 h.AppendTagHeaders(req)
568 return req, nil
569}
570
571func (h *TunnelHandler) serveWebsocket(stream *h2mux.MuxedStream, req *http.Request) (*http.Response, error) {
572 if h.httpHostHeader != "" {
573 req.Header.Set("Host", h.httpHostHeader)
574 req.Host = h.httpHostHeader
575 }
576
577 conn, response, err := websocket.ClientConnect(req, h.tlsConfig)
578 if err != nil {
579 return nil, err
580 }
581 defer conn.Close()
582 err = stream.WriteHeaders(H1ResponseToH2Response(response))
583 if err != nil {
584 return nil, errors.Wrap(err, "Error writing response header")
585 }
586 // Copy to/from stream to the undelying connection. Use the underlying
587 // connection because cloudflared doesn't operate on the message themselves
588 websocket.Stream(conn.UnderlyingConn(), stream)
589 return response, nil
590}
591
592func (h *TunnelHandler) serveHTTP(stream *h2mux.MuxedStream, req *http.Request) (*http.Response, error) {
593 // Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
594 if h.noChunkedEncoding {
595 req.TransferEncoding = []string{"gzip", "deflate"}
596 cLength, err := strconv.Atoi(req.Header.Get("Content-Length"))
597 if err == nil {
598 req.ContentLength = int64(cLength)
599 }
600 }
601
602 // Request origin to keep connection alive to improve performance
603 req.Header.Set("Connection", "keep-alive")
604
605 if h.httpHostHeader != "" {
606 req.Header.Set("Host", h.httpHostHeader)
607 req.Host = h.httpHostHeader
608 }
609
610 response, err := h.httpClient.RoundTrip(req)
611 if err != nil {
612 return nil, errors.Wrap(err, "Error proxying request to origin")
613 }
614 defer response.Body.Close()
615
616 err = stream.WriteHeaders(H1ResponseToH2Response(response))
617 if err != nil {
618 return nil, errors.Wrap(err, "Error writing response header")
619 }
620 if h.isEventStream(response) {
621 h.writeEventStream(stream, response.Body)
622 } else {
623 // Use CopyBuffer, because Copy only allocates a 32KiB buffer, and cross-stream
624 // compression generates dictionary on first write
625 io.CopyBuffer(stream, response.Body, make([]byte, 512*1024))
626 }
627 return response, nil
628}
629
630func (h *TunnelHandler) writeEventStream(stream *h2mux.MuxedStream, responseBody io.ReadCloser) {
631 reader := bufio.NewReader(responseBody)
632 for {
633 line, err := reader.ReadBytes('\n')
634 if err != nil {
635 break
636 }
637 stream.Write(line)
638 }
639}
640
641func (h *TunnelHandler) isEventStream(response *http.Response) bool {
642 if response.Header.Get("content-type") == "text/event-stream" {
643 h.logger.Debug("Detected Server-Side Events from Origin")
644 return true
645 }
646 return false
647}
648
649func (h *TunnelHandler) logError(stream *h2mux.MuxedStream, err error) {
650 h.logger.WithError(err).Error("HTTP request error")
651 stream.WriteHeaders([]h2mux.Header{{Name: ":status", Value: "502"}})
652 stream.Write([]byte("502 Bad Gateway"))
653 h.metrics.incrementResponses(h.connectionID, "502")
654}
655
656func (h *TunnelHandler) logRequest(req *http.Request, cfRay string, lbProbe bool) {
657 logger := log.NewEntry(h.logger)
658 if cfRay != "" {
659 logger = logger.WithField("CF-RAY", cfRay)
660 logger.Debugf("%s %s %s", req.Method, req.URL, req.Proto)
661 } else if lbProbe {
662 logger.Debugf("Load Balancer health check %s %s %s", req.Method, req.URL, req.Proto)
663 } else {
664 logger.Warnf("All requests should have a CF-RAY header. Please open a support ticket with Cloudflare. %s %s %s ", req.Method, req.URL, req.Proto)
665 }
666 logger.Debugf("Request Headers %+v", req.Header)
667
668 if contentLen := req.ContentLength; contentLen == -1 {
669 logger.Debugf("Request Content length unknown")
670 } else {
671 logger.Debugf("Request content length %d", contentLen)
672 }
673}
674
675func (h *TunnelHandler) logResponseOk(r *http.Response, cfRay string, lbProbe bool) {
676 h.metrics.incrementResponses(h.connectionID, "200")
677 logger := log.NewEntry(h.logger)
678 if cfRay != "" {
679 logger = logger.WithField("CF-RAY", cfRay)
680 logger.Debugf("%s", r.Status)
681 } else if lbProbe {
682 logger.Debugf("Response to Load Balancer health check %s", r.Status)
683 } else {
684 logger.Infof("%s", r.Status)
685 }
686 logger.Debugf("Response Headers %+v", r.Header)
687
688 if contentLen := r.ContentLength; contentLen == -1 {
689 logger.Debugf("Response content length unknown")
690 } else {
691 logger.Debugf("Response content length %d", contentLen)
692 }
693}
694
695func (h *TunnelHandler) UpdateMetrics(connectionID string) {
696 h.metrics.updateMuxerMetrics(connectionID, h.muxer.Metrics())
697}
698
// uint8ToString returns the decimal representation of input.
func uint8ToString(input uint8) string {
	return strconv.Itoa(int(input))
}
702
// asciiBox returns lines framed in an ASCII box: a border row, one row per
// line padded on the right to the widest line, and a closing border row.
// padding is the number of spaces between the vertical borders and the text;
// negative values are treated as zero.
//
// Widths are measured in runes rather than bytes so that lines containing
// multi-byte characters stay aligned with the border.
func asciiBox(lines []string, padding int) (box []string) {
	if padding < 0 {
		// strings.Repeat panics on a negative count.
		padding = 0
	}

	widths := make([]int, len(lines))
	widest := 0
	for i, line := range lines {
		widths[i] = len([]rune(line))
		if widths[i] > widest {
			widest = widths[i]
		}
	}

	spacer := strings.Repeat(" ", padding)
	border := "+" + strings.Repeat("-", widest+(padding*2)) + "+"

	box = make([]string, 0, len(lines)+2)
	box = append(box, border)
	for i, line := range lines {
		fill := strings.Repeat(" ", widest-widths[i])
		box = append(box, "|"+spacer+line+fill+spacer+"|")
	}
	box = append(box, border)
	return box
}
717
// maxLen returns the length in bytes of the longest string in lines, or 0 when
// lines is empty.
func maxLen(lines []string) int {
	longest := 0
	for i := range lines {
		if n := len(lines[i]); longest < n {
			longest = n
		}
	}
	return longest
}
727
// trialZoneMsg returns the two-line announcement shown when a free tunnel has
// started: a fixed headline followed by url prefixed with a space.
func trialZoneMsg(url string) []string {
	msg := make([]string, 0, 2)
	msg = append(msg, "Your free tunnel has started! Visit it:")
	msg = append(msg, " "+url)
	return msg
}
734
735func activeIncidentsMsg(incidents []Incident) string {
736 preamble := "There is an active Cloudflare incident that may be related:"
737 if len(incidents) > 1 {
738 preamble = "There are active Cloudflare incidents that may be related:"
739 }
740 incidentStrings := []string{}
741 for _, incident := range incidents {
742 incidentString := fmt.Sprintf("%s (%s)", incident.Name, incident.URL())
743 incidentStrings = append(incidentStrings, incidentString)
744 }
745 return preamble + " " + strings.Join(incidentStrings, "; ")
746
747}
748