1use crate::remote_write;
4use crate::var;
5use itertools::Itertools;
6use prometheus::proto::{Counter, Gauge, Histogram, Metric, MetricFamily, MetricType};
7use tracing::{debug, error};
8
/// Generic wrapper used as the target of the `From` conversions in this
/// module.
///
/// The type parameter `S` selects which remote-write representation is being
/// carried (labels, a sample, time series, write requests, ...).
#[derive(Debug)]
pub struct Mimir<S> {
    /// The converted value; handed back to callers through the accessor
    /// methods and `IntoIterator` impls defined for specific `S`.
    state: S,
}
13
14impl From<&Metric> for Mimir<Vec<remote_write::Label>> {
15 fn from(m: &Metric) -> Self {
16 let mut m = m.to_owned();
18 let mut sorted = m.take_label();
19 sorted.sort_by(|a, b| {
20 (a.name(), a.value())
21 .partial_cmp(&(b.name(), b.value()))
22 .unwrap()
23 });
24 let mut r = Vec::<remote_write::Label>::new();
25 for label in sorted {
26 let lp = remote_write::Label {
27 name: label.name().into(),
28 value: label.value().into(),
29 };
30 r.push(lp);
31 }
32 Self { state: r }
33 }
34}
35
36impl IntoIterator for Mimir<Vec<remote_write::Label>> {
37 type Item = remote_write::Label;
38 type IntoIter = std::vec::IntoIter<Self::Item>;
39
40 fn into_iter(self) -> Self::IntoIter {
41 self.state.into_iter()
42 }
43}
44
45impl From<&Counter> for Mimir<remote_write::Sample> {
46 fn from(c: &Counter) -> Self {
47 Self {
48 state: remote_write::Sample {
49 value: c.value(),
50 ..Default::default()
51 },
52 }
53 }
54}
55impl From<&Gauge> for Mimir<remote_write::Sample> {
56 fn from(c: &Gauge) -> Self {
57 Self {
58 state: remote_write::Sample {
59 value: c.value(),
60 ..Default::default()
61 },
62 }
63 }
64}
65impl Mimir<remote_write::Sample> {
66 fn sample(self) -> remote_write::Sample {
67 self.state
68 }
69}
70
71impl From<&Histogram> for Mimir<remote_write::Histogram> {
73 fn from(_h: &Histogram) -> Self {
74 Self {
75 state: remote_write::Histogram::default(),
76 }
77 }
78}
79impl Mimir<remote_write::Histogram> {
81 #[allow(dead_code)]
82 fn histogram(self) -> remote_write::Histogram {
83 self.state
84 }
85}
86impl From<Vec<MetricFamily>> for Mimir<Vec<remote_write::WriteRequest>> {
87 fn from(metric_families: Vec<MetricFamily>) -> Self {
88 let mut timeseries: Vec<remote_write::TimeSeries> =
90 Vec::with_capacity(metric_families.len());
91
92 for mf in metric_families {
93 let mt = match mf.get_field_type() {
95 MetricType::COUNTER => remote_write::metric_metadata::MetricType::Counter,
96 MetricType::GAUGE => remote_write::metric_metadata::MetricType::Gauge,
97 MetricType::HISTOGRAM => remote_write::metric_metadata::MetricType::Histogram,
98 MetricType::SUMMARY => remote_write::metric_metadata::MetricType::Summary,
99 MetricType::UNTYPED => remote_write::metric_metadata::MetricType::Unknown,
100 };
101
102 match mt {
104 remote_write::metric_metadata::MetricType::Counter
105 | remote_write::metric_metadata::MetricType::Gauge => (),
106 other => {
107 debug!("{:?} is not yet implemented, skipping metric", other);
108 continue;
109 }
110 }
111
112 timeseries.extend(Mimir::from(mf.clone()).state);
114 }
115
116 Self {
117 state: timeseries
118 .into_iter()
119 .chunks(var!("MIMIR_MAX_SAMPLE_SIZE", 500))
121 .into_iter()
122 .map(|ts| remote_write::WriteRequest {
123 timeseries: ts.collect(),
124 ..Default::default()
125 })
126 .collect_vec(),
127 }
128 }
129}
130
131impl IntoIterator for Mimir<Vec<remote_write::WriteRequest>> {
132 type Item = remote_write::WriteRequest;
133 type IntoIter = std::vec::IntoIter<Self::Item>;
134
135 fn into_iter(self) -> Self::IntoIter {
136 self.state.into_iter()
137 }
138}
139
140impl Mimir<Vec<remote_write::TimeSeries>> {
141 pub fn repeated(self) -> Vec<remote_write::TimeSeries> {
142 self.state
143 }
144}
145
146impl From<MetricFamily> for Mimir<Vec<remote_write::TimeSeries>> {
147 fn from(mf: MetricFamily) -> Self {
148 let mut timeseries = vec![];
149 for metric in mf.get_metric() {
150 let mut ts = remote_write::TimeSeries::default();
151 ts.labels.extend(vec![
152 remote_write::Label {
155 name: "__name__".into(),
156 value: mf.name().into(),
157 },
158 ]);
159 ts.labels
160 .extend(Mimir::<Vec<remote_write::Label>>::from(metric));
161
162 if metric.counter.is_some() {
167 let counter: &Counter = &metric.counter;
168 let mut s = Mimir::<remote_write::Sample>::from(counter).sample();
169 s.timestamp = metric.timestamp_ms();
170 ts.samples.push(s);
171 } else if metric.gauge.is_some() {
172 let gauge: &Gauge = &metric.gauge;
173 let mut s = Mimir::<remote_write::Sample>::from(gauge).sample();
174 s.timestamp = metric.timestamp_ms();
175 ts.samples.push(s);
176 } else if metric.histogram.is_some() {
177 } else if metric.summary.is_some() {
181 error!("summary is not implemented for a metric type");
183 }
184 timeseries.push(ts);
185 }
186 Self { state: timeseries }
187 }
188}
189
190impl Mimir<remote_write::TimeSeries> {
191 pub fn timeseries(self) -> remote_write::TimeSeries {
192 self.state
193 }
194}
195
/// Fixture builders for Prometheus proto types plus the conversion tests.
#[cfg(test)]
pub mod tests {
    use crate::prom_to_mimir::Mimir;
    use crate::remote_write;
    use mysten_common::ZipDebugEqIteratorExt;
    use prometheus::proto;

