1use crate::remote_write;
4use crate::var;
5use itertools::Itertools;
6use prometheus::proto::{Counter, Gauge, Histogram, Metric, MetricFamily, MetricType};
7use protobuf::RepeatedField;
8use tracing::{debug, error};
9
/// Generic wrapper around a value that has been converted into a Mimir
/// remote-write representation.
///
/// The type parameter `S` is the wrapped payload; the `From` implementations
/// in this file select which payload gets built from which Prometheus type.
#[derive(Debug)]
pub struct Mimir<S> {
    /// The converted payload carried by this wrapper.
    state: S,
}
14
15impl From<&Metric> for Mimir<RepeatedField<remote_write::Label>> {
16 fn from(m: &Metric) -> Self {
17 let mut m = m.to_owned();
19 let mut sorted = m.take_label();
20 sorted.sort_by(|a, b| {
21 (a.get_name(), a.get_value())
22 .partial_cmp(&(b.get_name(), b.get_value()))
23 .unwrap()
24 });
25 let mut r = RepeatedField::<remote_write::Label>::default();
26 for label in sorted {
27 let lp = remote_write::Label {
28 name: label.get_name().into(),
29 value: label.get_value().into(),
30 };
31 r.push(lp);
32 }
33 Self { state: r }
34 }
35}
36
37impl IntoIterator for Mimir<RepeatedField<remote_write::Label>> {
38 type Item = remote_write::Label;
39 type IntoIter = std::vec::IntoIter<Self::Item>;
40
41 fn into_iter(self) -> Self::IntoIter {
42 self.state.into_iter()
43 }
44}
45
46impl From<&Counter> for Mimir<remote_write::Sample> {
47 fn from(c: &Counter) -> Self {
48 Self {
49 state: remote_write::Sample {
50 value: c.get_value(),
51 ..Default::default()
52 },
53 }
54 }
55}
56impl From<&Gauge> for Mimir<remote_write::Sample> {
57 fn from(c: &Gauge) -> Self {
58 Self {
59 state: remote_write::Sample {
60 value: c.get_value(),
61 ..Default::default()
62 },
63 }
64 }
65}
66impl Mimir<remote_write::Sample> {
67 fn sample(self) -> remote_write::Sample {
68 self.state
69 }
70}
71
72impl From<&Histogram> for Mimir<remote_write::Histogram> {
74 fn from(_h: &Histogram) -> Self {
75 Self {
76 state: remote_write::Histogram::default(),
77 }
78 }
79}
80impl Mimir<remote_write::Histogram> {
82 #[allow(dead_code)]
83 fn histogram(self) -> remote_write::Histogram {
84 self.state
85 }
86}
87impl From<Vec<MetricFamily>> for Mimir<Vec<remote_write::WriteRequest>> {
88 fn from(metric_families: Vec<MetricFamily>) -> Self {
89 let mut timeseries: Vec<remote_write::TimeSeries> =
91 Vec::with_capacity(metric_families.len());
92
93 for mf in metric_families {
94 let mt = match mf.get_field_type() {
96 MetricType::COUNTER => remote_write::metric_metadata::MetricType::Counter,
97 MetricType::GAUGE => remote_write::metric_metadata::MetricType::Gauge,
98 MetricType::HISTOGRAM => remote_write::metric_metadata::MetricType::Histogram,
99 MetricType::SUMMARY => remote_write::metric_metadata::MetricType::Summary,
100 MetricType::UNTYPED => remote_write::metric_metadata::MetricType::Unknown,
101 };
102
103 match mt {
105 remote_write::metric_metadata::MetricType::Counter
106 | remote_write::metric_metadata::MetricType::Gauge => (),
107 other => {
108 debug!("{:?} is not yet implemented, skipping metric", other);
109 continue;
110 }
111 }
112
113 timeseries.extend(Mimir::from(mf.clone()).state);
115 }
116
117 Self {
118 state: timeseries
119 .into_iter()
120 .chunks(var!("MIMIR_MAX_SAMPLE_SIZE", 500))
122 .into_iter()
123 .map(|ts| remote_write::WriteRequest {
124 timeseries: ts.collect(),
125 ..Default::default()
126 })
127 .collect_vec(),
128 }
129 }
130}
131
132impl IntoIterator for Mimir<Vec<remote_write::WriteRequest>> {
133 type Item = remote_write::WriteRequest;
134 type IntoIter = std::vec::IntoIter<Self::Item>;
135
136 fn into_iter(self) -> Self::IntoIter {
137 self.state.into_iter()
138 }
139}
140
141impl Mimir<RepeatedField<remote_write::TimeSeries>> {
142 pub fn repeated(self) -> RepeatedField<remote_write::TimeSeries> {
143 self.state
144 }
145}
146
147impl From<MetricFamily> for Mimir<Vec<remote_write::TimeSeries>> {
148 fn from(mf: MetricFamily) -> Self {
149 let mut timeseries = vec![];
150 for metric in mf.get_metric() {
151 let mut ts = remote_write::TimeSeries::default();
152 ts.labels.extend(vec![
153 remote_write::Label {
156 name: "__name__".into(),
157 value: mf.get_name().into(),
158 },
159 ]);
160 ts.labels
161 .extend(Mimir::<RepeatedField<remote_write::Label>>::from(metric));
162
163 if metric.has_counter() {
168 let mut s = Mimir::<remote_write::Sample>::from(metric.get_counter()).sample();
169 s.timestamp = metric.get_timestamp_ms();
170 ts.samples.push(s);
171 } else if metric.has_gauge() {
172 let mut s = Mimir::<remote_write::Sample>::from(metric.get_gauge()).sample();
173 s.timestamp = metric.get_timestamp_ms();
174 ts.samples.push(s);
175 } else if metric.has_histogram() {
176 } else if metric.has_summary() {
180 error!("summary is not implemented for a metric type");
182 }
183 timeseries.push(ts);
184 }
185 Self { state: timeseries }
186 }
187}
188
189impl Mimir<remote_write::TimeSeries> {
190 pub fn timeseries(self) -> remote_write::TimeSeries {
191 self.state
192 }
193}
194
/// Fixture builders and unit tests for the Prometheus → Mimir conversions.
#[cfg(test)]
pub mod tests {
    use crate::prom_to_mimir::Mimir;
    use crate::remote_write;
    use prometheus::proto;
    use protobuf::RepeatedField;

