// sui_proxy/histogram_relay.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use anyhow::{Result, bail};
use axum::{Router, extract::Extension, http::StatusCode, routing::get};
use once_cell::sync::Lazy;
use prometheus::proto::{Metric, MetricFamily};
use prometheus::{CounterVec, HistogramVec};
use prometheus::{register_counter_vec, register_histogram_vec};
use std::net::TcpListener;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{
    collections::VecDeque,
    sync::{Arc, Mutex},
};
use tower::ServiceBuilder;
use tower_http::LatencyUnit;
use tower_http::trace::{DefaultOnResponse, TraceLayer};
use tracing::{Level, error, info};

use crate::var;

22const METRICS_ROUTE: &str = "/metrics";
23
24static RELAY_PRESSURE: Lazy<CounterVec> = Lazy::new(|| {
25    register_counter_vec!(
26        "relay_pressure",
27        "HistogramRelay's number of metric families submitted, exported, overflowed to/from the queue.",
28        &["histogram_relay"]
29    )
30    .unwrap()
31});
32static RELAY_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
33    register_histogram_vec!(
34        "relay_duration_seconds",
35        "HistogramRelay's submit/export fn latencies in seconds.",
36        &["histogram_relay"],
37        vec![
38            0.0008, 0.0016, 0.0032, 0.0064, 0.0128, 0.0256, 0.0512, 0.1024, 0.2048, 0.4096, 0.8192,
39            1.0, 1.25, 1.5, 1.75, 2.0, 4.0, 8.0, 10.0, 12.5, 15.0
40        ],
41    )
42    .unwrap()
43});
44
45// Creates a new http server that has as a sole purpose to expose
46// and endpoint that prometheus agent can use to poll for the metrics.
47// A RegistryService is returned that can be used to get access in prometheus Registries.
48pub fn start_prometheus_server(listener: TcpListener) -> HistogramRelay {
49    let relay = HistogramRelay::new();
50    let app = Router::new()
51        .route(METRICS_ROUTE, get(metrics))
52        .layer(Extension(relay.clone()))
53        .layer(
54            ServiceBuilder::new().layer(
55                TraceLayer::new_for_http().on_response(
56                    DefaultOnResponse::new()
57                        .level(Level::INFO)
58                        .latency_unit(LatencyUnit::Seconds),
59                ),
60            ),
61        );
62
63    tokio::spawn(async move {
64        listener.set_nonblocking(true).unwrap();
65        let listener = tokio::net::TcpListener::from_std(listener).unwrap();
66        axum::serve(listener, app).await.unwrap();
67    });
68    relay
69}
70
71async fn metrics(Extension(relay): Extension<HistogramRelay>) -> (StatusCode, String) {
72    let Ok(expformat) = relay.export() else {
73        return (
74            StatusCode::INTERNAL_SERVER_ERROR,
75            "unable to pop metrics from HistogramRelay".into(),
76        );
77    };
78    (StatusCode::OK, expformat)
79}
80
81struct Wrapper(i64, Vec<MetricFamily>);
82
83#[derive(Clone)]
84pub struct HistogramRelay(Arc<Mutex<VecDeque<Wrapper>>>);
85
86impl Default for HistogramRelay {
87    fn default() -> Self {
88        HistogramRelay(Arc::new(Mutex::new(VecDeque::new())))
89    }
90}
91impl HistogramRelay {
92    pub fn new() -> Self {
93        Self::default()
94    }
95    /// submit will take metric family submissions and store them for scraping
96    /// in doing so, it will also wrap each entry in a timestamp which will be use
97    /// for pruning old entires on each submission call. this may not be ideal long term.
98    pub fn submit(&self, data: Vec<MetricFamily>) {
99        RELAY_PRESSURE.with_label_values(&["submit"]).inc();
100        let timer = RELAY_DURATION.with_label_values(&["submit"]).start_timer();
101        //  represents a collection timestamp
102        let timestamp_secs = SystemTime::now()
103            .duration_since(UNIX_EPOCH)
104            .unwrap()
105            .as_secs() as i64;
106        let mut queue = self
107            .0
108            .lock()
109            .expect("couldn't get mut lock on HistogramRelay");
110        queue.retain(|v| {
111            // 5 mins is the max time in the queue allowed
112            if (timestamp_secs - v.0) < var!("MAX_QUEUE_TIME_SECS", 300) {
113                return true;
114            }
115            RELAY_PRESSURE.with_label_values(&["overflow"]).inc();
116            false
117        }); // drain anything 5 mins or older
118
119        // filter out our histograms from normal metrics
120        let data: Vec<MetricFamily> = extract_histograms(data).collect();
121        RELAY_PRESSURE
122            .with_label_values(&["submitted"])
123            .inc_by(data.len() as f64);
124        queue.push_back(Wrapper(timestamp_secs, data));
125        timer.observe_duration();
126    }
127    pub fn export(&self) -> Result<String> {
128        RELAY_PRESSURE.with_label_values(&["export"]).inc();
129        let timer = RELAY_DURATION.with_label_values(&["export"]).start_timer();
130        // totally drain all metrics whenever we get a scrape request from the metrics handler
131        let mut queue = self
132            .0
133            .lock()
134            .expect("couldn't get mut lock on HistogramRelay");
135
136        let data: Vec<Wrapper> = queue.drain(..).collect();
137        let mut histograms = vec![];
138        for mf in data {
139            histograms.extend(mf.1);
140        }
141        info!(
142            "histogram queue drained {} items; remaining count {}",
143            histograms.len(),
144            queue.len()
145        );
146
147        let encoder = prometheus::TextEncoder::new();
148        let string = match encoder.encode_to_string(&histograms) {
149            Ok(s) => s,
150            Err(error) => bail!("{error}"),
151        };
152        RELAY_PRESSURE
153            .with_label_values(&["exported"])
154            .inc_by(histograms.len() as f64);
155        timer.observe_duration();
156        Ok(string)
157    }
158}
159
160fn extract_histograms(data: Vec<MetricFamily>) -> impl Iterator<Item = MetricFamily> {
161    data.into_iter().filter_map(|mf| {
162        let metrics = mf.get_metric().iter().filter_map(|m| {
163            if !m.has_histogram() {
164                return None;
165            }
166            let mut v = Metric::default();
167            v.set_label(protobuf::RepeatedField::from_slice(m.get_label()));
168            v.set_histogram(m.get_histogram().to_owned());
169            v.set_timestamp_ms(m.get_timestamp_ms());
170            Some(v)
171        });
172
173        let only_histograms = protobuf::RepeatedField::from_iter(metrics);
174        if only_histograms.is_empty() {
175            return None;
176        }
177
178        let mut v = MetricFamily::default();
179        v.set_name(mf.get_name().to_owned());
180        v.set_help(mf.get_help().to_owned());
181        v.set_field_type(mf.get_field_type());
182        v.set_metric(only_histograms);
183        Some(v)
184    })
185}
186
#[cfg(test)]
mod tests {
    use prometheus::proto;

