1use anyhow::{Result, bail};
4use axum::{Router, extract::Extension, http::StatusCode, routing::get};
5use once_cell::sync::Lazy;
6use prometheus::proto::{Metric, MetricFamily};
7use prometheus::{CounterVec, HistogramVec};
8use prometheus::{register_counter_vec, register_histogram_vec};
9use std::net::TcpListener;
10use std::time::{SystemTime, UNIX_EPOCH};
11use std::{
12 collections::VecDeque,
13 sync::{Arc, Mutex},
14};
15use tower::ServiceBuilder;
16use tower_http::LatencyUnit;
17use tower_http::trace::{DefaultOnResponse, TraceLayer};
18use tracing::{Level, info};
19
20use crate::var;
21
22const METRICS_ROUTE: &str = "/metrics";
23
24static RELAY_PRESSURE: Lazy<CounterVec> = Lazy::new(|| {
25 register_counter_vec!(
26 "relay_pressure",
27 "HistogramRelay's number of metric families submitted, exported, overflowed to/from the queue.",
28 &["histogram_relay"]
29 )
30 .unwrap()
31});
32static RELAY_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
33 register_histogram_vec!(
34 "relay_duration_seconds",
35 "HistogramRelay's submit/export fn latencies in seconds.",
36 &["histogram_relay"],
37 vec![
38 0.0008, 0.0016, 0.0032, 0.0064, 0.0128, 0.0256, 0.0512, 0.1024, 0.2048, 0.4096, 0.8192,
39 1.0, 1.25, 1.5, 1.75, 2.0, 4.0, 8.0, 10.0, 12.5, 15.0
40 ],
41 )
42 .unwrap()
43});
44
45pub fn start_prometheus_server(listener: TcpListener) -> HistogramRelay {
49 let relay = HistogramRelay::new();
50 let app = Router::new()
51 .route(METRICS_ROUTE, get(metrics))
52 .layer(Extension(relay.clone()))
53 .layer(
54 ServiceBuilder::new().layer(
55 TraceLayer::new_for_http().on_response(
56 DefaultOnResponse::new()
57 .level(Level::INFO)
58 .latency_unit(LatencyUnit::Seconds),
59 ),
60 ),
61 );
62
63 tokio::spawn(async move {
64 listener.set_nonblocking(true).unwrap();
65 let listener = tokio::net::TcpListener::from_std(listener).unwrap();
66 axum::serve(listener, app).await.unwrap();
67 });
68 relay
69}
70
71async fn metrics(Extension(relay): Extension<HistogramRelay>) -> (StatusCode, String) {
72 let Ok(expformat) = relay.export() else {
73 return (
74 StatusCode::INTERNAL_SERVER_ERROR,
75 "unable to pop metrics from HistogramRelay".into(),
76 );
77 };
78 (StatusCode::OK, expformat)
79}
80
81struct Wrapper(i64, Vec<MetricFamily>);
82
83#[derive(Clone)]
84pub struct HistogramRelay(Arc<Mutex<VecDeque<Wrapper>>>);
85
86impl Default for HistogramRelay {
87 fn default() -> Self {
88 HistogramRelay(Arc::new(Mutex::new(VecDeque::new())))
89 }
90}
91impl HistogramRelay {
92 pub fn new() -> Self {
93 Self::default()
94 }
95 pub fn submit(&self, data: Vec<MetricFamily>) {
99 RELAY_PRESSURE.with_label_values(&["submit"]).inc();
100 let timer = RELAY_DURATION.with_label_values(&["submit"]).start_timer();
101 let timestamp_secs = SystemTime::now()
103 .duration_since(UNIX_EPOCH)
104 .unwrap()
105 .as_secs() as i64;
106 let mut queue = self
107 .0
108 .lock()
109 .expect("couldn't get mut lock on HistogramRelay");
110 queue.retain(|v| {
111 if (timestamp_secs - v.0) < var!("MAX_QUEUE_TIME_SECS", 300) {
113 return true;
114 }
115 RELAY_PRESSURE.with_label_values(&["overflow"]).inc();
116 false
117 }); let data: Vec<MetricFamily> = extract_histograms(data).collect();
121 RELAY_PRESSURE
122 .with_label_values(&["submitted"])
123 .inc_by(data.len() as f64);
124 queue.push_back(Wrapper(timestamp_secs, data));
125 timer.observe_duration();
126 }
127 pub fn export(&self) -> Result<String> {
128 RELAY_PRESSURE.with_label_values(&["export"]).inc();
129 let timer = RELAY_DURATION.with_label_values(&["export"]).start_timer();
130 let mut queue = self
132 .0
133 .lock()
134 .expect("couldn't get mut lock on HistogramRelay");
135
136 let data: Vec<Wrapper> = queue.drain(..).collect();
137 let mut histograms = vec![];
138 for mf in data {
139 histograms.extend(mf.1);
140 }
141 info!(
142 "histogram queue drained {} items; remaining count {}",
143 histograms.len(),
144 queue.len()
145 );
146
147 let encoder = prometheus::TextEncoder::new();
148 let string = match encoder.encode_to_string(&histograms) {
149 Ok(s) => s,
150 Err(error) => bail!("{error}"),
151 };
152 RELAY_PRESSURE
153 .with_label_values(&["exported"])
154 .inc_by(histograms.len() as f64);
155 timer.observe_duration();
156 Ok(string)
157 }
158}
159
160fn extract_histograms(data: Vec<MetricFamily>) -> impl Iterator<Item = MetricFamily> {
161 data.into_iter().filter_map(|mf| {
162 let metrics = mf.get_metric().iter().filter_map(|m| {
163 if !m.has_histogram() {
164 return None;
165 }
166 let mut v = Metric::default();
167 v.set_label(protobuf::RepeatedField::from_slice(m.get_label()));
168 v.set_histogram(m.get_histogram().to_owned());
169 v.set_timestamp_ms(m.get_timestamp_ms());
170 Some(v)
171 });
172
173 let only_histograms = protobuf::RepeatedField::from_iter(metrics);
174 if only_histograms.is_empty() {
175 return None;
176 }
177
178 let mut v = MetricFamily::default();
179 v.set_name(mf.get_name().to_owned());
180 v.set_help(mf.get_help().to_owned());
181 v.set_field_type(mf.get_field_type());
182 v.set_metric(only_histograms);
183 Some(v)
184 })
185}
186
#[cfg(test)]
mod tests {
    use prometheus::proto;

