sui_proxy/
consumer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::admin::ReqwestClient;
5use crate::prom_to_mimir::Mimir;
6use crate::remote_write::WriteRequest;
7use anyhow::Result;
8use axum::body::Bytes;
9use axum::http::StatusCode;
10use bytes::buf::Reader;
11use fastcrypto::ed25519::Ed25519PublicKey;
12use multiaddr::Multiaddr;
13use once_cell::sync::Lazy;
14use prometheus::proto::{self, MetricFamily};
15use prometheus::{Counter, CounterVec, HistogramVec};
16use prometheus::{register_counter, register_counter_vec, register_histogram_vec};
17use prost::Message;
18use protobuf::CodedInputStream;
19use std::io::Read;
20use tracing::{debug, error};
21
/// Counts remote_write requests submitted by `convert_to_remote_write`; incremented once per
/// fully successful call by the number of requests posted in that call.
static CONSUMER_OPS_SUBMITTED: Lazy<Counter> = Lazy::new(|| {
    register_counter!(
        "consumer_operations_submitted",
        "Operations counter for the number of metric family types we submit, excluding histograms, and not the discrete timeseries counts.",
    )
    .unwrap()
});
/// Outcome counter for consumer operations, labelled by `operation` (e.g. "encode_compress",
/// "check_response") and `status` (e.g. "success", "failed", "OK", "BAD_REQUEST",
/// "INTERNAL_SERVER_ERROR").
static CONSUMER_OPS: Lazy<CounterVec> = Lazy::new(|| {
    register_counter_vec!(
        "consumer_operations",
        "Operations counters and status from operations performed in the consumer.",
        &["operation", "status"]
    )
    .unwrap()
});
/// Duration of `encode_compress` (protobuf encoding followed by snappy compression).
/// Buckets double from 10ns (1e-08 * 2^k, k = 0..13) up to ~82µs.
// NOTE(review): observations above 8.192e-05s only land in the implicit +Inf bucket, and the
// timed span includes encoding, not just compression as the help text says — confirm intended.
static CONSUMER_ENCODE_COMPRESS_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "protobuf_compression_seconds",
        "The time it takes to compress a remote_write payload in seconds.",
        &["operation"],
        vec![
            1e-08, 2e-08, 4e-08, 8e-08, 1.6e-07, 3.2e-07, 6.4e-07, 1.28e-06, 2.56e-06, 5.12e-06,
            1.024e-05, 2.048e-05, 4.096e-05, 8.192e-05
        ],
    )
    .unwrap()
});
/// Duration of consumer operations, labelled by `operation` (decode, populate_labels,
/// conversion, and the end-to-end `convert_to_remote_write`).
/// Buckets: doubling from 0.8ms to ~0.82s (0.0008 * 2^k, k = 0..10), then 0.25s steps from
/// 1.0s to 25.0s, then 1s steps up to 30s.
static CONSUMER_OPERATION_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "consumer_operations_duration_seconds",
        "The time it takes to perform various consumer operations in seconds.",
        &["operation"],
        vec![
            0.0008, 0.0016, 0.0032, 0.0064, 0.0128, 0.0256, 0.0512, 0.1024, 0.2048, 0.4096, 0.8192,
            1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75, 3.0, 3.25, 3.5, 3.75, 4.0, 4.25, 4.5, 4.75,
            5.0, 5.25, 5.5, 5.75, 6.0, 6.25, 6.5, 6.75, 7.0, 7.25, 7.5, 7.75, 8.0, 8.25, 8.5, 8.75,
            9.0, 9.25, 9.5, 9.75, 10.0, 10.25, 10.5, 10.75, 11.0, 11.25, 11.5, 11.75, 12.0, 12.25,
            12.5, 12.75, 13.0, 13.25, 13.5, 13.75, 14.0, 14.25, 14.5, 14.75, 15.0, 15.25, 15.5,
            15.75, 16.0, 16.25, 16.5, 16.75, 17.0, 17.25, 17.5, 17.75, 18.0, 18.25, 18.5, 18.75,
            19.0, 19.25, 19.5, 19.75, 20.0, 20.25, 20.5, 20.75, 21.0, 21.25, 21.5, 21.75, 22.0,
            22.25, 22.5, 22.75, 23.0, 23.25, 23.5, 23.75, 24.0, 24.25, 24.5, 24.75, 25.0, 26.0,
            27.0, 28.0, 29.0, 30.0
        ],
    )
    .unwrap()
});
68
/// NodeMetric holds metadata and a metric payload from the calling node.
///
/// The `data` field is what `convert_to_remote_write` converts and forwards to mimir.
#[derive(Debug)]
pub struct NodeMetric {
    /// the sockaddr source address from the incoming request
    pub peer_addr: Multiaddr,
    /// the public key from the sui blockchain
    pub public_key: Ed25519PublicKey,
    /// decoded protobuf of prometheus data
    pub data: Vec<proto::MetricFamily>,
}
76
77/// The ProtobufDecoder will decode message delimited protobuf messages from prom_model.proto types
78/// They are delimited by size, eg a format is such:
79/// []byte{size, data, size, data, size, data}, etc
80pub struct ProtobufDecoder {
81    buf: Reader<Bytes>,
82}
83
84impl ProtobufDecoder {
85    pub fn new(buf: Reader<Bytes>) -> Self {
86        Self { buf }
87    }
88    /// parse a delimited buffer of protobufs. this is used to consume data sent from a sui-node
89    pub fn parse<T: protobuf::Message>(&mut self) -> Result<Vec<T>> {
90        let timer = CONSUMER_OPERATION_DURATION
91            .with_label_values(&["decode_len_delim_protobuf"])
92            .start_timer();
93        let mut result: Vec<T> = vec![];
94        while !self.buf.get_ref().is_empty() {
95            let len = {
96                let mut is = CodedInputStream::from_buf_read(&mut self.buf);
97                is.read_raw_varint32()
98            }?;
99            let mut buf = vec![0; len as usize];
100            self.buf.read_exact(&mut buf)?;
101            result.push(T::parse_from_bytes(&buf)?);
102        }
103        timer.observe_duration();
104        Ok(result)
105    }
106}
107
108// populate labels in place for our given metric family data
109pub fn populate_labels(
110    name: String,               // host field for grafana agent (from chain data)
111    network: String,            // network name from ansible (via config)
112    inventory_hostname: String, // inventory_name from ansible (via config)
113    data: Vec<proto::MetricFamily>,
114) -> Vec<proto::MetricFamily> {
115    let timer = CONSUMER_OPERATION_DURATION
116        .with_label_values(&["populate_labels"])
117        .start_timer();
118    debug!("received metrics from {name} on {inventory_hostname}");
119    // proto::LabelPair doesn't have pub fields so we can't use
120    // struct literals to construct
121    let network_label = proto::LabelPair {
122        name: Some("network".into()),
123        value: Some(network),
124        ..Default::default()
125    };
126
127    let host_label = proto::LabelPair {
128        name: Some("host".into()),
129        value: Some(name),
130        ..Default::default()
131    };
132
133    let labels = vec![network_label, host_label];
134
135    let mut data = data;
136    // add our extra labels to our incoming metric data
137    for mf in data.iter_mut() {
138        for m in mf.mut_metric() {
139            m.label.extend(labels.clone());
140        }
141    }
142    timer.observe_duration();
143    data
144}
145
146fn encode_compress(request: &WriteRequest) -> Result<Vec<u8>, (StatusCode, &'static str)> {
147    let observe = || {
148        let timer = CONSUMER_ENCODE_COMPRESS_DURATION
149            .with_label_values(&["encode_compress"])
150            .start_timer();
151        || {
152            timer.observe_duration();
153        }
154    }();
155    let mut buf = Vec::with_capacity(request.encoded_len());
156    if request.encode(&mut buf).is_err() {
157        observe();
158        CONSUMER_OPS
159            .with_label_values(&["encode_compress", "failed"])
160            .inc();
161        error!("unable to encode prompb to mimirpb");
162        return Err((
163            StatusCode::INTERNAL_SERVER_ERROR,
164            "unable to encode prompb to remote_write pb",
165        ));
166    };
167
168    let mut s = snap::raw::Encoder::new();
169    let compressed = match s.compress_vec(&buf) {
170        Ok(compressed) => compressed,
171        Err(error) => {
172            observe();
173            CONSUMER_OPS
174                .with_label_values(&["encode_compress", "failed"])
175                .inc();
176            error!("unable to compress to snappy block format; {error}");
177            return Err((
178                StatusCode::INTERNAL_SERVER_ERROR,
179                "unable to compress to snappy block format",
180            ));
181        }
182    };
183    observe();
184    CONSUMER_OPS
185        .with_label_values(&["encode_compress", "success"])
186        .inc();
187    Ok(compressed)
188}
189
190async fn check_response(
191    request: WriteRequest,
192    response: reqwest::Response,
193) -> Result<(), (StatusCode, &'static str)> {
194    match response.status() {
195        reqwest::StatusCode::OK => {
196            CONSUMER_OPS
197                .with_label_values(&["check_response", "OK"])
198                .inc();
199            debug!("({}) SUCCESS: {:?}", reqwest::StatusCode::OK, request);
200            Ok(())
201        }
202        reqwest::StatusCode::BAD_REQUEST => {
203            let body = response
204                .text()
205                .await
206                .unwrap_or_else(|_| "response body cannot be decoded".into());
207
208            // see mimir docs on this error condition. it's not actionable from the proxy
209            // so we drop it.
210            if body.contains("err-mimir-sample-out-of-order") {
211                CONSUMER_OPS
212                    .with_label_values(&["check_response", "BAD_REQUEST"])
213                    .inc();
214                error!("({}) ERROR: {:?}", reqwest::StatusCode::BAD_REQUEST, body);
215                return Err((
216                    StatusCode::INTERNAL_SERVER_ERROR,
217                    "IGNORING METRICS due to err-mimir-sample-out-of-order",
218                ));
219            }
220            CONSUMER_OPS
221                .with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
222                .inc();
223            error!("({}) ERROR: {:?}", reqwest::StatusCode::BAD_REQUEST, body);
224            Err((
225                StatusCode::INTERNAL_SERVER_ERROR,
226                "unknown bad request error encountered in remote_push",
227            ))
228        }
229        code => {
230            let body = response
231                .text()
232                .await
233                .unwrap_or_else(|_| "response body cannot be decoded".into());
234            CONSUMER_OPS
235                .with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
236                .inc();
237            error!("({}) ERROR: {:?}", code, body);
238            Err((
239                StatusCode::INTERNAL_SERVER_ERROR,
240                "unknown error encountered in remote_push",
241            ))
242        }
243    }
244}
245
246async fn convert(
247    mfs: Vec<MetricFamily>,
248) -> Result<impl Iterator<Item = WriteRequest>, (StatusCode, &'static str)> {
249    let result = tokio::task::spawn_blocking(|| {
250        let timer = CONSUMER_OPERATION_DURATION
251            .with_label_values(&["convert_to_remote_write_task"])
252            .start_timer();
253        let result = Mimir::from(mfs);
254        timer.observe_duration();
255        result.into_iter()
256    })
257    .await;
258
259    let result = match result {
260        Ok(v) => v,
261        Err(err) => {
262            error!("unable to convert to remote_write; {err}");
263            return Err((
264                StatusCode::INTERNAL_SERVER_ERROR,
265                "DROPPING METRICS; unable to convert to remote_write",
266            ));
267        }
268    };
269    Ok(result)
270}
271
272/// convert_to_remote_write is an expensive method due to the time it takes to submit to mimir.
273/// other operations here are optimized for async, within reason.  The post process uses a single
274/// connection to mimir and thus incurs the seriliaztion delay for each metric family sent. Possible
275/// future optimizations would be to use multiple tcp connections to mimir, within reason. Nevertheless
276/// we await on each post of each metric family so it shouldn't block any other async work in a
277/// significant way.
278pub async fn convert_to_remote_write(
279    rc: ReqwestClient,
280    node_metric: NodeMetric,
281) -> (StatusCode, &'static str) {
282    let timer = CONSUMER_OPERATION_DURATION
283        .with_label_values(&["convert_to_remote_write"])
284        .start_timer();
285
286    let remote_write_protos = match convert(node_metric.data).await {
287        Ok(v) => v,
288        Err(err) => {
289            timer.stop_and_discard();
290            return err;
291        }
292    };
293
294    // a counter so we don't iterate the node data 2x
295    let mut mf_cnt = 0;
296    for request in remote_write_protos {
297        mf_cnt += 1;
298        let compressed = match encode_compress(&request) {
299            Ok(compressed) => compressed,
300            Err(error) => return error,
301        };
302
303        let response = match rc
304            .client
305            .post(rc.settings.url.to_owned())
306            .header(reqwest::header::CONTENT_ENCODING, "snappy")
307            .header(reqwest::header::CONTENT_TYPE, "application/x-protobuf")
308            .header("X-Prometheus-Remote-Write-Version", "0.1.0")
309            .basic_auth(
310                rc.settings.username.to_owned(),
311                Some(rc.settings.password.to_owned()),
312            )
313            .body(compressed)
314            .send()
315            .await
316        {
317            Ok(response) => response,
318            Err(error) => {
319                CONSUMER_OPS
320                    .with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
321                    .inc();
322                error!("DROPPING METRICS due to post error: {error}");
323                timer.stop_and_discard();
324                return (
325                    StatusCode::INTERNAL_SERVER_ERROR,
326                    "DROPPING METRICS due to post error",
327                );
328            }
329        };
330
331        match check_response(request, response).await {
332            Ok(_) => (),
333            Err(err) => {
334                timer.stop_and_discard();
335                return err;
336            }
337        }
338    }
339    CONSUMER_OPS_SUBMITTED.inc_by(mf_cnt as f64);
340    timer.observe_duration();
341    (StatusCode::CREATED, "created")
342}
343
#[cfg(test)]
mod tests {
    use crate::consumer::populate_labels;
    use crate::prom_to_mimir::tests::{
        create_histogram, create_labels, create_metric_family, create_metric_histogram,
    };
    use prometheus::proto;

    /// populate_labels must append the network and host labels, in that order, to each metric.
    #[test]
    fn test_populate_labels() {
        let unlabeled_histogram =
            create_metric_histogram(create_labels(vec![]), create_histogram());
        let family = create_metric_family(
            "test_histogram",
            "i'm a help message",
            Some(proto::MetricType::HISTOGRAM),
            vec![unlabeled_histogram],
        );

        let expected = create_labels(vec![
            ("network", "unittest-network"),
            ("host", "validator-0"),
        ]);

        let labeled = populate_labels(
            "validator-0".into(),
            "unittest-network".into(),
            "inventory-hostname".into(),
            vec![family],
        );
        let first_metric = &labeled[0].get_metric()[0];
        assert_eq!(first_metric.get_label(), &expected);
    }
}