sui_proxy/
prom_to_mimir.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3use crate::remote_write;
4use crate::var;
5use itertools::Itertools;
6use prometheus::proto::{Counter, Gauge, Histogram, Metric, MetricFamily, MetricType};
7use tracing::{debug, error};
8
/// Thin wrapper used to convert prometheus protobuf types into mimir remote-write types.
///
/// `S` is the converted value. The `From` impls in this module build it, and small accessor
/// and `IntoIterator` impls hand it back out.
#[derive(Debug)]
pub struct Mimir<S> {
    state: S,
}
13
14impl From<&Metric> for Mimir<Vec<remote_write::Label>> {
15    fn from(m: &Metric) -> Self {
16        // we consume metric labels from an owned version so we can sort them
17        let mut m = m.to_owned();
18        let mut sorted = m.take_label();
19        sorted.sort_by(|a, b| {
20            (a.name(), a.value())
21                .partial_cmp(&(b.name(), b.value()))
22                .unwrap()
23        });
24        let mut r = Vec::<remote_write::Label>::new();
25        for label in sorted {
26            let lp = remote_write::Label {
27                name: label.name().into(),
28                value: label.value().into(),
29            };
30            r.push(lp);
31        }
32        Self { state: r }
33    }
34}
35
36impl IntoIterator for Mimir<Vec<remote_write::Label>> {
37    type Item = remote_write::Label;
38    type IntoIter = std::vec::IntoIter<Self::Item>;
39
40    fn into_iter(self) -> Self::IntoIter {
41        self.state.into_iter()
42    }
43}
44
45impl From<&Counter> for Mimir<remote_write::Sample> {
46    fn from(c: &Counter) -> Self {
47        Self {
48            state: remote_write::Sample {
49                value: c.value(),
50                ..Default::default()
51            },
52        }
53    }
54}
55impl From<&Gauge> for Mimir<remote_write::Sample> {
56    fn from(c: &Gauge) -> Self {
57        Self {
58            state: remote_write::Sample {
59                value: c.value(),
60                ..Default::default()
61            },
62        }
63    }
64}
65impl Mimir<remote_write::Sample> {
66    fn sample(self) -> remote_write::Sample {
67        self.state
68    }
69}
70
71/// TODO implement histogram
72impl From<&Histogram> for Mimir<remote_write::Histogram> {
73    fn from(_h: &Histogram) -> Self {
74        Self {
75            state: remote_write::Histogram::default(),
76        }
77    }
78}
79/// TODO implement histogram
80impl Mimir<remote_write::Histogram> {
81    #[allow(dead_code)]
82    fn histogram(self) -> remote_write::Histogram {
83        self.state
84    }
85}
86impl From<Vec<MetricFamily>> for Mimir<Vec<remote_write::WriteRequest>> {
87    fn from(metric_families: Vec<MetricFamily>) -> Self {
88        // we may have more but we'll have at least this many timeseries
89        let mut timeseries: Vec<remote_write::TimeSeries> =
90            Vec::with_capacity(metric_families.len());
91
92        for mf in metric_families {
93            // TOOD add From impl
94            let mt = match mf.get_field_type() {
95                MetricType::COUNTER => remote_write::metric_metadata::MetricType::Counter,
96                MetricType::GAUGE => remote_write::metric_metadata::MetricType::Gauge,
97                MetricType::HISTOGRAM => remote_write::metric_metadata::MetricType::Histogram,
98                MetricType::SUMMARY => remote_write::metric_metadata::MetricType::Summary,
99                MetricType::UNTYPED => remote_write::metric_metadata::MetricType::Unknown,
100            };
101
102            // filter out the types we don't support
103            match mt {
104                remote_write::metric_metadata::MetricType::Counter
105                | remote_write::metric_metadata::MetricType::Gauge => (),
106                other => {
107                    debug!("{:?} is not yet implemented, skipping metric", other);
108                    continue;
109                }
110            }
111
112            // TODO stop using state directly
113            timeseries.extend(Mimir::from(mf.clone()).state);
114        }
115
116        Self {
117            state: timeseries
118                .into_iter()
119                // the upstream remote_write should have a max sample size per request set to this number
120                .chunks(var!("MIMIR_MAX_SAMPLE_SIZE", 500))
121                .into_iter()
122                .map(|ts| remote_write::WriteRequest {
123                    timeseries: ts.collect(),
124                    ..Default::default()
125                })
126                .collect_vec(),
127        }
128    }
129}
130
131impl IntoIterator for Mimir<Vec<remote_write::WriteRequest>> {
132    type Item = remote_write::WriteRequest;
133    type IntoIter = std::vec::IntoIter<Self::Item>;
134
135    fn into_iter(self) -> Self::IntoIter {
136        self.state.into_iter()
137    }
138}
139
140impl Mimir<Vec<remote_write::TimeSeries>> {
141    pub fn repeated(self) -> Vec<remote_write::TimeSeries> {
142        self.state
143    }
144}
145
146impl From<MetricFamily> for Mimir<Vec<remote_write::TimeSeries>> {
147    fn from(mf: MetricFamily) -> Self {
148        let mut timeseries = vec![];
149        for metric in mf.get_metric() {
150            let mut ts = remote_write::TimeSeries::default();
151            ts.labels.extend(vec![
152                // mimir requires that we use __name__ as a key that points to a value
153                // of the metric name
154                remote_write::Label {
155                    name: "__name__".into(),
156                    value: mf.name().into(),
157                },
158            ]);
159            ts.labels
160                .extend(Mimir::<Vec<remote_write::Label>>::from(metric));
161
162            // assumption here is that since a MetricFamily will have one MetricType, we'll only need
163            // to look for one of these types.  Setting two different types on Metric at the same time
164            // in a way that is conflicting with the MetricFamily type will result in undefined mimir
165            // behavior, probably an error.
166            if metric.counter.is_some() {
167                let counter: &Counter = &metric.counter;
168                let mut s = Mimir::<remote_write::Sample>::from(counter).sample();
169                s.timestamp = metric.timestamp_ms();
170                ts.samples.push(s);
171            } else if metric.gauge.is_some() {
172                let gauge: &Gauge = &metric.gauge;
173                let mut s = Mimir::<remote_write::Sample>::from(gauge).sample();
174                s.timestamp = metric.timestamp_ms();
175                ts.samples.push(s);
176            } else if metric.histogram.is_some() {
177                // TODO implement
178                // ts.mut_histograms()
179                //     .push(Mimir::<remote_write::Histogram>::from(metric.get_histogram()).histogram());
180            } else if metric.summary.is_some() {
181                // TODO implement
182                error!("summary is not implemented for a metric type");
183            }
184            timeseries.push(ts);
185        }
186        Self { state: timeseries }
187    }
188}
189
190impl Mimir<remote_write::TimeSeries> {
191    pub fn timeseries(self) -> remote_write::TimeSeries {
192        self.state
193    }
194}
195
#[cfg(test)]
pub mod tests {
    use crate::prom_to_mimir::Mimir;
    use crate::remote_write;
    use mysten_common::ZipDebugEqIteratorExt;
    use prometheus::proto;