    /// Builds a metric family with the given name, help text and metrics.
    ///
    /// The field type is only set when `field_type` is `Some`.
    pub fn create_metric_family(
        name: &str,
        help: &str,
        field_type: Option<proto::MetricType>,
        metric: RepeatedField<proto::Metric>,
    ) -> proto::MetricFamily {
        let mut mf = proto::MetricFamily::default();
        mf.set_name(name.into());
        mf.set_help(help.into());
        if let Some(ft) = field_type {
            mf.set_field_type(ft);
        }
        mf.set_metric(metric);
        mf
    }

    /// Builds a gauge metric with the given labels and a fixed timestamp of 12345 ms.
    fn create_metric_gauge(
        labels: RepeatedField<proto::LabelPair>,
        gauge: proto::Gauge,
    ) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_gauge(gauge);
        m.set_timestamp_ms(12345);
        m
    }

    /// Builds a counter metric with the given labels and a fixed timestamp of 12345 ms.
    pub fn create_metric_counter(
        labels: RepeatedField<proto::LabelPair>,
        counter: proto::Counter,
    ) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_counter(counter);
        m.set_timestamp_ms(12345);
        m
    }

    /// Builds a histogram metric with the given labels and a fixed timestamp of 12345 ms.
    pub fn create_metric_histogram(
        labels: RepeatedField<proto::LabelPair>,
        histogram: proto::Histogram,
    ) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_histogram(histogram);
        m.set_timestamp_ms(12345);
        m
    }

    /// Builds a histogram with one observation (sum 1.0) and a single bucket
    /// with upper bound 1.0.
    pub fn create_histogram() -> proto::Histogram {
        let mut h = proto::Histogram::default();
        h.set_sample_count(1);
        h.set_sample_sum(1.0);
        let mut b = proto::Bucket::default();
        b.set_cumulative_count(1);
        b.set_upper_bound(1.0);
        h.mut_bucket().push(b);
        h
    }

    /// Turns `(name, value)` tuples into Prometheus label pairs, preserving order.
    pub fn create_labels(labels: Vec<(&str, &str)>) -> Vec<proto::LabelPair> {
        labels
            .into_iter()
            .map(|(key, value)| {
                let mut lp = proto::LabelPair::default();
                lp.set_name(key.into());
                lp.set_value(value.into());
                lp
            })
            .collect()
    }

    /// Builds a gauge holding `value`.
    fn create_gauge(value: f64) -> proto::Gauge {
        let mut g = proto::Gauge::default();
        g.set_value(value);
        g
    }

    /// Builds a counter holding `value`.
    pub fn create_counter(value: f64) -> proto::Counter {
        let mut c = proto::Counter::default();
        c.set_value(value);
        c
    }

    /// Builds the expected time series from labels and samples; all other
    /// fields are left at their defaults.
    fn create_timeseries_with_samples(
        labels: Vec<remote_write::Label>,
        samples: Vec<remote_write::Sample>,
    ) -> remote_write::TimeSeries {
        remote_write::TimeSeries {
            labels,
            samples,
            ..Default::default()
        }
    }

    /// Checks that gauge and counter families convert to time series with the
    /// `__name__` label first, the metric labels after it, and one sample
    /// carrying the metric's value and timestamp.
    #[test]
    fn metricfamily_to_timeseries() {
        let tests: Vec<(proto::MetricFamily, Vec<remote_write::TimeSeries>)> = vec![
            (
                create_metric_family(
                    "test_gauge",
                    "i'm a help message",
                    Some(proto::MetricType::GAUGE),
                    RepeatedField::from(vec![create_metric_gauge(
                        RepeatedField::from_vec(create_labels(vec![
                            ("host", "local-test-validator"),
                            ("network", "unittest-network"),
                        ])),
                        create_gauge(2046.0),
                    )]),
                ),
                vec![create_timeseries_with_samples(
                    vec![
                        remote_write::Label {
                            name: "__name__".into(),
                            value: "test_gauge".into(),
                        },
                        remote_write::Label {
                            name: "host".into(),
                            value: "local-test-validator".into(),
                        },
                        remote_write::Label {
                            name: "network".into(),
                            value: "unittest-network".into(),
                        },
                    ],
                    vec![remote_write::Sample {
                        value: 2046.0,
                        timestamp: 12345,
                    }],
                )],
            ),
            (
                create_metric_family(
                    "test_counter",
                    "i'm a help message",
                    // The fixture holds a counter metric, so the family is
                    // declared as a counter as well.
                    Some(proto::MetricType::COUNTER),
                    RepeatedField::from(vec![create_metric_counter(
                        RepeatedField::from_vec(create_labels(vec![
                            ("host", "local-test-validator"),
                            ("network", "unittest-network"),
                        ])),
                        create_counter(2046.0),
                    )]),
                ),
                vec![create_timeseries_with_samples(
                    vec![
                        remote_write::Label {
                            name: "__name__".into(),
                            value: "test_counter".into(),
                        },
                        remote_write::Label {
                            name: "host".into(),
                            value: "local-test-validator".into(),
                        },
                        remote_write::Label {
                            name: "network".into(),
                            value: "unittest-network".into(),
                        },
                    ],
                    vec![remote_write::Sample {
                        value: 2046.0,
                        timestamp: 12345,
                    }],
                )],
            ),
        ];

        for (mf, expected_ts) in tests {
            let actual_ts = Mimir::<Vec<remote_write::TimeSeries>>::from(mf).state;
            // `zip` stops at the shorter side, so lengths are checked first;
            // otherwise an empty conversion result would pass silently.
            assert_eq!(
                actual_ts.len(),
                expected_ts.len(),
                "number of time series does not match"
            );

            for (actual, expected) in actual_ts.into_iter().zip(expected_ts) {
                assert_eq!(actual.labels, expected.labels);
                assert_eq!(
                    actual.samples.len(),
                    expected.samples.len(),
                    "number of samples does not match"
                );

                for (actual_sample, expected_sample) in
                    actual.samples.into_iter().zip(expected.samples)
                {
                    assert_eq!(
                        actual_sample.value, expected_sample.value,
                        "sample values do not match"
                    );
                    assert_eq!(
                        actual_sample.timestamp, expected_sample.timestamp,
                        "sample timestamps do not match"
                    );
                }
            }
        }
    }
}