cloudflare/pint

Public

mirrored from https://github.com/cloudflare/pintAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
v0.75.0

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

internal/promapi/config.go

166lines · modecode

1package promapi
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "log/slog"
9 "net/http"
10 "net/url"
11 "time"
12
13 v1 "github.com/prometheus/client_golang/api/prometheus/v1"
14 "github.com/prymitive/current"
15 "go.yaml.in/yaml/v3"
16)
17
18const (
19 APIPathConfig = "/api/v1/status/config"
20)
21
22type ConfigSectionGlobal struct {
23 ExternalLabels map[string]string `yaml:"external_labels"`
24 ScrapeInterval time.Duration `yaml:"scrape_interval"`
25 ScrapeTimeout time.Duration `yaml:"scrape_timeout"`
26 EvaluationInterval time.Duration `yaml:"evaluation_interval"`
27}
28
29type PrometheusConfig struct {
30 RuleFiles []string `yaml:"rule_files"`
31 Global ConfigSectionGlobal `yaml:"global"`
32}
33
34type ConfigResult struct {
35 URI string
36 Config PrometheusConfig
37}
38
39type configQuery struct {
40 prom *Prometheus
41 ctx context.Context
42 timestamp time.Time
43 cacheTTL time.Duration
44}
45
46func (q configQuery) Run() queryResult {
47 slog.Debug("Getting prometheus configuration", slog.String("uri", q.prom.safeURI))
48
49 ctx, cancel := q.prom.requestContext(q.ctx)
50 defer cancel()
51
52 var qr queryResult
53
54 args := url.Values{}
55 resp, err := q.prom.doRequest(ctx, http.MethodGet, q.Endpoint(), args)
56 if err != nil {
57 qr.err = fmt.Errorf("failed to query Prometheus config: %w", err)
58 return qr
59 }
60 defer resp.Body.Close()
61
62 if resp.StatusCode/100 != 2 {
63 qr.err = tryDecodingAPIError(resp)
64 return qr
65 }
66
67 qr.value, err = streamConfig(resp.Body)
68 if err != nil {
69 prometheusQueryErrorsTotal.WithLabelValues(q.prom.name, APIPathConfig, errReason(err)).Inc()
70 qr.err = fmt.Errorf("failed to decode config data in %s response: %w", q.prom.safeURI, err)
71 }
72 return qr
73}
74
75func (q configQuery) Endpoint() string {
76 return APIPathConfig
77}
78
79func (q configQuery) String() string {
80 return APIPathConfig
81}
82
83func (q configQuery) CacheKey() uint64 {
84 return hash(q.prom.unsafeURI, q.Endpoint())
85}
86
87func (q configQuery) CacheTTL() time.Duration {
88 return q.cacheTTL
89}
90
91func (prom *Prometheus) Config(ctx context.Context, cacheTTL time.Duration) (*ConfigResult, error) {
92 slog.Debug("Scheduling Prometheus configuration query", slog.String("uri", prom.safeURI))
93
94 prom.locker.lock(APIPathConfig)
95 defer prom.locker.unlock(APIPathConfig)
96
97 if cacheTTL == 0 {
98 cacheTTL = time.Minute
99 }
100
101 resultChan := make(chan queryResult)
102 prom.queries <- queryRequest{
103 query: configQuery{prom: prom, ctx: ctx, timestamp: time.Now(), cacheTTL: cacheTTL},
104 result: resultChan,
105 }
106
107 result := <-resultChan
108 if result.err != nil {
109 return nil, QueryError{err: result.err, msg: decodeError(result.err)}
110 }
111
112 r := ConfigResult{
113 URI: prom.publicURI,
114 Config: result.value.(PrometheusConfig),
115 }
116
117 return &r, nil
118}
119
120func streamConfig(r io.Reader) (cfg PrometheusConfig, err error) {
121 defer dummyReadAll(r)
122
123 var yamlBody, status, errType, errText string
124 errText = "empty response object"
125 decoder := current.Object(
126 current.Key("status", current.Value(func(s string, _ bool) {
127 status = s
128 })),
129 current.Key("error", current.Value(func(s string, _ bool) {
130 errText = s
131 })),
132 current.Key("errorType", current.Value(func(s string, _ bool) {
133 errType = s
134 })),
135 current.Key("data", current.Object(
136 current.Key("yaml", current.Value(func(s string, _ bool) {
137 yamlBody = s
138 })),
139 )),
140 )
141
142 dec := json.NewDecoder(r)
143 if err = decoder.Stream(dec); err != nil {
144 return cfg, APIError{Status: status, ErrorType: v1.ErrBadResponse, Err: fmt.Sprintf("JSON parse error: %s", err)}
145 }
146
147 if status != "success" {
148 return cfg, APIError{Status: status, ErrorType: decodeErrorType(errType), Err: errText}
149 }
150
151 if err = yaml.Unmarshal([]byte(yamlBody), &cfg); err != nil {
152 return cfg, err
153 }
154
155 if cfg.Global.ScrapeInterval == 0 {
156 cfg.Global.ScrapeInterval = time.Minute
157 }
158 if cfg.Global.ScrapeTimeout == 0 {
159 cfg.Global.ScrapeTimeout = time.Second * 10
160 }
161 if cfg.Global.EvaluationInterval == 0 {
162 cfg.Global.EvaluationInterval = time.Minute
163 }
164
165 return cfg, nil
166}
167