1use crate::remote_write;
4use crate::var;
5use itertools::Itertools;
6use prometheus::proto::{Counter, Gauge, Histogram, Metric, MetricFamily, MetricType};
7use tracing::{debug, error};
8
9#[derive(Debug)]
10pub struct Mimir<S> {
11 state: S,
12}
13
14impl From<&Metric> for Mimir<Vec<remote_write::Label>> {
15 fn from(m: &Metric) -> Self {
16 let mut m = m.to_owned();
18 let mut sorted = m.take_label();
19 sorted.sort_by(|a, b| {
20 (a.name(), a.value())
21 .partial_cmp(&(b.name(), b.value()))
22 .unwrap()
23 });
24 let mut r = Vec::<remote_write::Label>::new();
25 for label in sorted {
26 let lp = remote_write::Label {
27 name: label.name().into(),
28 value: label.value().into(),
29 };
30 r.push(lp);
31 }
32 Self { state: r }
33 }
34}
35
36impl IntoIterator for Mimir<Vec<remote_write::Label>> {
37 type Item = remote_write::Label;
38 type IntoIter = std::vec::IntoIter<Self::Item>;
39
40 fn into_iter(self) -> Self::IntoIter {
41 self.state.into_iter()
42 }
43}
44
45impl From<&Counter> for Mimir<remote_write::Sample> {
46 fn from(c: &Counter) -> Self {
47 Self {
48 state: remote_write::Sample {
49 value: c.value(),
50 ..Default::default()
51 },
52 }
53 }
54}
55impl From<&Gauge> for Mimir<remote_write::Sample> {
56 fn from(c: &Gauge) -> Self {
57 Self {
58 state: remote_write::Sample {
59 value: c.value(),
60 ..Default::default()
61 },
62 }
63 }
64}
65impl Mimir<remote_write::Sample> {
66 fn sample(self) -> remote_write::Sample {
67 self.state
68 }
69}
70
71impl From<&Histogram> for Mimir<remote_write::Histogram> {
73 fn from(_h: &Histogram) -> Self {
74 Self {
75 state: remote_write::Histogram::default(),
76 }
77 }
78}
79impl Mimir<remote_write::Histogram> {
81 #[allow(dead_code)]
82 fn histogram(self) -> remote_write::Histogram {
83 self.state
84 }
85}
86impl From<Vec<MetricFamily>> for Mimir<Vec<remote_write::WriteRequest>> {
87 fn from(metric_families: Vec<MetricFamily>) -> Self {
88 let mut timeseries: Vec<remote_write::TimeSeries> =
90 Vec::with_capacity(metric_families.len());
91
92 for mf in metric_families {
93 let mt = match mf.get_field_type() {
95 MetricType::COUNTER => remote_write::metric_metadata::MetricType::Counter,
96 MetricType::GAUGE => remote_write::metric_metadata::MetricType::Gauge,
97 MetricType::HISTOGRAM => remote_write::metric_metadata::MetricType::Histogram,
98 MetricType::SUMMARY => remote_write::metric_metadata::MetricType::Summary,
99 MetricType::UNTYPED => remote_write::metric_metadata::MetricType::Unknown,
100 };
101
102 match mt {
104 remote_write::metric_metadata::MetricType::Counter
105 | remote_write::metric_metadata::MetricType::Gauge => (),
106 other => {
107 debug!("{:?} is not yet implemented, skipping metric", other);
108 continue;
109 }
110 }
111
112 timeseries.extend(Mimir::from(mf.clone()).state);
114 }
115
116 Self {
117 state: timeseries
118 .into_iter()
119 .chunks(var!("MIMIR_MAX_SAMPLE_SIZE", 500))
121 .into_iter()
122 .map(|ts| remote_write::WriteRequest {
123 timeseries: ts.collect(),
124 ..Default::default()
125 })
126 .collect_vec(),
127 }
128 }
129}
130
131impl IntoIterator for Mimir<Vec<remote_write::WriteRequest>> {
132 type Item = remote_write::WriteRequest;
133 type IntoIter = std::vec::IntoIter<Self::Item>;
134
135 fn into_iter(self) -> Self::IntoIter {
136 self.state.into_iter()
137 }
138}
139
140impl Mimir<Vec<remote_write::TimeSeries>> {
141 pub fn repeated(self) -> Vec<remote_write::TimeSeries> {
142 self.state
143 }
144}
145
146impl From<MetricFamily> for Mimir<Vec<remote_write::TimeSeries>> {
147 fn from(mf: MetricFamily) -> Self {
148 let mut timeseries = vec![];
149 for metric in mf.get_metric() {
150 let mut ts = remote_write::TimeSeries::default();
151 ts.labels.extend(vec![
152 remote_write::Label {
155 name: "__name__".into(),
156 value: mf.name().into(),
157 },
158 ]);
159 ts.labels
160 .extend(Mimir::<Vec<remote_write::Label>>::from(metric));
161
162 if metric.counter.is_some() {
167 let counter: &Counter = &metric.counter;
168 let mut s = Mimir::<remote_write::Sample>::from(counter).sample();
169 s.timestamp = metric.timestamp_ms();
170 ts.samples.push(s);
171 } else if metric.gauge.is_some() {
172 let gauge: &Gauge = &metric.gauge;
173 let mut s = Mimir::<remote_write::Sample>::from(gauge).sample();
174 s.timestamp = metric.timestamp_ms();
175 ts.samples.push(s);
176 } else if metric.histogram.is_some() {
177 } else if metric.summary.is_some() {
181 error!("summary is not implemented for a metric type");
183 }
184 timeseries.push(ts);
185 }
186 Self { state: timeseries }
187 }
188}
189
190impl Mimir<remote_write::TimeSeries> {
191 pub fn timeseries(self) -> remote_write::TimeSeries {
192 self.state
193 }
194}
195
196#[cfg(test)]
197pub mod tests {
198 use crate::prom_to_mimir::Mimir;
199 use crate::remote_write;
200 use prometheus::proto;
201
202 pub fn create_metric_family(
204 name: &str,
205 help: &str,
206 field_type: Option<proto::MetricType>,
207 metric: Vec<proto::Metric>,
208 ) -> proto::MetricFamily {
209 let mut mf = proto::MetricFamily {
210 name: Some(name.into()),
211 help: Some(help.into()),
212 ..Default::default()
213 };
214 if let Some(ft) = field_type {
215 mf.set_field_type(ft);
216 }
217 mf.set_metric(metric);
218 mf
219 }
220 #[allow(dead_code)]
221 fn create_metric_gauge(labels: Vec<proto::LabelPair>, gauge: proto::Gauge) -> proto::Metric {
222 let mut m = proto::Metric::default();
223 m.set_label(labels);
224 m.set_gauge(gauge);
225 m.timestamp_ms = Some(12345);
226 m
227 }
228
229 pub fn create_metric_counter(
230 labels: Vec<proto::LabelPair>,
231 counter: proto::Counter,
232 ) -> proto::Metric {
233 let mut m = proto::Metric::default();
234 m.set_label(labels);
235 m.set_counter(counter);
236 m.timestamp_ms = Some(12345);
237 m
238 }
239
240 pub fn create_metric_histogram(
241 labels: Vec<proto::LabelPair>,
242 histogram: proto::Histogram,
243 ) -> proto::Metric {
244 let mut m = proto::Metric::default();
245 m.set_label(labels);
246 m.set_histogram(histogram);
247 m.timestamp_ms = Some(12345);
248 m
249 }
250
251 pub fn create_histogram() -> proto::Histogram {
252 let mut h = proto::Histogram::default();
253 h.set_sample_count(1);
254 h.set_sample_sum(1.0);
255 let mut b = proto::Bucket::default();
256 b.set_cumulative_count(1);
257 b.set_upper_bound(1.0);
258 h.bucket.push(b);
259 h
260 }
261
262 pub fn create_labels(labels: Vec<(&str, &str)>) -> Vec<proto::LabelPair> {
263 labels
264 .into_iter()
265 .map(|(key, value)| proto::LabelPair {
266 name: Some(key.into()),
267 value: Some(value.into()),
268 ..Default::default()
269 })
270 .collect()
271 }
272 #[allow(dead_code)]
273 fn create_gauge(value: f64) -> proto::Gauge {
274 proto::Gauge {
275 value: Some(value),
276 ..Default::default()
277 }
278 }
279
280 pub fn create_counter(value: f64) -> proto::Counter {
281 proto::Counter {
282 value: Some(value),
283 ..Default::default()
284 }
285 }
286
287 fn create_timeseries_with_samples(
291 labels: Vec<remote_write::Label>,
292 samples: Vec<remote_write::Sample>,
293 ) -> remote_write::TimeSeries {
294 remote_write::TimeSeries {
295 labels,
296 samples,
297 ..Default::default()
298 }
299 }
300 #[test]
303 fn metricfamily_to_timeseries() {
304 let tests: Vec<(proto::MetricFamily, Vec<remote_write::TimeSeries>)> = vec![
305 (
306 create_metric_family(
307 "test_gauge",
308 "i'm a help message",
309 Some(proto::MetricType::GAUGE),
310 vec![create_metric_gauge(
311 create_labels(vec![
312 ("host", "local-test-validator"),
313 ("network", "unittest-network"),
314 ]),
315 create_gauge(2046.0),
316 )],
317 ),
318 vec![create_timeseries_with_samples(
319 vec![
320 remote_write::Label {
321 name: "__name__".into(),
322 value: "test_gauge".into(),
323 },
324 remote_write::Label {
325 name: "host".into(),
326 value: "local-test-validator".into(),
327 },
328 remote_write::Label {
329 name: "network".into(),
330 value: "unittest-network".into(),
331 },
332 ],
333 vec![remote_write::Sample {
334 value: 2046.0,
335 timestamp: 12345,
336 }],
337 )],
338 ),
339 (
340 create_metric_family(
341 "test_counter",
342 "i'm a help message",
343 Some(proto::MetricType::GAUGE),
344 vec![create_metric_counter(
345 create_labels(vec![
346 ("host", "local-test-validator"),
347 ("network", "unittest-network"),
348 ]),
349 create_counter(2046.0),
350 )],
351 ),
352 vec![create_timeseries_with_samples(
353 vec![
354 remote_write::Label {
355 name: "__name__".into(),
356 value: "test_counter".into(),
357 },
358 remote_write::Label {
359 name: "host".into(),
360 value: "local-test-validator".into(),
361 },
362 remote_write::Label {
363 name: "network".into(),
364 value: "unittest-network".into(),
365 },
366 ],
367 vec![remote_write::Sample {
368 value: 2046.0,
369 timestamp: 12345,
370 }],
371 )],
372 ),
373 ];
374 for (mf, expected_ts) in tests {
375 for (actual, expected) in Mimir::from(mf).state.into_iter().zip(expected_ts) {
377 assert_eq!(actual.labels, expected.labels);
378 for (actual_sample, expected_sample) in
379 actual.samples.into_iter().zip(expected.samples)
380 {
381 assert_eq!(
382 actual_sample.value, expected_sample.value,
383 "sample values do not match"
384 );
385
386 assert_eq!(
389 actual_sample.timestamp, expected_sample.timestamp,
390 "timestamp should be non-zero"
391 );
392 }
393 }
394 }
395 }
396}