cloudflare/cloudflared

Public

mirrored from https://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2021.12.4

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

connection/control.go

100lines · modecode

1package connection
2
3import (
4 "context"
5 "io"
6 "time"
7
8 "github.com/rs/zerolog"
9
10 tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
11)
12
13// RPCClientFunc derives a named tunnel rpc client that can then be used to register and unregister connections.
14type RPCClientFunc func(context.Context, io.ReadWriteCloser, *zerolog.Logger) NamedTunnelRPCClient
15
16type controlStream struct {
17 observer *Observer
18
19 connectedFuse ConnectedFuse
20 namedTunnelConfig *NamedTunnelConfig
21 connIndex uint8
22
23 newRPCClientFunc RPCClientFunc
24
25 gracefulShutdownC <-chan struct{}
26 gracePeriod time.Duration
27 stoppedGracefully bool
28}
29
30// ControlStreamHandler registers connections with origintunneld and initiates graceful shutdown.
31type ControlStreamHandler interface {
32 ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions, shouldWaitForUnregister bool) error
33 IsStopped() bool
34}
35
36// NewControlStream returns a new instance of ControlStreamHandler
37func NewControlStream(
38 observer *Observer,
39 connectedFuse ConnectedFuse,
40 namedTunnelConfig *NamedTunnelConfig,
41 connIndex uint8,
42 newRPCClientFunc RPCClientFunc,
43 gracefulShutdownC <-chan struct{},
44 gracePeriod time.Duration,
45) ControlStreamHandler {
46 if newRPCClientFunc == nil {
47 newRPCClientFunc = newRegistrationRPCClient
48 }
49 return &controlStream{
50 observer: observer,
51 connectedFuse: connectedFuse,
52 namedTunnelConfig: namedTunnelConfig,
53 newRPCClientFunc: newRPCClientFunc,
54 connIndex: connIndex,
55 gracefulShutdownC: gracefulShutdownC,
56 gracePeriod: gracePeriod,
57 }
58}
59
60func (c *controlStream) ServeControlStream(
61 ctx context.Context,
62 rw io.ReadWriteCloser,
63 connOptions *tunnelpogs.ConnectionOptions,
64 shouldWaitForUnregister bool,
65) error {
66 rpcClient := c.newRPCClientFunc(ctx, rw, c.observer.log)
67
68 if err := rpcClient.RegisterConnection(ctx, c.namedTunnelConfig, connOptions, c.connIndex, c.observer); err != nil {
69 rpcClient.Close()
70 return err
71 }
72 c.connectedFuse.Connected()
73
74 if shouldWaitForUnregister {
75 c.waitForUnregister(ctx, rpcClient)
76 } else {
77 go c.waitForUnregister(ctx, rpcClient)
78 }
79
80 return nil
81}
82
83func (c *controlStream) waitForUnregister(ctx context.Context, rpcClient NamedTunnelRPCClient) {
84 // wait for connection termination or start of graceful shutdown
85 defer rpcClient.Close()
86 select {
87 case <-ctx.Done():
88 break
89 case <-c.gracefulShutdownC:
90 c.stoppedGracefully = true
91 }
92
93 c.observer.sendUnregisteringEvent(c.connIndex)
94 rpcClient.GracefulShutdown(ctx, c.gracePeriod)
95 c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Unregistered tunnel connection")
96}
97
98func (c *controlStream) IsStopped() bool {
99 return c.stoppedGracefully
100}
101