sui_proxy/
prom_to_mimir.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3use crate::remote_write;
4use crate::var;
5use itertools::Itertools;
6use prometheus::proto::{Counter, Gauge, Histogram, Metric, MetricFamily, MetricType};
7use tracing::{debug, error};
8
/// Typed wrapper used to convert prometheus protobuf values into their
/// `remote_write` counterparts.
///
/// The type parameter `S` is the converted value being held; each `From` impl
/// in this file targets one concrete `S`.
#[derive(Debug)]
pub struct Mimir<S> {
    /// The converted value produced by one of the `From` impls.
    state: S,
}
13
14impl From<&Metric> for Mimir<Vec<remote_write::Label>> {
15    fn from(m: &Metric) -> Self {
16        // we consume metric labels from an owned version so we can sort them
17        let mut m = m.to_owned();
18        let mut sorted = m.take_label();
19        sorted.sort_by(|a, b| {
20            (a.name(), a.value())
21                .partial_cmp(&(b.name(), b.value()))
22                .unwrap()
23        });
24        let mut r = Vec::<remote_write::Label>::new();
25        for label in sorted {
26            let lp = remote_write::Label {
27                name: label.name().into(),
28                value: label.value().into(),
29            };
30            r.push(lp);
31        }
32        Self { state: r }
33    }
34}
35
36impl IntoIterator for Mimir<Vec<remote_write::Label>> {
37    type Item = remote_write::Label;
38    type IntoIter = std::vec::IntoIter<Self::Item>;
39
40    fn into_iter(self) -> Self::IntoIter {
41        self.state.into_iter()
42    }
43}
44
45impl From<&Counter> for Mimir<remote_write::Sample> {
46    fn from(c: &Counter) -> Self {
47        Self {
48            state: remote_write::Sample {
49                value: c.value(),
50                ..Default::default()
51            },
52        }
53    }
54}
55impl From<&Gauge> for Mimir<remote_write::Sample> {
56    fn from(c: &Gauge) -> Self {
57        Self {
58            state: remote_write::Sample {
59                value: c.value(),
60                ..Default::default()
61            },
62        }
63    }
64}
65impl Mimir<remote_write::Sample> {
66    fn sample(self) -> remote_write::Sample {
67        self.state
68    }
69}
70
71/// TODO implement histogram
72impl From<&Histogram> for Mimir<remote_write::Histogram> {
73    fn from(_h: &Histogram) -> Self {
74        Self {
75            state: remote_write::Histogram::default(),
76        }
77    }
78}
79/// TODO implement histogram
80impl Mimir<remote_write::Histogram> {
81    #[allow(dead_code)]
82    fn histogram(self) -> remote_write::Histogram {
83        self.state
84    }
85}
86impl From<Vec<MetricFamily>> for Mimir<Vec<remote_write::WriteRequest>> {
87    fn from(metric_families: Vec<MetricFamily>) -> Self {
88        // we may have more but we'll have at least this many timeseries
89        let mut timeseries: Vec<remote_write::TimeSeries> =
90            Vec::with_capacity(metric_families.len());
91
92        for mf in metric_families {
93            // TOOD add From impl
94            let mt = match mf.get_field_type() {
95                MetricType::COUNTER => remote_write::metric_metadata::MetricType::Counter,
96                MetricType::GAUGE => remote_write::metric_metadata::MetricType::Gauge,
97                MetricType::HISTOGRAM => remote_write::metric_metadata::MetricType::Histogram,
98                MetricType::SUMMARY => remote_write::metric_metadata::MetricType::Summary,
99                MetricType::UNTYPED => remote_write::metric_metadata::MetricType::Unknown,
100            };
101
102            // filter out the types we don't support
103            match mt {
104                remote_write::metric_metadata::MetricType::Counter
105                | remote_write::metric_metadata::MetricType::Gauge => (),
106                other => {
107                    debug!("{:?} is not yet implemented, skipping metric", other);
108                    continue;
109                }
110            }
111
112            // TODO stop using state directly
113            timeseries.extend(Mimir::from(mf.clone()).state);
114        }
115
116        Self {
117            state: timeseries
118                .into_iter()
119                // the upstream remote_write should have a max sample size per request set to this number
120                .chunks(var!("MIMIR_MAX_SAMPLE_SIZE", 500))
121                .into_iter()
122                .map(|ts| remote_write::WriteRequest {
123                    timeseries: ts.collect(),
124                    ..Default::default()
125                })
126                .collect_vec(),
127        }
128    }
129}
130
131impl IntoIterator for Mimir<Vec<remote_write::WriteRequest>> {
132    type Item = remote_write::WriteRequest;
133    type IntoIter = std::vec::IntoIter<Self::Item>;
134
135    fn into_iter(self) -> Self::IntoIter {
136        self.state.into_iter()
137    }
138}
139
140impl Mimir<Vec<remote_write::TimeSeries>> {
141    pub fn repeated(self) -> Vec<remote_write::TimeSeries> {
142        self.state
143    }
144}
145
146impl From<MetricFamily> for Mimir<Vec<remote_write::TimeSeries>> {
147    fn from(mf: MetricFamily) -> Self {
148        let mut timeseries = vec![];
149        for metric in mf.get_metric() {
150            let mut ts = remote_write::TimeSeries::default();
151            ts.labels.extend(vec![
152                // mimir requires that we use __name__ as a key that points to a value
153                // of the metric name
154                remote_write::Label {
155                    name: "__name__".into(),
156                    value: mf.name().into(),
157                },
158            ]);
159            ts.labels
160                .extend(Mimir::<Vec<remote_write::Label>>::from(metric));
161
162            // assumption here is that since a MetricFamily will have one MetricType, we'll only need
163            // to look for one of these types.  Setting two different types on Metric at the same time
164            // in a way that is conflicting with the MetricFamily type will result in undefined mimir
165            // behavior, probably an error.
166            if metric.counter.is_some() {
167                let counter: &Counter = &metric.counter;
168                let mut s = Mimir::<remote_write::Sample>::from(counter).sample();
169                s.timestamp = metric.timestamp_ms();
170                ts.samples.push(s);
171            } else if metric.gauge.is_some() {
172                let gauge: &Gauge = &metric.gauge;
173                let mut s = Mimir::<remote_write::Sample>::from(gauge).sample();
174                s.timestamp = metric.timestamp_ms();
175                ts.samples.push(s);
176            } else if metric.histogram.is_some() {
177                // TODO implement
178                // ts.mut_histograms()
179                //     .push(Mimir::<remote_write::Histogram>::from(metric.get_histogram()).histogram());
180            } else if metric.summary.is_some() {
181                // TODO implement
182                error!("summary is not implemented for a metric type");
183            }
184            timeseries.push(ts);
185        }
186        Self { state: timeseries }
187    }
188}
189
190impl Mimir<remote_write::TimeSeries> {
191    pub fn timeseries(self) -> remote_write::TimeSeries {
192        self.state
193    }
194}
195
/// Fixture builders for prometheus protobuf / remote-write values, plus the conversion
/// test for `MetricFamily` -> `TimeSeries`.
#[cfg(test)]
pub mod tests {
    use crate::prom_to_mimir::Mimir;
    use crate::remote_write;
    use prometheus::proto;

