1use anyhow::{Result, bail};
4use axum::{Router, extract::Extension, http::StatusCode, routing::get};
5use once_cell::sync::Lazy;
6use prometheus::proto::{Metric, MetricFamily};
7use prometheus::{CounterVec, HistogramVec};
8use prometheus::{register_counter_vec, register_histogram_vec};
9use std::net::TcpListener;
10use std::time::{SystemTime, UNIX_EPOCH};
11use std::{
12 collections::VecDeque,
13 sync::{Arc, Mutex},
14};
15use tower::ServiceBuilder;
16use tower_http::LatencyUnit;
17use tower_http::trace::{DefaultOnResponse, TraceLayer};
18use tracing::{Level, info};
19
20use crate::var;
21
/// HTTP path on which `start_prometheus_server` exposes the relayed histograms.
const METRICS_ROUTE: &str = "/metrics";
23
24static RELAY_PRESSURE: Lazy<CounterVec> = Lazy::new(|| {
25 register_counter_vec!(
26 "relay_pressure",
27 "HistogramRelay's number of metric families submitted, exported, overflowed to/from the queue.",
28 &["histogram_relay"]
29 )
30 .unwrap()
31});
32static RELAY_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
33 register_histogram_vec!(
34 "relay_duration_seconds",
35 "HistogramRelay's submit/export fn latencies in seconds.",
36 &["histogram_relay"],
37 vec![
38 0.0008, 0.0016, 0.0032, 0.0064, 0.0128, 0.0256, 0.0512, 0.1024, 0.2048, 0.4096, 0.8192,
39 1.0, 1.25, 1.5, 1.75, 2.0, 4.0, 8.0, 10.0, 12.5, 15.0
40 ],
41 )
42 .unwrap()
43});
44
45pub fn start_prometheus_server(listener: TcpListener) -> HistogramRelay {
49 let relay = HistogramRelay::new();
50 let app = Router::new()
51 .route(METRICS_ROUTE, get(metrics))
52 .layer(Extension(relay.clone()))
53 .layer(
54 ServiceBuilder::new().layer(
55 TraceLayer::new_for_http().on_response(
56 DefaultOnResponse::new()
57 .level(Level::INFO)
58 .latency_unit(LatencyUnit::Seconds),
59 ),
60 ),
61 );
62
63 tokio::spawn(async move {
64 listener.set_nonblocking(true).unwrap();
65 let listener = tokio::net::TcpListener::from_std(listener).unwrap();
66 axum::serve(listener, app).await.unwrap();
67 });
68 relay
69}
70
71async fn metrics(Extension(relay): Extension<HistogramRelay>) -> (StatusCode, String) {
72 let Ok(expformat) = relay.export() else {
73 return (
74 StatusCode::INTERNAL_SERVER_ERROR,
75 "unable to pop metrics from HistogramRelay".into(),
76 );
77 };
78 (StatusCode::OK, expformat)
79}
80
/// One queued batch.
///
/// `.0` is the Unix timestamp (seconds) at which the batch was submitted. `.1` holds
/// the histogram-only metric families it carries.
struct Wrapper(i64, Vec<MetricFamily>);
82
/// Cloneable handle to a mutex-guarded FIFO queue of histogram batches.
///
/// Clones share the same queue through the `Arc`. `submit` appends batches and
/// `export` drains all of them.
#[derive(Clone)]
pub struct HistogramRelay(Arc<Mutex<VecDeque<Wrapper>>>);
85
86impl Default for HistogramRelay {
87 fn default() -> Self {
88 HistogramRelay(Arc::new(Mutex::new(VecDeque::new())))
89 }
90}
91impl HistogramRelay {
92 pub fn new() -> Self {
93 Self::default()
94 }
95 pub fn submit(&self, data: Vec<MetricFamily>) {
99 RELAY_PRESSURE.with_label_values(&["submit"]).inc();
100 let timer = RELAY_DURATION.with_label_values(&["submit"]).start_timer();
101 let timestamp_secs = SystemTime::now()
103 .duration_since(UNIX_EPOCH)
104 .unwrap()
105 .as_secs() as i64;
106 let mut queue = self
107 .0
108 .lock()
109 .expect("couldn't get mut lock on HistogramRelay");
110 queue.retain(|v| {
111 if (timestamp_secs - v.0) < var!("MAX_QUEUE_TIME_SECS", 300) {
113 return true;
114 }
115 RELAY_PRESSURE.with_label_values(&["overflow"]).inc();
116 false
117 }); let data: Vec<MetricFamily> = extract_histograms(data).collect();
121 RELAY_PRESSURE
122 .with_label_values(&["submitted"])
123 .inc_by(data.len() as f64);
124 queue.push_back(Wrapper(timestamp_secs, data));
125 timer.observe_duration();
126 }
127 pub fn export(&self) -> Result<String> {
128 RELAY_PRESSURE.with_label_values(&["export"]).inc();
129 let timer = RELAY_DURATION.with_label_values(&["export"]).start_timer();
130 let mut queue = self
132 .0
133 .lock()
134 .expect("couldn't get mut lock on HistogramRelay");
135
136 let data: Vec<Wrapper> = queue.drain(..).collect();
137 let mut histograms = vec![];
138 for mf in data {
139 histograms.extend(mf.1);
140 }
141 info!(
142 "histogram queue drained {} items; remaining count {}",
143 histograms.len(),
144 queue.len()
145 );
146
147 let encoder = prometheus::TextEncoder::new();
148 let string = match encoder.encode_to_string(&histograms) {
149 Ok(s) => s,
150 Err(error) => bail!("{error}"),
151 };
152 RELAY_PRESSURE
153 .with_label_values(&["exported"])
154 .inc_by(histograms.len() as f64);
155 timer.observe_duration();
156 Ok(string)
157 }
158}
159
160fn extract_histograms(data: Vec<MetricFamily>) -> impl Iterator<Item = MetricFamily> {
161 data.into_iter().filter_map(|mf| {
162 let metrics: Vec<Metric> = mf
163 .get_metric()
164 .iter()
165 .filter_map(|m| {
166 if m.histogram.is_none() {
167 return None;
168 }
169 let mut v = Metric::default();
170 v.set_label(m.get_label().to_vec());
171 v.histogram = m.histogram.clone();
172 v.timestamp_ms = m.timestamp_ms;
173 Some(v)
174 })
175 .collect();
176
177 if metrics.is_empty() {
178 return None;
179 }
180
181 let mut v = MetricFamily {
182 name: Some(mf.name().to_owned()),
183 help: Some(mf.help().to_owned()),
184 ..Default::default()
185 };
186 v.set_field_type(mf.get_field_type());
187 v.set_metric(metrics);
188 Some(v)
189 })
190}
191
#[cfg(test)]
mod tests {
    use prometheus::proto;

