sui_proxy/
consumer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::admin::ReqwestClient;
5use crate::prom_to_mimir::Mimir;
6use crate::remote_write::WriteRequest;
7use anyhow::Result;
8use axum::body::Bytes;
9use axum::http::StatusCode;
10use bytes::buf::Reader;
11use fastcrypto::ed25519::Ed25519PublicKey;
12use multiaddr::Multiaddr;
13use once_cell::sync::Lazy;
14use prometheus::proto::{self, MetricFamily};
15use prometheus::{Counter, CounterVec, HistogramVec};
16use prometheus::{register_counter, register_counter_vec, register_histogram_vec};
17use prost::Message;
18use protobuf::CodedInputStream;
19use std::io::Read;
20use tracing::{debug, error};
21
22static CONSUMER_OPS_SUBMITTED: Lazy<Counter> = Lazy::new(|| {
23    register_counter!(
24        "consumer_operations_submitted",
25        "Operations counter for the number of metric family types we submit, excluding histograms, and not the discrete timeseries counts.",
26    )
27    .unwrap()
28});
29static CONSUMER_OPS: Lazy<CounterVec> = Lazy::new(|| {
30    register_counter_vec!(
31        "consumer_operations",
32        "Operations counters and status from operations performed in the consumer.",
33        &["operation", "status"]
34    )
35    .unwrap()
36});
37static CONSUMER_ENCODE_COMPRESS_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
38    register_histogram_vec!(
39        "protobuf_compression_seconds",
40        "The time it takes to compress a remote_write payload in seconds.",
41        &["operation"],
42        vec![
43            1e-08, 2e-08, 4e-08, 8e-08, 1.6e-07, 3.2e-07, 6.4e-07, 1.28e-06, 2.56e-06, 5.12e-06,
44            1.024e-05, 2.048e-05, 4.096e-05, 8.192e-05
45        ],
46    )
47    .unwrap()
48});
49static CONSUMER_OPERATION_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
50    register_histogram_vec!(
51        "consumer_operations_duration_seconds",
52        "The time it takes to perform various consumer operations in seconds.",
53        &["operation"],
54        vec![
55            0.0008, 0.0016, 0.0032, 0.0064, 0.0128, 0.0256, 0.0512, 0.1024, 0.2048, 0.4096, 0.8192,
56            1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75, 3.0, 3.25, 3.5, 3.75, 4.0, 4.25, 4.5, 4.75,
57            5.0, 5.25, 5.5, 5.75, 6.0, 6.25, 6.5, 6.75, 7.0, 7.25, 7.5, 7.75, 8.0, 8.25, 8.5, 8.75,
58            9.0, 9.25, 9.5, 9.75, 10.0, 10.25, 10.5, 10.75, 11.0, 11.25, 11.5, 11.75, 12.0, 12.25,
59            12.5, 12.75, 13.0, 13.25, 13.5, 13.75, 14.0, 14.25, 14.5, 14.75, 15.0, 15.25, 15.5,
60            15.75, 16.0, 16.25, 16.5, 16.75, 17.0, 17.25, 17.5, 17.75, 18.0, 18.25, 18.5, 18.75,
61            19.0, 19.25, 19.5, 19.75, 20.0, 20.25, 20.5, 20.75, 21.0, 21.25, 21.5, 21.75, 22.0,
62            22.25, 22.5, 22.75, 23.0, 23.25, 23.5, 23.75, 24.0, 24.25, 24.5, 24.75, 25.0, 26.0,
63            27.0, 28.0, 29.0, 30.0
64        ],
65    )
66    .unwrap()
67});
68
/// NodeMetric holds metadata and a metric payload from the calling node
#[derive(Debug)]
pub struct NodeMetric {
    /// the sockaddr source address from the incoming request
    pub peer_addr: Multiaddr,
    /// the public key from the sui blockchain
    pub public_key: Ed25519PublicKey,
    /// decoded protobuf of prometheus data
    pub data: Vec<proto::MetricFamily>,
}
76
77/// The ProtobufDecoder will decode message delimited protobuf messages from prom_model.proto types
78/// They are delimited by size, eg a format is such:
79/// []byte{size, data, size, data, size, data}, etc
80pub struct ProtobufDecoder {
81    buf: Reader<Bytes>,
82}
83
84impl ProtobufDecoder {
85    pub fn new(buf: Reader<Bytes>) -> Self {
86        Self { buf }
87    }
88    /// parse a delimited buffer of protobufs. this is used to consume data sent from a sui-node
89    pub fn parse<T: protobuf::Message>(&mut self) -> Result<Vec<T>> {
90        let timer = CONSUMER_OPERATION_DURATION
91            .with_label_values(&["decode_len_delim_protobuf"])
92            .start_timer();
93        let mut result: Vec<T> = vec![];
94        while !self.buf.get_ref().is_empty() {
95            let len = {
96                let mut is = CodedInputStream::from_buffered_reader(&mut self.buf);
97                is.read_raw_varint32()
98            }?;
99            let mut buf = vec![0; len as usize];
100            self.buf.read_exact(&mut buf)?;
101            result.push(T::parse_from_bytes(&buf)?);
102        }
103        timer.observe_duration();
104        Ok(result)
105    }
106}
107
108// populate labels in place for our given metric family data
109pub fn populate_labels(
110    name: String,               // host field for grafana agent (from chain data)
111    network: String,            // network name from ansible (via config)
112    inventory_hostname: String, // inventory_name from ansible (via config)
113    data: Vec<proto::MetricFamily>,
114) -> Vec<proto::MetricFamily> {
115    let timer = CONSUMER_OPERATION_DURATION
116        .with_label_values(&["populate_labels"])
117        .start_timer();
118    debug!("received metrics from {name} on {inventory_hostname}");
119    // proto::LabelPair doesn't have pub fields so we can't use
120    // struct literals to construct
121    let mut network_label = proto::LabelPair::default();
122    network_label.set_name("network".into());
123    network_label.set_value(network);
124
125    let mut host_label = proto::LabelPair::default();
126    host_label.set_name("host".into());
127    host_label.set_value(name);
128
129    let labels = vec![network_label, host_label];
130
131    let mut data = data;
132    // add our extra labels to our incoming metric data
133    for mf in data.iter_mut() {
134        for m in mf.mut_metric() {
135            m.mut_label().extend(labels.clone());
136        }
137    }
138    timer.observe_duration();
139    data
140}
141
142fn encode_compress(request: &WriteRequest) -> Result<Vec<u8>, (StatusCode, &'static str)> {
143    let observe = || {
144        let timer = CONSUMER_ENCODE_COMPRESS_DURATION
145            .with_label_values(&["encode_compress"])
146            .start_timer();
147        || {
148            timer.observe_duration();
149        }
150    }();
151    let mut buf = Vec::with_capacity(request.encoded_len());
152    if request.encode(&mut buf).is_err() {
153        observe();
154        CONSUMER_OPS
155            .with_label_values(&["encode_compress", "failed"])
156            .inc();
157        error!("unable to encode prompb to mimirpb");
158        return Err((
159            StatusCode::INTERNAL_SERVER_ERROR,
160            "unable to encode prompb to remote_write pb",
161        ));
162    };
163
164    let mut s = snap::raw::Encoder::new();
165    let compressed = match s.compress_vec(&buf) {
166        Ok(compressed) => compressed,
167        Err(error) => {
168            observe();
169            CONSUMER_OPS
170                .with_label_values(&["encode_compress", "failed"])
171                .inc();
172            error!("unable to compress to snappy block format; {error}");
173            return Err((
174                StatusCode::INTERNAL_SERVER_ERROR,
175                "unable to compress to snappy block format",
176            ));
177        }
178    };
179    observe();
180    CONSUMER_OPS
181        .with_label_values(&["encode_compress", "success"])
182        .inc();
183    Ok(compressed)
184}
185
186async fn check_response(
187    request: WriteRequest,
188    response: reqwest::Response,
189) -> Result<(), (StatusCode, &'static str)> {
190    match response.status() {
191        reqwest::StatusCode::OK => {
192            CONSUMER_OPS
193                .with_label_values(&["check_response", "OK"])
194                .inc();
195            debug!("({}) SUCCESS: {:?}", reqwest::StatusCode::OK, request);
196            Ok(())
197        }
198        reqwest::StatusCode::BAD_REQUEST => {
199            let body = response
200                .text()
201                .await
202                .unwrap_or_else(|_| "response body cannot be decoded".into());
203
204            // see mimir docs on this error condition. it's not actionable from the proxy
205            // so we drop it.
206            if body.contains("err-mimir-sample-out-of-order") {
207                CONSUMER_OPS
208                    .with_label_values(&["check_response", "BAD_REQUEST"])
209                    .inc();
210                error!("({}) ERROR: {:?}", reqwest::StatusCode::BAD_REQUEST, body);
211                return Err((
212                    StatusCode::INTERNAL_SERVER_ERROR,
213                    "IGNORING METRICS due to err-mimir-sample-out-of-order",
214                ));
215            }
216            CONSUMER_OPS
217                .with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
218                .inc();
219            error!("({}) ERROR: {:?}", reqwest::StatusCode::BAD_REQUEST, body);
220            Err((
221                StatusCode::INTERNAL_SERVER_ERROR,
222                "unknown bad request error encountered in remote_push",
223            ))
224        }
225        code => {
226            let body = response
227                .text()
228                .await
229                .unwrap_or_else(|_| "response body cannot be decoded".into());
230            CONSUMER_OPS
231                .with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
232                .inc();
233            error!("({}) ERROR: {:?}", code, body);
234            Err((
235                StatusCode::INTERNAL_SERVER_ERROR,
236                "unknown error encountered in remote_push",
237            ))
238        }
239    }
240}
241
242async fn convert(
243    mfs: Vec<MetricFamily>,
244) -> Result<impl Iterator<Item = WriteRequest>, (StatusCode, &'static str)> {
245    let result = tokio::task::spawn_blocking(|| {
246        let timer = CONSUMER_OPERATION_DURATION
247            .with_label_values(&["convert_to_remote_write_task"])
248            .start_timer();
249        let result = Mimir::from(mfs);
250        timer.observe_duration();
251        result.into_iter()
252    })
253    .await;
254
255    let result = match result {
256        Ok(v) => v,
257        Err(err) => {
258            error!("unable to convert to remote_write; {err}");
259            return Err((
260                StatusCode::INTERNAL_SERVER_ERROR,
261                "DROPPING METRICS; unable to convert to remote_write",
262            ));
263        }
264    };
265    Ok(result)
266}
267
268/// convert_to_remote_write is an expensive method due to the time it takes to submit to mimir.
269/// other operations here are optimized for async, within reason.  The post process uses a single
270/// connection to mimir and thus incurs the seriliaztion delay for each metric family sent. Possible
271/// future optimizations would be to use multiple tcp connections to mimir, within reason. Nevertheless
272/// we await on each post of each metric family so it shouldn't block any other async work in a
273/// significant way.
274pub async fn convert_to_remote_write(
275    rc: ReqwestClient,
276    node_metric: NodeMetric,
277) -> (StatusCode, &'static str) {
278    let timer = CONSUMER_OPERATION_DURATION
279        .with_label_values(&["convert_to_remote_write"])
280        .start_timer();
281
282    let remote_write_protos = match convert(node_metric.data).await {
283        Ok(v) => v,
284        Err(err) => {
285            timer.stop_and_discard();
286            return err;
287        }
288    };
289
290    // a counter so we don't iterate the node data 2x
291    let mut mf_cnt = 0;
292    for request in remote_write_protos {
293        mf_cnt += 1;
294        let compressed = match encode_compress(&request) {
295            Ok(compressed) => compressed,
296            Err(error) => return error,
297        };
298
299        let response = match rc
300            .client
301            .post(rc.settings.url.to_owned())
302            .header(reqwest::header::CONTENT_ENCODING, "snappy")
303            .header(reqwest::header::CONTENT_TYPE, "application/x-protobuf")
304            .header("X-Prometheus-Remote-Write-Version", "0.1.0")
305            .basic_auth(
306                rc.settings.username.to_owned(),
307                Some(rc.settings.password.to_owned()),
308            )
309            .body(compressed)
310            .send()
311            .await
312        {
313            Ok(response) => response,
314            Err(error) => {
315                CONSUMER_OPS
316                    .with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
317                    .inc();
318                error!("DROPPING METRICS due to post error: {error}");
319                timer.stop_and_discard();
320                return (
321                    StatusCode::INTERNAL_SERVER_ERROR,
322                    "DROPPING METRICS due to post error",
323                );
324            }
325        };
326
327        match check_response(request, response).await {
328            Ok(_) => (),
329            Err(err) => {
330                timer.stop_and_discard();
331                return err;
332            }
333        }
334    }
335    CONSUMER_OPS_SUBMITTED.inc_by(mf_cnt as f64);
336    timer.observe_duration();
337    (StatusCode::CREATED, "created")
338}
339
#[cfg(test)]
mod tests {
    use prometheus::proto;
    use protobuf;

    use crate::{
        consumer::populate_labels,
        prom_to_mimir::tests::{
            create_histogram, create_labels, create_metric_family, create_metric_histogram,
        },
    };

    /// A metric that starts without labels must end up with exactly the `network` and
    /// `host` labels, in that order.
    #[test]
    fn test_populate_labels() {
        // one histogram metric carrying no labels of its own
        let unlabeled_metric = create_metric_histogram(
            protobuf::RepeatedField::from_vec(create_labels(vec![])),
            create_histogram(),
        );
        let family = create_metric_family(
            "test_histogram",
            "i'm a help message",
            Some(proto::MetricType::HISTOGRAM),
            protobuf::RepeatedField::from(vec![unlabeled_metric]),
        );

        let labeled = populate_labels(
            "validator-0".into(),
            "unittest-network".into(),
            "inventory-hostname".into(),
            vec![family],
        );

        let expected = create_labels(vec![
            ("network", "unittest-network"),
            ("host", "validator-0"),
        ]);
        let first_metric = &labeled[0].get_metric()[0];
        assert_eq!(first_metric.get_label(), &expected);
    }
}