sui_proxy/
prom_to_mimir.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3use crate::remote_write;
4use crate::var;
5use itertools::Itertools;
6use prometheus::proto::{Counter, Gauge, Histogram, Metric, MetricFamily, MetricType};
7use protobuf::RepeatedField;
8use tracing::{debug, error};
9
/// Generic wrapper used as the target of the prometheus -> remote-write conversions
/// in this module.
///
/// `S` is the wrapped remote-write value; each `From` impl in this file picks a
/// concrete `S` (labels, a sample, timeseries, write requests, ...).
#[derive(Debug)]
pub struct Mimir<S> {
    /// The converted value held by this wrapper.
    state: S,
}
14
15impl From<&Metric> for Mimir<RepeatedField<remote_write::Label>> {
16    fn from(m: &Metric) -> Self {
17        // we consume metric labels from an owned version so we can sort them
18        let mut m = m.to_owned();
19        let mut sorted = m.take_label();
20        sorted.sort_by(|a, b| {
21            (a.get_name(), a.get_value())
22                .partial_cmp(&(b.get_name(), b.get_value()))
23                .unwrap()
24        });
25        let mut r = RepeatedField::<remote_write::Label>::default();
26        for label in sorted {
27            let lp = remote_write::Label {
28                name: label.get_name().into(),
29                value: label.get_value().into(),
30            };
31            r.push(lp);
32        }
33        Self { state: r }
34    }
35}
36
37impl IntoIterator for Mimir<RepeatedField<remote_write::Label>> {
38    type Item = remote_write::Label;
39    type IntoIter = std::vec::IntoIter<Self::Item>;
40
41    fn into_iter(self) -> Self::IntoIter {
42        self.state.into_iter()
43    }
44}
45
46impl From<&Counter> for Mimir<remote_write::Sample> {
47    fn from(c: &Counter) -> Self {
48        Self {
49            state: remote_write::Sample {
50                value: c.get_value(),
51                ..Default::default()
52            },
53        }
54    }
55}
56impl From<&Gauge> for Mimir<remote_write::Sample> {
57    fn from(c: &Gauge) -> Self {
58        Self {
59            state: remote_write::Sample {
60                value: c.get_value(),
61                ..Default::default()
62            },
63        }
64    }
65}
66impl Mimir<remote_write::Sample> {
67    fn sample(self) -> remote_write::Sample {
68        self.state
69    }
70}
71
72/// TODO implement histogram
73impl From<&Histogram> for Mimir<remote_write::Histogram> {
74    fn from(_h: &Histogram) -> Self {
75        Self {
76            state: remote_write::Histogram::default(),
77        }
78    }
79}
80/// TODO implement histogram
81impl Mimir<remote_write::Histogram> {
82    #[allow(dead_code)]
83    fn histogram(self) -> remote_write::Histogram {
84        self.state
85    }
86}
87impl From<Vec<MetricFamily>> for Mimir<Vec<remote_write::WriteRequest>> {
88    fn from(metric_families: Vec<MetricFamily>) -> Self {
89        // we may have more but we'll have at least this many timeseries
90        let mut timeseries: Vec<remote_write::TimeSeries> =
91            Vec::with_capacity(metric_families.len());
92
93        for mf in metric_families {
94            // TOOD add From impl
95            let mt = match mf.get_field_type() {
96                MetricType::COUNTER => remote_write::metric_metadata::MetricType::Counter,
97                MetricType::GAUGE => remote_write::metric_metadata::MetricType::Gauge,
98                MetricType::HISTOGRAM => remote_write::metric_metadata::MetricType::Histogram,
99                MetricType::SUMMARY => remote_write::metric_metadata::MetricType::Summary,
100                MetricType::UNTYPED => remote_write::metric_metadata::MetricType::Unknown,
101            };
102
103            // filter out the types we don't support
104            match mt {
105                remote_write::metric_metadata::MetricType::Counter
106                | remote_write::metric_metadata::MetricType::Gauge => (),
107                other => {
108                    debug!("{:?} is not yet implemented, skipping metric", other);
109                    continue;
110                }
111            }
112
113            // TODO stop using state directly
114            timeseries.extend(Mimir::from(mf.clone()).state);
115        }
116
117        Self {
118            state: timeseries
119                .into_iter()
120                // the upstream remote_write should have a max sample size per request set to this number
121                .chunks(var!("MIMIR_MAX_SAMPLE_SIZE", 500))
122                .into_iter()
123                .map(|ts| remote_write::WriteRequest {
124                    timeseries: ts.collect(),
125                    ..Default::default()
126                })
127                .collect_vec(),
128        }
129    }
130}
131
132impl IntoIterator for Mimir<Vec<remote_write::WriteRequest>> {
133    type Item = remote_write::WriteRequest;
134    type IntoIter = std::vec::IntoIter<Self::Item>;
135
136    fn into_iter(self) -> Self::IntoIter {
137        self.state.into_iter()
138    }
139}
140
141impl Mimir<RepeatedField<remote_write::TimeSeries>> {
142    pub fn repeated(self) -> RepeatedField<remote_write::TimeSeries> {
143        self.state
144    }
145}
146
147impl From<MetricFamily> for Mimir<Vec<remote_write::TimeSeries>> {
148    fn from(mf: MetricFamily) -> Self {
149        let mut timeseries = vec![];
150        for metric in mf.get_metric() {
151            let mut ts = remote_write::TimeSeries::default();
152            ts.labels.extend(vec![
153                // mimir requires that we use __name__ as a key that points to a value
154                // of the metric name
155                remote_write::Label {
156                    name: "__name__".into(),
157                    value: mf.get_name().into(),
158                },
159            ]);
160            ts.labels
161                .extend(Mimir::<RepeatedField<remote_write::Label>>::from(metric));
162
163            // assumption here is that since a MetricFamily will have one MetricType, we'll only need
164            // to look for one of these types.  Setting two different types on Metric at the same time
165            // in a way that is conflicting with the MetricFamily type will result in undefined mimir
166            // behavior, probably an error.
167            if metric.has_counter() {
168                let mut s = Mimir::<remote_write::Sample>::from(metric.get_counter()).sample();
169                s.timestamp = metric.get_timestamp_ms();
170                ts.samples.push(s);
171            } else if metric.has_gauge() {
172                let mut s = Mimir::<remote_write::Sample>::from(metric.get_gauge()).sample();
173                s.timestamp = metric.get_timestamp_ms();
174                ts.samples.push(s);
175            } else if metric.has_histogram() {
176                // TODO implement
177                // ts.mut_histograms()
178                //     .push(Mimir::<remote_write::Histogram>::from(metric.get_histogram()).histogram());
179            } else if metric.has_summary() {
180                // TODO implement
181                error!("summary is not implemented for a metric type");
182            }
183            timeseries.push(ts);
184        }
185        Self { state: timeseries }
186    }
187}
188
189impl Mimir<remote_write::TimeSeries> {
190    pub fn timeseries(self) -> remote_write::TimeSeries {
191        self.state
192    }
193}
194
/// Unit tests for the prometheus -> mimir conversions, plus protobuf fixture
/// builders (the `pub` ones are available to other test modules).
#[cfg(test)]
pub mod tests {
    use crate::prom_to_mimir::Mimir;
    use crate::remote_write;
    use prometheus::proto;
    use protobuf::RepeatedField;

