cloudflare/cloudflared

Public

mirrored fromhttps://github.com/cloudflare/cloudflaredAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
2020.6.1

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

awsuploader/directory_upload_manager.go

106lines · modecode

1package awsuploader
2
3import (
4 "os"
5 "path/filepath"
6 "time"
7
8 "github.com/cloudflare/cloudflared/logger"
9)
10
11// DirectoryUploadManager is used to manage file uploads on an interval from a directory
12type DirectoryUploadManager struct {
13 logger logger.Service
14 uploader Uploader
15 rootDirectory string
16 sweepInterval time.Duration
17 ticker *time.Ticker
18 shutdownC chan struct{}
19 workQueue chan string
20}
21
22// NewDirectoryUploadManager create a new DirectoryUploadManager
23// uploader is an Uploader to use as an actual uploading engine
24// directory is the directory to sweep for files to upload
25// sweepInterval is how often to iterate the directory and upload the files within
26func NewDirectoryUploadManager(logger logger.Service, uploader Uploader, directory string, sweepInterval time.Duration, shutdownC chan struct{}) *DirectoryUploadManager {
27 workerCount := 10
28 manager := &DirectoryUploadManager{
29 logger: logger,
30 uploader: uploader,
31 rootDirectory: directory,
32 sweepInterval: sweepInterval,
33 shutdownC: shutdownC,
34 workQueue: make(chan string, workerCount),
35 }
36
37 //start workers
38 for i := 0; i < workerCount; i++ {
39 go manager.worker()
40 }
41
42 return manager
43}
44
45// Upload a file using the uploader
46// This is useful for "out of band" uploads that need to be triggered immediately instead of waiting for the sweep
47func (m *DirectoryUploadManager) Upload(filepath string) error {
48 return m.uploader.Upload(filepath)
49}
50
51// Start the upload ticker to walk the directories
52func (m *DirectoryUploadManager) Start() {
53 m.ticker = time.NewTicker(m.sweepInterval)
54 go m.run()
55}
56
57func (m *DirectoryUploadManager) run() {
58 for {
59 select {
60 case <-m.shutdownC:
61 m.ticker.Stop()
62 return
63 case <-m.ticker.C:
64 m.sweep()
65 }
66 }
67}
68
69// sweep the directory and kick off uploads
70func (m *DirectoryUploadManager) sweep() {
71 filepath.Walk(m.rootDirectory, func(path string, info os.FileInfo, err error) error {
72 if err != nil || info.IsDir() {
73 return nil
74 }
75 //30 days ago
76 retentionTime := 30 * (time.Hour * 24)
77 checkTime := time.Now().Add(-time.Duration(retentionTime))
78
79 //delete the file it is stale
80 if info.ModTime().Before(checkTime) {
81 os.Remove(path)
82 return nil
83 }
84 //add the upload to the work queue
85 go func() {
86 m.workQueue <- path
87 }()
88 return nil
89 })
90}
91
92// worker handles upload requests
93func (m *DirectoryUploadManager) worker() {
94 for {
95 select {
96 case <-m.shutdownC:
97 return
98 case filepath := <-m.workQueue:
99 if err := m.Upload(filepath); err != nil {
100 m.logger.Errorf("Cannot upload file to s3 bucket: %s", err)
101 } else {
102 os.Remove(filepath)
103 }
104 }
105 }
106}
107