    // protobuf stuff

    /// Builds a metric family with the given name, help text and metrics.
    /// The field type is only set when `field_type` is `Some`.
    pub fn create_metric_family(
        name: &str,
        help: &str,
        field_type: Option<proto::MetricType>,
        metric: Vec<proto::Metric>,
    ) -> proto::MetricFamily {
        let mut mf = proto::MetricFamily {
            name: Some(name.into()),
            help: Some(help.into()),
            ..Default::default()
        };
        if let Some(ft) = field_type {
            mf.set_field_type(ft);
        }
        mf.set_metric(metric);
        mf
    }

    /// Builds a gauge metric with the given labels and a fixed timestamp of 12345.
    fn create_metric_gauge(labels: Vec<proto::LabelPair>, gauge: proto::Gauge) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_gauge(gauge);
        m.timestamp_ms = Some(12345);
        m
    }

    /// Builds a counter metric with the given labels and a fixed timestamp of 12345.
    pub fn create_metric_counter(
        labels: Vec<proto::LabelPair>,
        counter: proto::Counter,
    ) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_counter(counter);
        m.timestamp_ms = Some(12345);
        m
    }

    /// Builds a histogram metric with the given labels and a fixed timestamp of 12345.
    pub fn create_metric_histogram(
        labels: Vec<proto::LabelPair>,
        histogram: proto::Histogram,
    ) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_histogram(histogram);
        m.timestamp_ms = Some(12345);
        m
    }

    /// Builds a histogram with one observation (count 1, sum 1.0) and a single bucket
    /// with upper bound 1.0.
    pub fn create_histogram() -> proto::Histogram {
        let mut h = proto::Histogram::default();
        h.set_sample_count(1);
        h.set_sample_sum(1.0);
        let mut b = proto::Bucket::default();
        b.set_cumulative_count(1);
        b.set_upper_bound(1.0);
        h.bucket.push(b);
        h
    }

    /// Turns `(name, value)` pairs into protobuf label pairs, preserving their order.
    pub fn create_labels(labels: Vec<(&str, &str)>) -> Vec<proto::LabelPair> {
        labels
            .into_iter()
            .map(|(key, value)| proto::LabelPair {
                name: Some(key.into()),
                value: Some(value.into()),
                ..Default::default()
            })
            .collect()
    }

    /// Builds a gauge holding `value`.
    fn create_gauge(value: f64) -> proto::Gauge {
        proto::Gauge {
            value: Some(value),
            ..Default::default()
        }
    }

    /// Builds a counter holding `value`.
    pub fn create_counter(value: f64) -> proto::Counter {
        proto::Counter {
            value: Some(value),
            ..Default::default()
        }
    }

    // end protobuf stuff

    // mimir stuff

    /// Builds a timeseries from labels and samples; all other fields keep their defaults.
    fn create_timeseries_with_samples(
        labels: Vec<remote_write::Label>,
        samples: Vec<remote_write::Sample>,
    ) -> remote_write::TimeSeries {
        remote_write::TimeSeries {
            labels,
            samples,
            ..Default::default()
        }
    }

    /// Turns `(name, value)` pairs into remote-write labels, preserving their order.
    fn create_mimir_labels(labels: Vec<(&str, &str)>) -> Vec<remote_write::Label> {
        labels
            .into_iter()
            .map(|(name, value)| remote_write::Label {
                name: name.into(),
                value: value.into(),
            })
            .collect()
    }
    // end mimir stuff

    #[test]
    fn metricfamily_to_timeseries() {
        let tests: Vec<(proto::MetricFamily, Vec<remote_write::TimeSeries>)> = vec![
            (
                create_metric_family(
                    "test_gauge",
                    "i'm a help message",
                    Some(proto::MetricType::GAUGE),
                    vec![create_metric_gauge(
                        create_labels(vec![
                            ("host", "local-test-validator"),
                            ("network", "unittest-network"),
                        ]),
                        create_gauge(2046.0),
                    )],
                ),
                vec![create_timeseries_with_samples(
                    create_mimir_labels(vec![
                        ("__name__", "test_gauge"),
                        ("host", "local-test-validator"),
                        ("network", "unittest-network"),
                    ]),
                    vec![remote_write::Sample {
                        value: 2046.0,
                        timestamp: 12345,
                    }],
                )],
            ),
            (
                create_metric_family(
                    "test_counter",
                    "i'm a help message",
                    // a counter family must be declared as COUNTER so the fixture does not
                    // contradict the metric it contains
                    Some(proto::MetricType::COUNTER),
                    vec![create_metric_counter(
                        create_labels(vec![
                            ("host", "local-test-validator"),
                            ("network", "unittest-network"),
                        ]),
                        create_counter(2046.0),
                    )],
                ),
                vec![create_timeseries_with_samples(
                    create_mimir_labels(vec![
                        ("__name__", "test_counter"),
                        ("host", "local-test-validator"),
                        ("network", "unittest-network"),
                    ]),
                    vec![remote_write::Sample {
                        value: 2046.0,
                        timestamp: 12345,
                    }],
                )],
            ),
        ];
        for (mf, expected_ts) in tests {
            let actual_ts = Mimir::<Vec<remote_write::TimeSeries>>::from(mf).repeated();
            // `zip` stops at the shorter side, so compare lengths first; otherwise a
            // conversion that drops series would pass without checking anything.
            assert_eq!(
                actual_ts.len(),
                expected_ts.len(),
                "number of timeseries does not match"
            );
            for (actual, expected) in actual_ts.into_iter().zip(expected_ts) {
                assert_eq!(actual.labels, expected.labels);
                assert_eq!(
                    actual.samples.len(),
                    expected.samples.len(),
                    "number of samples does not match"
                );
                for (actual_sample, expected_sample) in
                    actual.samples.into_iter().zip(expected.samples)
                {
                    assert_eq!(
                        actual_sample.value, expected_sample.value,
                        "sample values do not match"
                    );

                    // timestamps are injected on the sui-node and we copy it to our sample
                    // make sure that works
                    assert_eq!(
                        actual_sample.timestamp, expected_sample.timestamp,
                        "sample timestamps do not match"
                    );
                }
            }
        }
    }
}