cloudflare/pint

Public

Mirrored from https://github.com/cloudflare/pint · Available

Code · Commits · Issues · Pull requests · Actions · Insights · Security
v0.75.0

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

internal/promapi/metadata.go

137 lines · mode: code

1package promapi
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "log/slog"
9 "net/http"
10 "net/url"
11 "time"
12
13 v1 "github.com/prometheus/client_golang/api/prometheus/v1"
14 "github.com/prymitive/current"
15)
16
// APIPathMetadata is the request path of the Prometheus metrics metadata
// API endpoint.
const APIPathMetadata = "/api/v1/metadata"
20
21type MetadataResult struct {
22 URI string
23 Metadata []v1.Metadata
24}
25
26type metadataQuery struct {
27 timestamp time.Time
28 ctx context.Context
29 prom *Prometheus
30 metric string
31}
32
33func (q metadataQuery) Run() queryResult {
34 slog.Debug(
35 "Getting prometheus metrics metadata",
36 slog.String("uri", q.prom.safeURI),
37 slog.String("metric", q.metric),
38 )
39
40 ctx, cancel := q.prom.requestContext(q.ctx)
41 defer cancel()
42
43 var qr queryResult
44
45 args := url.Values{}
46 args.Set("metric", q.metric)
47 resp, err := q.prom.doRequest(ctx, http.MethodGet, q.Endpoint(), args)
48 if err != nil {
49 qr.err = fmt.Errorf("failed to query Prometheus metrics metadata: %w", err)
50 return qr
51 }
52 defer resp.Body.Close()
53
54 if resp.StatusCode/100 != 2 {
55 qr.err = tryDecodingAPIError(resp)
56 return qr
57 }
58
59 meta, err := streamMetadata(resp.Body)
60 qr.value, qr.err = meta, err
61 return qr
62}
63
64func (q metadataQuery) Endpoint() string {
65 return APIPathMetadata
66}
67
68func (q metadataQuery) String() string {
69 return q.metric
70}
71
72func (q metadataQuery) CacheKey() uint64 {
73 return hash(q.prom.unsafeURI, q.Endpoint(), q.metric)
74}
75
76func (q metadataQuery) CacheTTL() time.Duration {
77 return time.Minute * 10
78}
79
80func (prom *Prometheus) Metadata(ctx context.Context, metric string) (*MetadataResult, error) {
81 slog.Debug("Scheduling Prometheus metrics metadata query", slog.String("uri", prom.safeURI), slog.String("metric", metric))
82
83 key := APIPathMetadata + metric
84 prom.locker.lock(key)
85 defer prom.locker.unlock(key)
86
87 resultChan := make(chan queryResult)
88 prom.queries <- queryRequest{
89 query: metadataQuery{prom: prom, ctx: ctx, metric: metric, timestamp: time.Now()},
90 result: resultChan,
91 }
92
93 result := <-resultChan
94 if result.err != nil {
95 return nil, QueryError{err: result.err, msg: decodeError(result.err)}
96 }
97
98 metadata := MetadataResult{
99 URI: prom.publicURI,
100 Metadata: result.value.(map[string][]v1.Metadata)[metric],
101 }
102
103 return &metadata, nil
104}
105
106func streamMetadata(r io.Reader) (meta map[string][]v1.Metadata, err error) {
107 defer dummyReadAll(r)
108
109 var status, errType, errText string
110 errText = "empty response object"
111 meta = map[string][]v1.Metadata{}
112 decoder := current.Object(
113 current.Key("status", current.Value(func(s string, _ bool) {
114 status = s
115 })),
116 current.Key("error", current.Value(func(s string, _ bool) {
117 errText = s
118 })),
119 current.Key("errorType", current.Value(func(s string, _ bool) {
120 errType = s
121 })),
122 current.Key("data", current.Map(func(k string, v []v1.Metadata) {
123 meta[k] = v
124 })),
125 )
126
127 dec := json.NewDecoder(r)
128 if err = decoder.Stream(dec); err != nil {
129 return nil, APIError{Status: status, ErrorType: v1.ErrBadResponse, Err: fmt.Sprintf("JSON parse error: %s", err)}
130 }
131
132 if status != "success" {
133 return nil, APIError{Status: status, ErrorType: decodeErrorType(errType), Err: errText}
134 }
135
136 return meta, nil
137}
138