consensus_core/network/
metrics_layer.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use prometheus::HistogramTimer;

use super::metrics::NetworkRouteMetrics;

// Tower layer adapters that accept callbacks for request and response handling exist for
// both anemo and HTTP, so this metrics layer implementation can be reused across
// networking stacks.

/// A request that can report its route and size to the network metrics layer.
pub(crate) trait SizedRequest {
    /// Returns the route this request targets; used as the metrics label.
    fn route(&self) -> String;

    /// Returns the size of the request message in bytes.
    fn size(&self) -> usize;
}

/// A response that can report its size and error status to the network metrics layer.
pub(crate) trait SizedResponse {
    /// Returns an error-type label when the response represents an error, or `None`.
    fn error_type(&self) -> Option<String>;

    /// Returns the size of the response message in bytes.
    fn size(&self) -> usize;
}

/// Records per-route request metrics and hands out a [`MetricsResponseCallback`]
/// for each request so the matching response metrics can be recorded later.
#[derive(Clone)]
pub(crate) struct MetricsCallbackMaker {
    /// Shared per-route network metrics updated by this maker and its callbacks.
    metrics: Arc<NetworkRouteMetrics>,
    /// Size in bytes above which a request or response message is considered excessively large
    excessive_message_size: usize,
}

impl MetricsCallbackMaker {
    /// Creates a callback maker that records per-route metrics into `metrics`.
    ///
    /// Requests and responses whose size in bytes exceeds `excessive_message_size`
    /// are additionally counted in the excessive-size counters.
    pub(crate) fn new(metrics: Arc<NetworkRouteMetrics>, excessive_message_size: usize) -> Self {
        Self {
            metrics,
            excessive_message_size,
        }
    }

    /// Updates request metrics and returns a callback that should be used on response.
    ///
    /// For the request's route this:
    /// - increments the request counter;
    /// - observes the request size, when it is non-zero;
    /// - increments the excessive-size counter when the size exceeds
    ///   `excessive_message_size`;
    /// - starts the latency timer and increments the in-flight gauge.
    ///
    /// The timer and the in-flight gauge are settled when the returned
    /// [`MetricsResponseCallback`] is dropped.
    pub(crate) fn handle_request(&self, request: &dyn SizedRequest) -> MetricsResponseCallback {
        let route = request.route();
        let labels = [route.as_str()];

        self.metrics.requests.with_label_values(&labels).inc();

        let request_size = request.size();
        if request_size > 0 {
            self.metrics
                .request_size
                .with_label_values(&labels)
                .observe(request_size as f64);
        }
        if request_size > self.excessive_message_size {
            self.metrics
                .excessive_size_requests
                .with_label_values(&labels)
                .inc();
        }

        let timer = self
            .metrics
            .request_latency
            .with_label_values(&labels)
            .start_timer();

        // Increment the in-flight gauge only after everything that could panic has run
        // (notably the foreign `SizedRequest::size` impl). Do it immediately before creating
        // the guard whose `Drop` decrements the gauge. Otherwise a panic in between would
        // leave the gauge permanently incremented for this route.
        self.metrics
            .inflight_requests
            .with_label_values(&labels)
            .inc();

        MetricsResponseCallback {
            metrics: Arc::clone(&self.metrics),
            timer,
            route,
            excessive_message_size: self.excessive_message_size,
        }
    }
}

/// Per-request handle returned by `MetricsCallbackMaker::handle_request`.
///
/// Use `on_response` / `on_error` to record the outcome of the request. Dropping this
/// value decrements the route's in-flight gauge and lets the latency timer observe the
/// elapsed time.
pub(crate) struct MetricsResponseCallback {
    /// Shared per-route network metrics.
    metrics: Arc<NetworkRouteMetrics>,
    /// Latency timer for this request. It is never read; it is held only so that it is
    /// "observed" once this callback is dropped.
    #[allow(dead_code)]
    timer: HistogramTimer,
    /// Route label applied to every metric recorded by this callback.
    route: String,
    /// Size in bytes above which a response is counted as excessively large.
    excessive_message_size: usize,
}

impl MetricsResponseCallback {
    /// Records response metrics for this callback's route.
    ///
    /// Observes the response size when it is non-zero, and counts responses larger than
    /// `excessive_message_size`. If the response reports an error type, it increments the
    /// error counter labelled with that type.
    pub(crate) fn on_response(&mut self, response: &dyn SizedResponse) {
        let route = self.route.as_str();
        let metrics = &self.metrics;
        let size = response.size();

        if size > 0 {
            metrics
                .response_size
                .with_label_values(&[route])
                .observe(size as f64);
        }
        if size > self.excessive_message_size {
            metrics
                .excessive_size_responses
                .with_label_values(&[route])
                .inc();
        }
        if let Some(error_type) = response.error_type() {
            metrics
                .errors
                .with_label_values(&[route, error_type.as_str()])
                .inc();
        }
    }

    /// Records an error for this callback's route.
    ///
    /// The error value is not inspected; it is always counted under the `"unknown"` label.
    pub(crate) fn on_error<E>(&mut self, _error: &E) {
        let labels = [self.route.as_str(), "unknown"];
        self.metrics.errors.with_label_values(&labels).inc();
    }
}

impl Drop for MetricsResponseCallback {
    fn drop(&mut self) {
        // Balance the in-flight gauge increment recorded for this route when the request
        // was handled.
        let route = self.route.as_str();
        self.metrics.inflight_requests.with_label_values(&[route]).dec();
    }
}