    // protobuf stuff
    pub fn create_metric_family(
        name: &str,
        help: &str,
        field_type: Option<proto::MetricType>,
        metric: Vec<proto::Metric>,
    ) -> proto::MetricFamily {
        let mut mf = proto::MetricFamily {
            name: Some(name.into()),
            help: Some(help.into()),
            ..Default::default()
        };
        if let Some(ft) = field_type {
            mf.set_field_type(ft);
        }
        mf.set_metric(metric);
        mf
    }

    fn create_metric_gauge(labels: Vec<proto::LabelPair>, gauge: proto::Gauge) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_gauge(gauge);
        m.timestamp_ms = Some(12345);
        m
    }

    pub fn create_metric_counter(
        labels: Vec<proto::LabelPair>,
        counter: proto::Counter,
    ) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_counter(counter);
        m.timestamp_ms = Some(12345);
        m
    }

    pub fn create_metric_histogram(
        labels: Vec<proto::LabelPair>,
        histogram: proto::Histogram,
    ) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_histogram(histogram);
        m.timestamp_ms = Some(12345);
        m
    }

    pub fn create_histogram() -> proto::Histogram {
        let mut h = proto::Histogram::default();
        h.set_sample_count(1);
        h.set_sample_sum(1.0);
        let mut b = proto::Bucket::default();
        b.set_cumulative_count(1);
        b.set_upper_bound(1.0);
        h.bucket.push(b);
        h
    }

    pub fn create_labels(labels: Vec<(&str, &str)>) -> Vec<proto::LabelPair> {
        labels
            .into_iter()
            .map(|(key, value)| proto::LabelPair {
                name: Some(key.into()),
                value: Some(value.into()),
                ..Default::default()
            })
            .collect()
    }

    fn create_gauge(value: f64) -> proto::Gauge {
        proto::Gauge {
            value: Some(value),
            ..Default::default()
        }
    }

    pub fn create_counter(value: f64) -> proto::Counter {
        proto::Counter {
            value: Some(value),
            ..Default::default()
        }
    }

    // end protobuf stuff

    // mimir stuff
    fn create_label(name: &str, value: &str) -> remote_write::Label {
        remote_write::Label {
            name: name.into(),
            value: value.into(),
        }
    }

    fn create_timeseries_with_samples(
        labels: Vec<remote_write::Label>,
        samples: Vec<remote_write::Sample>,
    ) -> remote_write::TimeSeries {
        remote_write::TimeSeries {
            labels,
            samples,
            ..Default::default()
        }
    }
    // end mimir stuff

    #[test]
    fn metricfamily_to_timeseries() {
        let tests: Vec<(proto::MetricFamily, Vec<remote_write::TimeSeries>)> = vec![
            (
                create_metric_family(
                    "test_gauge",
                    "i'm a help message",
                    Some(proto::MetricType::GAUGE),
                    vec![create_metric_gauge(
                        create_labels(vec![
                            ("host", "local-test-validator"),
                            ("network", "unittest-network"),
                        ]),
                        create_gauge(2046.0),
                    )],
                ),
                vec![create_timeseries_with_samples(
                    vec![
                        create_label("__name__", "test_gauge"),
                        create_label("host", "local-test-validator"),
                        create_label("network", "unittest-network"),
                    ],
                    vec![remote_write::Sample {
                        value: 2046.0,
                        timestamp: 12345,
                    }],
                )],
            ),
            (
                create_metric_family(
                    "test_counter",
                    "i'm a help message",
                    Some(proto::MetricType::COUNTER),
                    vec![create_metric_counter(
                        create_labels(vec![
                            ("host", "local-test-validator"),
                            ("network", "unittest-network"),
                        ]),
                        create_counter(2046.0),
                    )],
                ),
                vec![create_timeseries_with_samples(
                    vec![
                        create_label("__name__", "test_counter"),
                        create_label("host", "local-test-validator"),
                        create_label("network", "unittest-network"),
                    ],
                    vec![remote_write::Sample {
                        value: 2046.0,
                        timestamp: 12345,
                    }],
                )],
            ),
            (
                // labels arrive out of order and must come out sorted by name
                create_metric_family(
                    "test_unsorted_labels",
                    "i'm a help message",
                    Some(proto::MetricType::GAUGE),
                    vec![create_metric_gauge(
                        create_labels(vec![
                            ("network", "unittest-network"),
                            ("host", "local-test-validator"),
                        ]),
                        create_gauge(7.0),
                    )],
                ),
                vec![create_timeseries_with_samples(
                    vec![
                        create_label("__name__", "test_unsorted_labels"),
                        create_label("host", "local-test-validator"),
                        create_label("network", "unittest-network"),
                    ],
                    vec![remote_write::Sample {
                        value: 7.0,
                        timestamp: 12345,
                    }],
                )],
            ),
        ];
        for (mf, expected_ts) in tests {
            let actual_ts = Mimir::<Vec<remote_write::TimeSeries>>::from(mf).repeated();
            for (actual, expected) in actual_ts.into_iter().zip_debug_eq(expected_ts) {
                assert_eq!(actual.labels, expected.labels);
                for (actual_sample, expected_sample) in
                    actual.samples.into_iter().zip_debug_eq(expected.samples)
                {
                    assert_eq!(
                        actual_sample.value, expected_sample.value,
                        "sample values do not match"
                    );

                    // timestamps are injected on the sui-node and we copy it to our sample
                    // make sure that works
                    assert_eq!(
                        actual_sample.timestamp, expected_sample.timestamp,
                        "sample timestamp should be copied from the metric"
                    );
                }
            }
        }
    }

    #[test]
    fn metricfamilies_to_write_requests_skip_unsupported_types() {
        let families = vec![
            create_metric_family(
                "test_counter",
                "i'm a help message",
                Some(proto::MetricType::COUNTER),
                vec![create_metric_counter(
                    create_labels(vec![("host", "local-test-validator")]),
                    create_counter(1.0),
                )],
            ),
            create_metric_family(
                "test_histogram",
                "i'm a help message",
                Some(proto::MetricType::HISTOGRAM),
                vec![create_metric_histogram(
                    create_labels(vec![("host", "local-test-validator")]),
                    create_histogram(),
                )],
            ),
        ];

        let requests: Vec<remote_write::WriteRequest> =
            Mimir::<Vec<remote_write::WriteRequest>>::from(families)
                .into_iter()
                .collect();

        // histograms are not supported yet, so only the counter series is sent
        assert_eq!(requests.len(), 1);
        assert_eq!(requests[0].timeseries.len(), 1);
        assert_eq!(
            requests[0].timeseries[0].labels[0],
            create_label("__name__", "test_counter")
        );
    }
}