    /// Builds a metric family with the given name, help text and metrics.
    ///
    /// The family type is only set when `field_type` is `Some`.
    pub fn create_metric_family(
        name: &str,
        help: &str,
        field_type: Option<proto::MetricType>,
        metric: Vec<proto::Metric>,
    ) -> proto::MetricFamily {
        let mut mf = proto::MetricFamily {
            name: Some(name.into()),
            help: Some(help.into()),
            ..Default::default()
        };
        if let Some(ft) = field_type {
            mf.set_field_type(ft);
        }
        mf.set_metric(metric);
        mf
    }

    /// Builds a gauge metric with the given labels and a fixed timestamp of
    /// `12345` ms.
    fn create_metric_gauge(labels: Vec<proto::LabelPair>, gauge: proto::Gauge) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_gauge(gauge);
        m.timestamp_ms = Some(12345);
        m
    }

    /// Builds a counter metric with the given labels and a fixed timestamp of
    /// `12345` ms.
    pub fn create_metric_counter(
        labels: Vec<proto::LabelPair>,
        counter: proto::Counter,
    ) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_counter(counter);
        m.timestamp_ms = Some(12345);
        m
    }

    /// Builds a histogram metric with the given labels and a fixed timestamp
    /// of `12345` ms.
    pub fn create_metric_histogram(
        labels: Vec<proto::LabelPair>,
        histogram: proto::Histogram,
    ) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_histogram(histogram);
        m.timestamp_ms = Some(12345);
        m
    }

    /// Builds a histogram holding one observation: sample count 1, sample sum
    /// 1.0, and a single bucket with upper bound 1.0 and cumulative count 1.
    pub fn create_histogram() -> proto::Histogram {
        let mut h = proto::Histogram::default();
        h.set_sample_count(1);
        h.set_sample_sum(1.0);
        let mut b = proto::Bucket::default();
        b.set_cumulative_count(1);
        b.set_upper_bound(1.0);
        h.bucket.push(b);
        h
    }

    /// Turns `(name, value)` tuples into Prometheus label pairs, preserving
    /// the input order.
    pub fn create_labels(labels: Vec<(&str, &str)>) -> Vec<proto::LabelPair> {
        labels
            .into_iter()
            .map(|(key, value)| proto::LabelPair {
                name: Some(key.into()),
                value: Some(value.into()),
                ..Default::default()
            })
            .collect()
    }

    /// Builds a gauge holding `value`.
    fn create_gauge(value: f64) -> proto::Gauge {
        proto::Gauge {
            value: Some(value),
            ..Default::default()
        }
    }

    /// Builds a counter holding `value`.
    pub fn create_counter(value: f64) -> proto::Counter {
        proto::Counter {
            value: Some(value),
            ..Default::default()
        }
    }

    /// Builds a remote-write time series from labels and samples, leaving all
    /// other fields at their defaults.
    fn create_timeseries_with_samples(
        labels: Vec<remote_write::Label>,
        samples: Vec<remote_write::Sample>,
    ) -> remote_write::TimeSeries {
        remote_write::TimeSeries {
            labels,
            samples,
            ..Default::default()
        }
    }

    /// Turns `(name, value)` tuples into remote-write labels, preserving the
    /// input order. Used to spell out expected labels compactly.
    fn create_remote_labels(labels: Vec<(&str, &str)>) -> Vec<remote_write::Label> {
        labels
            .into_iter()
            .map(|(name, value)| remote_write::Label {
                name: name.into(),
                value: value.into(),
            })
            .collect()
    }

    /// A gauge family and a counter family must each convert into a single
    /// time series carrying `__name__` plus the metric labels and one sample
    /// with the metric's value and timestamp.
    #[test]
    fn metricfamily_to_timeseries() {
        let tests: Vec<(proto::MetricFamily, Vec<remote_write::TimeSeries>)> = vec![
            (
                create_metric_family(
                    "test_gauge",
                    "i'm a help message",
                    Some(proto::MetricType::GAUGE),
                    vec![create_metric_gauge(
                        create_labels(vec![
                            ("host", "local-test-validator"),
                            ("network", "unittest-network"),
                        ]),
                        create_gauge(2046.0),
                    )],
                ),
                vec![create_timeseries_with_samples(
                    create_remote_labels(vec![
                        ("__name__", "test_gauge"),
                        ("host", "local-test-validator"),
                        ("network", "unittest-network"),
                    ]),
                    vec![remote_write::Sample {
                        value: 2046.0,
                        timestamp: 12345,
                    }],
                )],
            ),
            (
                create_metric_family(
                    "test_counter",
                    "i'm a help message",
                    // The family holds a counter metric, so declare it as one.
                    Some(proto::MetricType::COUNTER),
                    vec![create_metric_counter(
                        create_labels(vec![
                            ("host", "local-test-validator"),
                            ("network", "unittest-network"),
                        ]),
                        create_counter(2046.0),
                    )],
                ),
                vec![create_timeseries_with_samples(
                    create_remote_labels(vec![
                        ("__name__", "test_counter"),
                        ("host", "local-test-validator"),
                        ("network", "unittest-network"),
                    ]),
                    vec![remote_write::Sample {
                        value: 2046.0,
                        timestamp: 12345,
                    }],
                )],
            ),
        ];

        for (mf, expected_ts) in tests {
            let actual_ts = Mimir::<Vec<remote_write::TimeSeries>>::from(mf).state;
            // NOTE(review): `zip_debug_eq` is a project helper; presumably it
            // also checks that both sides have the same length — confirm.
            for (actual, expected) in actual_ts.into_iter().zip_debug_eq(expected_ts) {
                assert_eq!(actual.labels, expected.labels);
                for (actual_sample, expected_sample) in
                    actual.samples.into_iter().zip_debug_eq(expected.samples)
                {
                    assert_eq!(
                        actual_sample.value, expected_sample.value,
                        "sample values do not match"
                    );

                    assert_eq!(
                        actual_sample.timestamp, expected_sample.timestamp,
                        "sample timestamps do not match"
                    );
                }
            }
        }
    }
}