    use crate::{
        histogram_relay::extract_histograms,
        prom_to_mimir::tests::{
            create_counter, create_histogram, create_labels, create_metric_counter,
            create_metric_family, create_metric_histogram,
        },
    };

    /// A non-histogram family is dropped entirely; a histogram family passes through
    /// unchanged.
    #[test]
    fn filter_histograms() {
        // Every fixture metric carries the same host/network label pair.
        let labels = || {
            protobuf::RepeatedField::from_vec(create_labels(vec![
                ("host", "local-test-validator"),
                ("network", "unittest-network"),
            ]))
        };
        // Single-metric histogram family; serves as both input and expected output.
        let histogram_family = || {
            create_metric_family(
                "test_histogram",
                "i'm a help message",
                Some(proto::MetricType::HISTOGRAM),
                protobuf::RepeatedField::from(vec![create_metric_histogram(
                    labels(),
                    create_histogram(),
                )]),
            )
        };
        let counter_family = create_metric_family(
            "test_counter",
            "i'm a help message",
            Some(proto::MetricType::GAUGE),
            protobuf::RepeatedField::from(vec![create_metric_counter(
                labels(),
                create_counter(2046.0),
            )]),
        );

        // (input, expected output) pairs.
        let cases: Vec<(Vec<proto::MetricFamily>, Vec<proto::MetricFamily>)> = vec![
            (vec![counter_family], vec![]),
            (vec![histogram_family()], vec![histogram_family()]),
        ];

        for (input, expected) in cases {
            let extracted: Vec<proto::MetricFamily> = extract_histograms(input).collect();
            assert_eq!(extracted, expected);
        }
    }
}