    // protobuf stuff

    /// Builds a metric family with the given name, help text, optional type and metrics.
    pub fn create_metric_family(
        name: &str,
        help: &str,
        field_type: Option<proto::MetricType>,
        metric: RepeatedField<proto::Metric>,
    ) -> proto::MetricFamily {
        // no public fields, cannot use literals
        let mut mf = proto::MetricFamily::default();
        mf.set_name(name.into());
        mf.set_help(help.into());
        // TODO remove the metric type serialization if we still don't use it
        // after implementing histogram and summary
        if let Some(ft) = field_type {
            mf.set_field_type(ft);
        }
        mf.set_metric(metric);
        mf
    }

    /// Builds a gauge metric with the given labels and a fixed timestamp of 12345 ms.
    fn create_metric_gauge(
        labels: RepeatedField<proto::LabelPair>,
        gauge: proto::Gauge,
    ) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_gauge(gauge);
        m.set_timestamp_ms(12345);
        m
    }

    /// Builds a counter metric with the given labels and a fixed timestamp of 12345 ms.
    pub fn create_metric_counter(
        labels: RepeatedField<proto::LabelPair>,
        counter: proto::Counter,
    ) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_counter(counter);
        m.set_timestamp_ms(12345);
        m
    }

    /// Builds a histogram metric with the given labels and a fixed timestamp of 12345 ms.
    pub fn create_metric_histogram(
        labels: RepeatedField<proto::LabelPair>,
        histogram: proto::Histogram,
    ) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_histogram(histogram);
        m.set_timestamp_ms(12345);
        m
    }

    /// Builds a histogram holding one sample (sum 1.0) in a single bucket with
    /// upper bound 1.0.
    pub fn create_histogram() -> proto::Histogram {
        let mut h = proto::Histogram::default();
        h.set_sample_count(1);
        h.set_sample_sum(1.0);
        let mut b = proto::Bucket::default();
        b.set_cumulative_count(1);
        b.set_upper_bound(1.0);
        h.mut_bucket().push(b);
        h
    }

    /// Turns `(name, value)` pairs into protobuf label pairs, preserving input order.
    pub fn create_labels(labels: Vec<(&str, &str)>) -> Vec<proto::LabelPair> {
        labels
            .into_iter()
            .map(|(key, value)| {
                let mut lp = proto::LabelPair::default();
                lp.set_name(key.into());
                lp.set_value(value.into());
                lp
            })
            .collect()
    }

    /// Builds a gauge holding `value`.
    fn create_gauge(value: f64) -> proto::Gauge {
        let mut g = proto::Gauge::default();
        g.set_value(value);
        g
    }

    /// Builds a counter holding `value`.
    pub fn create_counter(value: f64) -> proto::Counter {
        let mut c = proto::Counter::default();
        c.set_value(value);
        c
    }

    // end protobuf stuff

    // mimir stuff

    /// Builds the expected remote-write timeseries from labels and samples.
    fn create_timeseries_with_samples(
        labels: Vec<remote_write::Label>,
        samples: Vec<remote_write::Sample>,
    ) -> remote_write::TimeSeries {
        remote_write::TimeSeries {
            labels,
            samples,
            ..Default::default()
        }
    }
    // end mimir stuff

    #[test]
    fn metricfamily_to_timeseries() {
        let tests: Vec<(proto::MetricFamily, Vec<remote_write::TimeSeries>)> = vec![
            (
                create_metric_family(
                    "test_gauge",
                    "i'm a help message",
                    Some(proto::MetricType::GAUGE),
                    RepeatedField::from(vec![create_metric_gauge(
                        RepeatedField::from_vec(create_labels(vec![
                            ("host", "local-test-validator"),
                            ("network", "unittest-network"),
                        ])),
                        create_gauge(2046.0),
                    )]),
                ),
                vec![create_timeseries_with_samples(
                    vec![
                        remote_write::Label {
                            name: "__name__".into(),
                            value: "test_gauge".into(),
                        },
                        remote_write::Label {
                            name: "host".into(),
                            value: "local-test-validator".into(),
                        },
                        remote_write::Label {
                            name: "network".into(),
                            value: "unittest-network".into(),
                        },
                    ],
                    vec![remote_write::Sample {
                        value: 2046.0,
                        timestamp: 12345,
                    }],
                )],
            ),
            (
                create_metric_family(
                    "test_counter",
                    "i'm a help message",
                    // the family type must agree with the counter metric it holds
                    Some(proto::MetricType::COUNTER),
                    RepeatedField::from(vec![create_metric_counter(
                        RepeatedField::from_vec(create_labels(vec![
                            ("host", "local-test-validator"),
                            ("network", "unittest-network"),
                        ])),
                        create_counter(2046.0),
                    )]),
                ),
                vec![create_timeseries_with_samples(
                    vec![
                        remote_write::Label {
                            name: "__name__".into(),
                            value: "test_counter".into(),
                        },
                        remote_write::Label {
                            name: "host".into(),
                            value: "local-test-validator".into(),
                        },
                        remote_write::Label {
                            name: "network".into(),
                            value: "unittest-network".into(),
                        },
                    ],
                    vec![remote_write::Sample {
                        value: 2046.0,
                        timestamp: 12345,
                    }],
                )],
            ),
        ];
        for (mf, expected_ts) in tests {
            // TODO stop using state directly
            let actual_ts = Mimir::<Vec<remote_write::TimeSeries>>::from(mf).state;
            // `zip` stops at the shorter side, so a missing timeseries would
            // otherwise go unnoticed
            assert_eq!(
                actual_ts.len(),
                expected_ts.len(),
                "timeseries counts do not match"
            );
            for (actual, expected) in actual_ts.into_iter().zip(expected_ts) {
                assert_eq!(actual.labels, expected.labels);
                assert_eq!(
                    actual.samples.len(),
                    expected.samples.len(),
                    "sample counts do not match"
                );
                for (actual_sample, expected_sample) in
                    actual.samples.into_iter().zip(expected.samples)
                {
                    assert_eq!(
                        actual_sample.value, expected_sample.value,
                        "sample values do not match"
                    );

                    // timestamps are injected on the sui-node and we copy it to our sample
                    // make sure that works
                    assert_eq!(
                        actual_sample.timestamp, expected_sample.timestamp,
                        "sample timestamps do not match"
                    );
                }
            }
        }
    }
}