1use crate::admin::ReqwestClient;
5use crate::prom_to_mimir::Mimir;
6use crate::remote_write::WriteRequest;
7use anyhow::Result;
8use axum::body::Bytes;
9use axum::http::StatusCode;
10use bytes::buf::Reader;
11use fastcrypto::ed25519::Ed25519PublicKey;
12use multiaddr::Multiaddr;
13use once_cell::sync::Lazy;
14use prometheus::proto::{self, MetricFamily};
15use prometheus::{Counter, CounterVec, HistogramVec};
16use prometheus::{register_counter, register_counter_vec, register_histogram_vec};
17use prost::Message;
18use protobuf::CodedInputStream;
19use std::io::Read;
20use tracing::{debug, error};
21
22static CONSUMER_OPS_SUBMITTED: Lazy<Counter> = Lazy::new(|| {
23 register_counter!(
24 "consumer_operations_submitted",
25 "Operations counter for the number of metric family types we submit, excluding histograms, and not the discrete timeseries counts.",
26 )
27 .unwrap()
28});
29static CONSUMER_OPS: Lazy<CounterVec> = Lazy::new(|| {
30 register_counter_vec!(
31 "consumer_operations",
32 "Operations counters and status from operations performed in the consumer.",
33 &["operation", "status"]
34 )
35 .unwrap()
36});
37static CONSUMER_ENCODE_COMPRESS_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
38 register_histogram_vec!(
39 "protobuf_compression_seconds",
40 "The time it takes to compress a remote_write payload in seconds.",
41 &["operation"],
42 vec![
43 1e-08, 2e-08, 4e-08, 8e-08, 1.6e-07, 3.2e-07, 6.4e-07, 1.28e-06, 2.56e-06, 5.12e-06,
44 1.024e-05, 2.048e-05, 4.096e-05, 8.192e-05
45 ],
46 )
47 .unwrap()
48});
49static CONSUMER_OPERATION_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
50 register_histogram_vec!(
51 "consumer_operations_duration_seconds",
52 "The time it takes to perform various consumer operations in seconds.",
53 &["operation"],
54 vec![
55 0.0008, 0.0016, 0.0032, 0.0064, 0.0128, 0.0256, 0.0512, 0.1024, 0.2048, 0.4096, 0.8192,
56 1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75, 3.0, 3.25, 3.5, 3.75, 4.0, 4.25, 4.5, 4.75,
57 5.0, 5.25, 5.5, 5.75, 6.0, 6.25, 6.5, 6.75, 7.0, 7.25, 7.5, 7.75, 8.0, 8.25, 8.5, 8.75,
58 9.0, 9.25, 9.5, 9.75, 10.0, 10.25, 10.5, 10.75, 11.0, 11.25, 11.5, 11.75, 12.0, 12.25,
59 12.5, 12.75, 13.0, 13.25, 13.5, 13.75, 14.0, 14.25, 14.5, 14.75, 15.0, 15.25, 15.5,
60 15.75, 16.0, 16.25, 16.5, 16.75, 17.0, 17.25, 17.5, 17.75, 18.0, 18.25, 18.5, 18.75,
61 19.0, 19.25, 19.5, 19.75, 20.0, 20.25, 20.5, 20.75, 21.0, 21.25, 21.5, 21.75, 22.0,
62 22.25, 22.5, 22.75, 23.0, 23.25, 23.5, 23.75, 24.0, 24.25, 24.5, 24.75, 25.0, 26.0,
63 27.0, 28.0, 29.0, 30.0
64 ],
65 )
66 .unwrap()
67});
68
69#[derive(Debug)]
71pub struct NodeMetric {
72 pub peer_addr: Multiaddr, pub public_key: Ed25519PublicKey, pub data: Vec<proto::MetricFamily>, }
76
77pub struct ProtobufDecoder {
81 buf: Reader<Bytes>,
82}
83
84impl ProtobufDecoder {
85 pub fn new(buf: Reader<Bytes>) -> Self {
86 Self { buf }
87 }
88 pub fn parse<T: protobuf::Message>(&mut self) -> Result<Vec<T>> {
90 let timer = CONSUMER_OPERATION_DURATION
91 .with_label_values(&["decode_len_delim_protobuf"])
92 .start_timer();
93 let mut result: Vec<T> = vec![];
94 while !self.buf.get_ref().is_empty() {
95 let len = {
96 let mut is = CodedInputStream::from_buf_read(&mut self.buf);
97 is.read_raw_varint32()
98 }?;
99 let mut buf = vec![0; len as usize];
100 self.buf.read_exact(&mut buf)?;
101 result.push(T::parse_from_bytes(&buf)?);
102 }
103 timer.observe_duration();
104 Ok(result)
105 }
106}
107
108pub fn populate_labels(
110 name: String, network: String, inventory_hostname: String, data: Vec<proto::MetricFamily>,
114) -> Vec<proto::MetricFamily> {
115 let timer = CONSUMER_OPERATION_DURATION
116 .with_label_values(&["populate_labels"])
117 .start_timer();
118 debug!("received metrics from {name} on {inventory_hostname}");
119 let network_label = proto::LabelPair {
122 name: Some("network".into()),
123 value: Some(network),
124 ..Default::default()
125 };
126
127 let host_label = proto::LabelPair {
128 name: Some("host".into()),
129 value: Some(name),
130 ..Default::default()
131 };
132
133 let labels = vec![network_label, host_label];
134
135 let mut data = data;
136 for mf in data.iter_mut() {
138 for m in mf.mut_metric() {
139 m.label.extend(labels.clone());
140 }
141 }
142 timer.observe_duration();
143 data
144}
145
146fn encode_compress(request: &WriteRequest) -> Result<Vec<u8>, (StatusCode, &'static str)> {
147 let observe = || {
148 let timer = CONSUMER_ENCODE_COMPRESS_DURATION
149 .with_label_values(&["encode_compress"])
150 .start_timer();
151 || {
152 timer.observe_duration();
153 }
154 }();
155 let mut buf = Vec::with_capacity(request.encoded_len());
156 if request.encode(&mut buf).is_err() {
157 observe();
158 CONSUMER_OPS
159 .with_label_values(&["encode_compress", "failed"])
160 .inc();
161 error!("unable to encode prompb to mimirpb");
162 return Err((
163 StatusCode::INTERNAL_SERVER_ERROR,
164 "unable to encode prompb to remote_write pb",
165 ));
166 };
167
168 let mut s = snap::raw::Encoder::new();
169 let compressed = match s.compress_vec(&buf) {
170 Ok(compressed) => compressed,
171 Err(error) => {
172 observe();
173 CONSUMER_OPS
174 .with_label_values(&["encode_compress", "failed"])
175 .inc();
176 error!("unable to compress to snappy block format; {error}");
177 return Err((
178 StatusCode::INTERNAL_SERVER_ERROR,
179 "unable to compress to snappy block format",
180 ));
181 }
182 };
183 observe();
184 CONSUMER_OPS
185 .with_label_values(&["encode_compress", "success"])
186 .inc();
187 Ok(compressed)
188}
189
190async fn check_response(
191 request: WriteRequest,
192 response: reqwest::Response,
193) -> Result<(), (StatusCode, &'static str)> {
194 match response.status() {
195 reqwest::StatusCode::OK => {
196 CONSUMER_OPS
197 .with_label_values(&["check_response", "OK"])
198 .inc();
199 debug!("({}) SUCCESS: {:?}", reqwest::StatusCode::OK, request);
200 Ok(())
201 }
202 reqwest::StatusCode::BAD_REQUEST => {
203 let body = response
204 .text()
205 .await
206 .unwrap_or_else(|_| "response body cannot be decoded".into());
207
208 if body.contains("err-mimir-sample-out-of-order") {
211 CONSUMER_OPS
212 .with_label_values(&["check_response", "BAD_REQUEST"])
213 .inc();
214 error!("({}) ERROR: {:?}", reqwest::StatusCode::BAD_REQUEST, body);
215 return Err((
216 StatusCode::INTERNAL_SERVER_ERROR,
217 "IGNORING METRICS due to err-mimir-sample-out-of-order",
218 ));
219 }
220 CONSUMER_OPS
221 .with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
222 .inc();
223 error!("({}) ERROR: {:?}", reqwest::StatusCode::BAD_REQUEST, body);
224 Err((
225 StatusCode::INTERNAL_SERVER_ERROR,
226 "unknown bad request error encountered in remote_push",
227 ))
228 }
229 code => {
230 let body = response
231 .text()
232 .await
233 .unwrap_or_else(|_| "response body cannot be decoded".into());
234 CONSUMER_OPS
235 .with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
236 .inc();
237 error!("({}) ERROR: {:?}", code, body);
238 Err((
239 StatusCode::INTERNAL_SERVER_ERROR,
240 "unknown error encountered in remote_push",
241 ))
242 }
243 }
244}
245
246async fn convert(
247 mfs: Vec<MetricFamily>,
248) -> Result<impl Iterator<Item = WriteRequest>, (StatusCode, &'static str)> {
249 let result = tokio::task::spawn_blocking(|| {
250 let timer = CONSUMER_OPERATION_DURATION
251 .with_label_values(&["convert_to_remote_write_task"])
252 .start_timer();
253 let result = Mimir::from(mfs);
254 timer.observe_duration();
255 result.into_iter()
256 })
257 .await;
258
259 let result = match result {
260 Ok(v) => v,
261 Err(err) => {
262 error!("unable to convert to remote_write; {err}");
263 return Err((
264 StatusCode::INTERNAL_SERVER_ERROR,
265 "DROPPING METRICS; unable to convert to remote_write",
266 ));
267 }
268 };
269 Ok(result)
270}
271
272pub async fn convert_to_remote_write(
279 rc: ReqwestClient,
280 node_metric: NodeMetric,
281) -> (StatusCode, &'static str) {
282 let timer = CONSUMER_OPERATION_DURATION
283 .with_label_values(&["convert_to_remote_write"])
284 .start_timer();
285
286 let remote_write_protos = match convert(node_metric.data).await {
287 Ok(v) => v,
288 Err(err) => {
289 timer.stop_and_discard();
290 return err;
291 }
292 };
293
294 let mut mf_cnt = 0;
296 for request in remote_write_protos {
297 mf_cnt += 1;
298 let compressed = match encode_compress(&request) {
299 Ok(compressed) => compressed,
300 Err(error) => return error,
301 };
302
303 let response = match rc
304 .client
305 .post(rc.settings.url.to_owned())
306 .header(reqwest::header::CONTENT_ENCODING, "snappy")
307 .header(reqwest::header::CONTENT_TYPE, "application/x-protobuf")
308 .header("X-Prometheus-Remote-Write-Version", "0.1.0")
309 .basic_auth(
310 rc.settings.username.to_owned(),
311 Some(rc.settings.password.to_owned()),
312 )
313 .body(compressed)
314 .send()
315 .await
316 {
317 Ok(response) => response,
318 Err(error) => {
319 CONSUMER_OPS
320 .with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
321 .inc();
322 error!("DROPPING METRICS due to post error: {error}");
323 timer.stop_and_discard();
324 return (
325 StatusCode::INTERNAL_SERVER_ERROR,
326 "DROPPING METRICS due to post error",
327 );
328 }
329 };
330
331 match check_response(request, response).await {
332 Ok(_) => (),
333 Err(err) => {
334 timer.stop_and_discard();
335 return err;
336 }
337 }
338 }
339 CONSUMER_OPS_SUBMITTED.inc_by(mf_cnt as f64);
340 timer.observe_duration();
341 (StatusCode::CREATED, "created")
342}
343
#[cfg(test)]
mod tests {
    use prometheus::proto;

    use crate::{
        consumer::populate_labels,
        prom_to_mimir::tests::{
            create_histogram, create_labels, create_metric_family, create_metric_histogram,
        },
    };

    /// A metric without labels must end up with exactly the `network` and `host` labels,
    /// in that order.
    #[test]
    fn test_populate_labels() {
        let unlabeled_metric = create_metric_histogram(create_labels(vec![]), create_histogram());
        let mf = create_metric_family(
            "test_histogram",
            "i'm a help message",
            Some(proto::MetricType::HISTOGRAM),
            vec![unlabeled_metric],
        );

        let labeled_mf = populate_labels(
            "validator-0".into(),
            "unittest-network".into(),
            "inventory-hostname".into(),
            vec![mf],
        );

        let expected_labels = create_labels(vec![
            ("network", "unittest-network"),
            ("host", "validator-0"),
        ]);
        let metric = &labeled_mf[0].get_metric()[0];
        assert_eq!(metric.get_label(), &expected_labels);
    }
}