    use crate::{
        histogram_relay::extract_histograms,
        prom_to_mimir::tests::{
            create_counter, create_histogram, create_labels, create_metric_counter,
            create_metric_family, create_metric_histogram,
        },
    };

    /// `extract_histograms` keeps only histogram metrics and omits any family left
    /// without one.
    #[test]
    fn filter_histograms() {
        struct Test {
            name: &'static str,
            data: Vec<proto::MetricFamily>,
            expected: Vec<proto::MetricFamily>,
        }

        const HELP: &str = "i'm a help message";
        // Every fixture metric carries the same label set.
        let labels = || {
            create_labels(vec![
                ("host", "local-test-validator"),
                ("network", "unittest-network"),
            ])
        };

        let tests = vec![
            Test {
                name: "empty input yields nothing",
                data: vec![],
                expected: vec![],
            },
            Test {
                name: "family without histograms is dropped",
                data: vec![create_metric_family(
                    "test_counter",
                    HELP,
                    Some(proto::MetricType::GAUGE),
                    vec![create_metric_counter(labels(), create_counter(2046.0))],
                )],
                expected: vec![],
            },
            Test {
                name: "histogram family passes through unchanged",
                data: vec![create_metric_family(
                    "test_histogram",
                    HELP,
                    Some(proto::MetricType::HISTOGRAM),
                    vec![create_metric_histogram(labels(), create_histogram())],
                )],
                expected: vec![create_metric_family(
                    "test_histogram",
                    HELP,
                    Some(proto::MetricType::HISTOGRAM),
                    vec![create_metric_histogram(labels(), create_histogram())],
                )],
            },
            Test {
                name: "non-histogram metrics are stripped from a mixed family",
                data: vec![create_metric_family(
                    "test_mixed",
                    HELP,
                    Some(proto::MetricType::HISTOGRAM),
                    vec![
                        create_metric_counter(labels(), create_counter(7.0)),
                        create_metric_histogram(labels(), create_histogram()),
                    ],
                )],
                expected: vec![create_metric_family(
                    "test_mixed",
                    HELP,
                    Some(proto::MetricType::HISTOGRAM),
                    vec![create_metric_histogram(labels(), create_histogram())],
                )],
            },
            Test {
                name: "only histogram families survive a multi-family batch",
                data: vec![
                    create_metric_family(
                        "test_counter",
                        HELP,
                        Some(proto::MetricType::GAUGE),
                        vec![create_metric_counter(labels(), create_counter(2046.0))],
                    ),
                    create_metric_family(
                        "test_histogram",
                        HELP,
                        Some(proto::MetricType::HISTOGRAM),
                        vec![create_metric_histogram(labels(), create_histogram())],
                    ),
                ],
                expected: vec![create_metric_family(
                    "test_histogram",
                    HELP,
                    Some(proto::MetricType::HISTOGRAM),
                    vec![create_metric_histogram(labels(), create_histogram())],
                )],
            },
        ];

        for test in tests {
            let extracted: Vec<proto::MetricFamily> = extract_histograms(test.data).collect();
            assert_eq!(extracted, test.expected, "case: {}", test.name);
        }
    }
}