cloudflare/pint

Public

mirrored from https://github.com/cloudflare/pint · Available

Code · Commits · Issues · Pull requests · Actions · Insights · Security
v0.75.0

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

internal/promapi/failover.go

303 lines · mode: code

1package promapi
2
3import (
4 "context"
5 "errors"
6 "log/slog"
7 "regexp"
8 "slices"
9 "sync"
10 "time"
11
12 "github.com/prometheus/client_golang/prometheus"
13)
14
// FailoverGroupError is the error returned by the FailoverGroup query
// methods. It wraps the error from the last server that was tried, together
// with that server's URI and whether the group was configured with strict
// error handling.
type FailoverGroupError struct {
	err      error
	uri      string
	isStrict bool
}

// Unwrap returns the wrapped server error so errors.Is and errors.As can
// inspect it. It may be nil (see Error).
func (e *FailoverGroupError) Unwrap() error {
	return e.err
}

// Error returns the wrapped error's message.
//
// The wrapped error can be nil: the FailoverGroup query methods build this
// value from the last server response, and when the group has no servers at
// all there is no response to report. Guard against that instead of
// panicking with a nil dereference.
func (e *FailoverGroupError) Error() string {
	if e.err == nil {
		return "no servers available in the failover group"
	}
	return e.err.Error()
}

// URI returns the URI recorded for the server that produced the error (the
// FailoverGroup methods store the server's safeURI here).
func (e *FailoverGroupError) URI() string {
	return e.uri
}