    use crate::{
        histogram_relay::{HistogramRelay, extract_histograms},
        prom_to_mimir::tests::{
            create_counter, create_histogram, create_labels, create_metric_counter,
            create_metric_family, create_metric_histogram,
        },
    };

    /// Labels shared by every metric built in these tests.
    fn test_labels() -> protobuf::RepeatedField<proto::LabelPair> {
        protobuf::RepeatedField::from_vec(create_labels(vec![
            ("host", "local-test-validator"),
            ("network", "unittest-network"),
        ]))
    }

    /// A family holding a single counter metric.
    fn counter_family() -> proto::MetricFamily {
        create_metric_family(
            "test_counter",
            "i'm a help message",
            Some(proto::MetricType::GAUGE),
            protobuf::RepeatedField::from(vec![create_metric_counter(
                test_labels(),
                create_counter(2046.0),
            )]),
        )
    }

    /// A family holding a single histogram metric.
    fn histogram_family() -> proto::MetricFamily {
        create_metric_family(
            "test_histogram",
            "i'm a help message",
            Some(proto::MetricType::HISTOGRAM),
            protobuf::RepeatedField::from(vec![create_metric_histogram(
                test_labels(),
                create_histogram(),
            )]),
        )
    }

    #[test]
    fn filter_histograms() {
        struct Test {
            data: Vec<proto::MetricFamily>,
            expected: Vec<proto::MetricFamily>,
        }

        // A histogram family that also (unusually) carries a counter metric.
        let mixed_family = create_metric_family(
            "test_histogram",
            "i'm a help message",
            Some(proto::MetricType::HISTOGRAM),
            protobuf::RepeatedField::from(vec![
                create_metric_counter(test_labels(), create_counter(2046.0)),
                create_metric_histogram(test_labels(), create_histogram()),
            ]),
        );

        let tests = vec![
            // Nothing in, nothing out.
            Test {
                data: vec![],
                expected: vec![],
            },
            // Families without histograms are dropped entirely.
            Test {
                data: vec![counter_family()],
                expected: vec![],
            },
            // Histogram families pass through unchanged.
            Test {
                data: vec![histogram_family()],
                expected: vec![histogram_family()],
            },
            // Only the histogram family survives a mixed submission.
            Test {
                data: vec![counter_family(), histogram_family()],
                expected: vec![histogram_family()],
            },
            // Non-histogram metrics are stripped from inside a family.
            Test {
                data: vec![mixed_family],
                expected: vec![histogram_family()],
            },
        ];

        for test in tests {
            let extracted: Vec<proto::MetricFamily> = extract_histograms(test.data).collect();
            assert_eq!(extracted, test.expected);
        }
    }

    #[test]
    fn export_drains_submitted_histograms() {
        let relay = HistogramRelay::new();
        relay.submit(vec![counter_family(), histogram_family()]);

        let exported = relay.export().expect("first export should succeed");
        assert!(exported.contains("test_histogram"));
        assert!(!exported.contains("test_counter"));

        // The first scrape drained the queue, so nothing is exported twice.
        let exported = relay.export().expect("second export should succeed");
        assert!(exported.is_empty());
    }
}