// IsStrict reports whether the failover group that returned this error was
// created with strict error handling enabled.
func (e *FailoverGroupError) IsStrict() bool {
	return e.isStrict
}
36
// cacheCleaner calls cache.gc every interval until it receives from quit
// (or quit is closed), then returns. The ticker is stopped on exit so it does
// not keep running after the goroutine is gone.
func cacheCleaner(cache *queryCache, interval time.Duration, quit chan bool) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-quit:
			return
		case <-ticker.C:
			cache.gc()
		}
	}
}
48
49type disabledChecks struct {
50 // Key is the name of the unsupported API, value is the list of checks disabled because of it.
51 apis map[string][]string
52 mtx sync.Mutex
53}
54
55func (dc *disabledChecks) disable(api, check string) {
56 dc.mtx.Lock()
57 if _, ok := dc.apis[api]; !ok {
58 dc.apis[api] = []string{}
59 }
60 if !slices.Contains(dc.apis[api], check) {
61 dc.apis[api] = append(dc.apis[api], check)
62 }
63 dc.mtx.Unlock()
64}
65
66func (dc *disabledChecks) read() map[string][]string {
67 dc.mtx.Lock()
68 defer dc.mtx.Unlock()
69 return dc.apis
70}
71
72type FailoverGroup struct {
73 disabledChecks disabledChecks
74
75 name string
76 uri string
77 servers []*Prometheus
78 uptimeMetric string
79 cacheCollector *cacheCollector
80 quitChan chan bool
81
82 pathsInclude []*regexp.Regexp
83 pathsExclude []*regexp.Regexp
84 tags []string
85 started bool
86 strictErrors bool
87}
88
// NewFailoverGroup builds a FailoverGroup called name over servers, which are
// queried in the order given. uri is what URI() reports. include and exclude
// are the path filters consulted by IsEnabledForPath. strictErrors is copied
// into every FailoverGroupError the group returns. No workers are started;
// call StartWorkers for that.
func NewFailoverGroup(name, uri string, servers []*Prometheus, strictErrors bool, uptimeMetric string, include, exclude []*regexp.Regexp, tags []string) *FailoverGroup {
	fg := &FailoverGroup{ // nolint: exhaustruct
		name:           name,
		uri:            uri,
		servers:        servers,
		uptimeMetric:   uptimeMetric,
		strictErrors:   strictErrors,
		tags:           tags,
		pathsInclude:   include,
		pathsExclude:   exclude,
		disabledChecks: disabledChecks{apis: make(map[string][]string)}, // nolint: exhaustruct
	}
	return fg
}
102
103func (fg *FailoverGroup) Name() string {
104 return fg.name
105}
106
107func (fg *FailoverGroup) URI() string {
108 return fg.uri
109}
110
111func (fg *FailoverGroup) DisableCheck(api, s string) {
112 fg.disabledChecks.disable(api, s)
113}
114
115func (fg *FailoverGroup) GetDisabledChecks() map[string][]string {
116 return fg.disabledChecks.read()
117}
118
119func (fg *FailoverGroup) Include() []string {
120 sl := []string{}
121 for _, re := range fg.pathsInclude {
122 sl = append(sl, re.String())
123 }
124 return sl
125}
126
127func (fg *FailoverGroup) Exclude() []string {
128 sl := []string{}
129 for _, re := range fg.pathsExclude {
130 sl = append(sl, re.String())
131 }
132 return sl
133}
134
135func (fg *FailoverGroup) Tags() []string {
136 return fg.tags
137}
138
139func (fg *FailoverGroup) UptimeMetric() string {
140 return fg.uptimeMetric
141}
142
143func (fg *FailoverGroup) ServerCount() int {
144 return len(fg.servers)
145}
146
147func (fg *FailoverGroup) MergeUpstreams(src *FailoverGroup) {
148 for _, ns := range src.servers {
149 var present bool
150 for _, ol := range fg.servers {
151 if ol.unsafeURI == ns.unsafeURI {
152 present = true
153 break
154 }
155 }
156 if !present {
157 fg.servers = append(fg.servers, ns)
158 slog.Debug(
159 "Added new failover URI",
160 slog.String("name", ns.name),
161 slog.String("uri", ns.safeURI),
162 )
163 }
164 }
165}
166
167func (fg *FailoverGroup) IsEnabledForPath(path string) bool {
168 if len(fg.pathsInclude) == 0 && len(fg.pathsExclude) == 0 {
169 return true
170 }
171 for _, re := range fg.pathsExclude {
172 if re.MatchString(path) {
173 return false
174 }
175 }
176 for _, re := range fg.pathsInclude {
177 if re.MatchString(path) {
178 return true
179 }
180 }
181 return false
182}
183
// StartWorkers prepares the group for querying:
//   - it creates one query cache shared by every server;
//   - it starts a goroutine that calls the cache's gc every two minutes;
//   - it registers the cache collector in reg;
//   - it starts each server's workers.
//
// It does nothing if the group has already been started.
func (fg *FailoverGroup) StartWorkers(reg *prometheus.Registry) {
	if fg.started {
		return
	}

	cache := newQueryCache(time.Hour, time.Now)
	fg.quitChan = make(chan bool)
	go cacheCleaner(cache, 2*time.Minute, fg.quitChan)

	fg.cacheCollector = newCacheCollector(cache, fg.name)
	reg.MustRegister(fg.cacheCollector)

	for _, server := range fg.servers {
		// Every server gets the same cache instance.
		server.cache = cache
		server.StartWorkers()
	}
	fg.started = true
}
201
202func (fg *FailoverGroup) Close(reg *prometheus.Registry) {
203 if !fg.started {
204 return
205 }
206 for _, prom := range fg.servers {
207 prom.Close()
208 }
209 reg.Unregister(fg.cacheCollector)
210 fg.quitChan <- true
211}
212
// CleanCache runs one gc pass over the group's query cache. StartWorkers
// gives all servers the same cache, so it is enough to use the first server
// that has one. If no server has a cache yet, it does nothing.
func (fg *FailoverGroup) CleanCache() {
	idx := slices.IndexFunc(fg.servers, func(p *Prometheus) bool { return p.cache != nil })
	if idx < 0 {
		return
	}
	fg.servers[idx].cache.gc()
}
221
// Config returns the configuration reported by the first server that answers
// successfully; cacheTTL is passed through to each server's Config call.
// It moves on to the next server only when the current one is unavailable or
// reports ErrUnsupported. Any other error, or the last error once every
// server has been tried, is returned as a FailoverGroupError carrying the
// URI of the server that produced it.
func (fg *FailoverGroup) Config(ctx context.Context, cacheTTL time.Duration) (cfg *ConfigResult, err error) {
	var lastURI string
	for _, server := range fg.servers {
		lastURI = server.safeURI
		if cfg, err = server.Config(ctx, cacheTTL); err == nil {
			return cfg, nil
		}
		canFailover := IsUnavailableError(err) || errors.Is(err, ErrUnsupported)
		if !canFailover {
			break
		}
	}
	return nil, &FailoverGroupError{err: err, uri: lastURI, isStrict: fg.strictErrors}
}
236
// Query runs an instant query for expr against the group's servers in order
// and returns the first successful result. It fails over to the next server
// only when the current one is unavailable (IsUnavailableError), logging each
// retry at debug level. Any other error, or the last error once every server
// has been tried, is returned as a FailoverGroupError carrying the URI of the
// server that produced it. On error the result is always nil, matching
// Config and Flags, rather than whatever the failing server returned.
func (fg *FailoverGroup) Query(ctx context.Context, expr string) (qr *QueryResult, err error) {
	var uri string
	for try, prom := range fg.servers {
		if try > 0 {
			slog.Debug(
				"Using failover URI",
				slog.String("name", fg.name),
				slog.Int("retry", try),
				slog.String("uri", prom.safeURI),
			)
		}
		uri = prom.safeURI
		qr, err = prom.Query(ctx, expr)
		if err == nil {
			return qr, nil
		}
		if !IsUnavailableError(err) {
			break
		}
	}
	return nil, &FailoverGroupError{err: err, uri: uri, isStrict: fg.strictErrors}
}
259
// RangeQuery runs a range query for expr with the given params against the
// group's servers in order and returns the first successful result. It fails
// over to the next server only when the current one is unavailable
// (IsUnavailableError). Any other error, or the last error once every server
// has been tried, is returned as a FailoverGroupError carrying the URI of
// the server that produced it. On error the result is always nil, matching
// Config and Flags.
func (fg *FailoverGroup) RangeQuery(ctx context.Context, expr string, params RangeQueryTimes) (rqr *RangeQueryResult, err error) {
	var uri string
	for _, prom := range fg.servers {
		uri = prom.safeURI
		rqr, err = prom.RangeQuery(ctx, expr, params)
		if err == nil {
			return rqr, nil
		}
		if !IsUnavailableError(err) {
			break
		}
	}
	return nil, &FailoverGroupError{err: err, uri: uri, isStrict: fg.strictErrors}
}
274
// Metadata fetches metadata for metric from the group's servers in order and
// returns the first successful result. It fails over to the next server only
// when the current one is unavailable or reports ErrUnsupported. Any other
// error, or the last error once every server has been tried, is returned as
// a FailoverGroupError carrying the URI of the server that produced it. On
// error the result is always nil, matching Config and Flags.
func (fg *FailoverGroup) Metadata(ctx context.Context, metric string) (metadata *MetadataResult, err error) {
	var uri string
	for _, prom := range fg.servers {
		uri = prom.safeURI
		metadata, err = prom.Metadata(ctx, metric)
		if err == nil {
			return metadata, nil
		}
		if !IsUnavailableError(err) && !errors.Is(err, ErrUnsupported) {
			break
		}
	}
	return nil, &FailoverGroupError{err: err, uri: uri, isStrict: fg.strictErrors}
}
289
// Flags returns the command-line flags reported by the first server that
// answers successfully. It moves on to the next server only when the current
// one is unavailable or reports ErrUnsupported. Any other error, or the last
// error once every server has been tried, is returned as a FailoverGroupError
// carrying the URI of the server that produced it.
func (fg *FailoverGroup) Flags(ctx context.Context) (flags *FlagsResult, err error) {
	var lastURI string
	for _, server := range fg.servers {
		lastURI = server.safeURI
		if flags, err = server.Flags(ctx); err == nil {
			return flags, nil
		}
		canFailover := IsUnavailableError(err) || errors.Is(err, ErrUnsupported)
		if !canFailover {
			break
		}
	}
	return nil, &FailoverGroupError{err: err, uri: lastURI, isStrict: fg.strictErrors}
}
304