consensus_core/network/connection_monitor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::HashMap, sync::Arc, time::Duration};
5
6use anemo::{PeerId, types::PeerEvent};
7use dashmap::DashMap;
8use mysten_metrics::spawn_logged_monitored_task;
9use quinn_proto::ConnectionStats;
10use tokio::{
11    sync::oneshot::{Receiver, Sender},
12    task::JoinHandle,
13    time,
14};
15
16use super::metrics::QuinnConnectionMetrics;
17
/// How often the monitor exports socket buffer sizes and per-peer QUIC
/// connection statistics (one minute).
const CONNECTION_STAT_COLLECTION_INTERVAL: Duration = Duration::from_millis(60_000);
19
20pub struct ConnectionMonitorHandle {
21    handle: JoinHandle<()>,
22    stop: Sender<()>,
23    connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
24}
25
26impl ConnectionMonitorHandle {
27    pub async fn stop(self) {
28        self.stop.send(()).ok();
29        self.handle.await.ok();
30    }
31
32    pub fn connection_statuses(&self) -> Arc<DashMap<PeerId, ConnectionStatus>> {
33        self.connection_statuses.clone()
34    }
35}
36
/// Last observed connectivity state of a peer.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// The most recent event for the peer reported a new connection.
    Connected,
    /// The most recent event for the peer reported a lost connection.
    Disconnected,
}
42
43pub struct AnemoConnectionMonitor {
44    network: anemo::NetworkRef,
45    connection_metrics: Arc<QuinnConnectionMetrics>,
46    known_peers: HashMap<PeerId, String>,
47    connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
48    stop: Receiver<()>,
49}
50
51impl AnemoConnectionMonitor {
52    #[must_use]
53    pub fn spawn(
54        network: anemo::NetworkRef,
55        connection_metrics: Arc<QuinnConnectionMetrics>,
56        known_peers: HashMap<PeerId, String>,
57    ) -> ConnectionMonitorHandle {
58        let connection_statuses_outer = Arc::new(DashMap::new());
59        let connection_statuses = connection_statuses_outer.clone();
60        let (stop_sender, stop) = tokio::sync::oneshot::channel();
61        let handle = spawn_logged_monitored_task!(
62            Self {
63                network,
64                connection_metrics,
65                known_peers,
66                connection_statuses,
67                stop
68            }
69            .run(),
70            "AnemoConnectionMonitor"
71        );
72
73        ConnectionMonitorHandle {
74            handle,
75            stop: stop_sender,
76            connection_statuses: connection_statuses_outer,
77        }
78    }
79
80    async fn run(mut self) {
81        let (mut subscriber, connected_peers) = {
82            if let Some(network) = self.network.upgrade() {
83                let Ok((subscriber, active_peers)) = network.subscribe() else {
84                    return;
85                };
86                (subscriber, active_peers)
87            } else {
88                return;
89            }
90        };
91
92        // we report first all the known peers as disconnected - so we can see
93        // their labels in the metrics reporting tool
94        for (peer_id, peer_label) in &self.known_peers {
95            self.connection_metrics
96                .network_peer_connected
97                .with_label_values(&[&format!("{peer_id}"), peer_label])
98                .set(0)
99        }
100
101        // now report the connected peers
102        for peer_id in connected_peers.iter() {
103            self.handle_peer_event(PeerEvent::NewPeer(*peer_id)).await;
104        }
105
106        let mut connection_stat_collection_interval =
107            time::interval(CONNECTION_STAT_COLLECTION_INTERVAL);
108
109        loop {
110            tokio::select! {
111                _ = connection_stat_collection_interval.tick() => {
112                    if let Some(network) = self.network.upgrade() {
113                        self.connection_metrics.socket_receive_buffer_size.set(
114                            network.socket_receive_buf_size() as i64
115                        );
116                        self.connection_metrics.socket_send_buffer_size.set(
117                            network.socket_send_buf_size() as i64
118                        );
119                        for (peer_id, peer_label) in &self.known_peers {
120                            if let Some(connection) = network.peer(*peer_id) {
121                                let stats = connection.connection_stats();
122                                self.update_quinn_metrics_for_peer(&format!("{peer_id}"), peer_label, &stats);
123                            }
124                        }
125                    } else {
126                        continue;
127                    }
128                }
129                Ok(event) = subscriber.recv() => {
130                    self.handle_peer_event(event).await;
131                }
132                _ = &mut self.stop => {
133                    tracing::debug!("Stop signal has been received, now shutting down");
134                    return;
135                }
136            }
137        }
138    }
139
140    async fn handle_peer_event(&self, peer_event: PeerEvent) {
141        if let Some(network) = self.network.upgrade() {
142            self.connection_metrics
143                .network_peers
144                .set(network.peers().len() as i64);
145        } else {
146            return;
147        }
148
149        let (peer_id, status, int_status) = match peer_event {
150            PeerEvent::NewPeer(peer_id) => (peer_id, ConnectionStatus::Connected, 1),
151            PeerEvent::LostPeer(peer_id, _) => (peer_id, ConnectionStatus::Disconnected, 0),
152        };
153        self.connection_statuses.insert(peer_id, status);
154
155        // Only report peer IDs for known peers to prevent unlimited cardinality.
156        if self.known_peers.contains_key(&peer_id) {
157            let peer_id_str = format!("{peer_id}");
158            let peer_label = self.known_peers.get(&peer_id).unwrap();
159
160            self.connection_metrics
161                .network_peer_connected
162                .with_label_values(&[&peer_id_str, peer_label])
163                .set(int_status);
164
165            if let PeerEvent::LostPeer(_, reason) = peer_event {
166                self.connection_metrics
167                    .network_peer_disconnects
168                    .with_label_values(&[&peer_id_str, peer_label, &format!("{reason:?}")])
169                    .inc();
170            }
171        }
172    }
173
174    // TODO: Replace this with ClosureMetric
175    fn update_quinn_metrics_for_peer(
176        &self,
177        peer_id: &str,
178        peer_label: &str,
179        stats: &ConnectionStats,
180    ) {
181        // Update PathStats
182        self.connection_metrics
183            .network_peer_rtt
184            .with_label_values(&[peer_id, peer_label])
185            .set(stats.path.rtt.as_millis() as i64);
186        self.connection_metrics
187            .network_peer_lost_packets
188            .with_label_values(&[peer_id, peer_label])
189            .set(stats.path.lost_packets as i64);
190        self.connection_metrics
191            .network_peer_lost_bytes
192            .with_label_values(&[peer_id, peer_label])
193            .set(stats.path.lost_bytes as i64);
194        self.connection_metrics
195            .network_peer_sent_packets
196            .with_label_values(&[peer_id, peer_label])
197            .set(stats.path.sent_packets as i64);
198        self.connection_metrics
199            .network_peer_congestion_events
200            .with_label_values(&[peer_id, peer_label])
201            .set(stats.path.congestion_events as i64);
202        self.connection_metrics
203            .network_peer_congestion_window
204            .with_label_values(&[peer_id, peer_label])
205            .set(stats.path.cwnd as i64);
206
207        // Update FrameStats
208        self.connection_metrics
209            .network_peer_max_data
210            .with_label_values(&[peer_id, peer_label, "transmitted"])
211            .set(stats.frame_tx.max_data as i64);
212        self.connection_metrics
213            .network_peer_max_data
214            .with_label_values(&[peer_id, peer_label, "received"])
215            .set(stats.frame_rx.max_data as i64);
216        self.connection_metrics
217            .network_peer_closed_connections
218            .with_label_values(&[peer_id, peer_label, "transmitted"])
219            .set(stats.frame_tx.connection_close as i64);
220        self.connection_metrics
221            .network_peer_closed_connections
222            .with_label_values(&[peer_id, peer_label, "received"])
223            .set(stats.frame_rx.connection_close as i64);
224        self.connection_metrics
225            .network_peer_data_blocked
226            .with_label_values(&[peer_id, peer_label, "transmitted"])
227            .set(stats.frame_tx.data_blocked as i64);
228        self.connection_metrics
229            .network_peer_data_blocked
230            .with_label_values(&[peer_id, peer_label, "received"])
231            .set(stats.frame_rx.data_blocked as i64);
232
233        // Update UDPStats
234        self.connection_metrics
235            .network_peer_udp_datagrams
236            .with_label_values(&[peer_id, peer_label, "transmitted"])
237            .set(stats.udp_tx.datagrams as i64);
238        self.connection_metrics
239            .network_peer_udp_datagrams
240            .with_label_values(&[peer_id, peer_label, "received"])
241            .set(stats.udp_rx.datagrams as i64);
242        self.connection_metrics
243            .network_peer_udp_bytes
244            .with_label_values(&[peer_id, peer_label, "transmitted"])
245            .set(stats.udp_tx.bytes as i64);
246        self.connection_metrics
247            .network_peer_udp_bytes
248            .with_label_values(&[peer_id, peer_label, "received"])
249            .set(stats.udp_rx.bytes as i64);
250        self.connection_metrics
251            .network_peer_udp_transmits
252            .with_label_values(&[peer_id, peer_label, "transmitted"])
253            .set(stats.udp_tx.ios as i64);
254        self.connection_metrics
255            .network_peer_udp_transmits
256            .with_label_values(&[peer_id, peer_label, "received"])
257            .set(stats.udp_rx.ios as i64);
258    }
259}
260
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, convert::Infallible};

    use anemo::{Network, Request, Response};
    use bytes::Bytes;
    use prometheus::Registry;
    use tokio::time::{sleep, timeout};
    use tower::util::BoxCloneService;

    use super::*;

    /// End-to-end check against real loopback networks: a peer connected before
    /// the monitor starts, a peer connecting afterwards, and both peers being
    /// disconnected must be reflected in the metrics and the shared statuses.
    #[tokio::test]
    async fn test_connectivity() {
        // GIVEN three networks and a fresh metrics registry
        let network_1 = build_network().unwrap();
        let network_2 = build_network().unwrap();
        let network_3 = build_network().unwrap();

        let registry = Registry::new();
        let metrics = Arc::new(QuinnConnectionMetrics::new("consensus", &registry));

        // AND network 1 is already connected to peer 2
        let peer_2 = network_1.connect(network_2.local_addr()).await.unwrap();

        let known_peers = HashMap::from([
            (network_2.peer_id(), "peer_2".to_string()),
            (network_3.peer_id(), "peer_3".to_string()),
        ]);

        // WHEN the monitor is brought up (the handle must stay alive: dropping
        // it would stop the monitor)
        let handle =
            AnemoConnectionMonitor::spawn(network_1.downgrade(), metrics.clone(), known_peers);
        let connection_statuses = handle.connection_statuses();
        let status_of =
            |peer: PeerId| connection_statuses.get(&peer).unwrap().value().clone();

        // THEN peer 2 is counted as connected straight away
        assert_network_peers(&metrics, 1).await;

        // AND connection stats have been collected for it
        let peer_2_str = peer_2.to_string();
        let labels = HashMap::from([("peer_id", peer_2_str.as_str()), ("peer_label", "peer_2")]);
        let rtt = metrics
            .network_peer_rtt
            .get_metric_with(&labels)
            .unwrap()
            .get();
        assert_ne!(rtt, 0);
        assert_eq!(status_of(peer_2), ConnectionStatus::Connected);

        // WHEN network 1 also connects to peer 3
        let peer_3 = network_1.connect(network_3.local_addr()).await.unwrap();

        // THEN both peers are counted and peer 3 is connected
        assert_network_peers(&metrics, 2).await;
        assert_eq!(status_of(peer_3), ConnectionStatus::Connected);

        // WHEN peer 2 is disconnected
        network_1.disconnect(peer_2).unwrap();

        // THEN only peer 3 remains
        assert_network_peers(&metrics, 1).await;
        assert_eq!(status_of(peer_2), ConnectionStatus::Disconnected);

        // WHEN peer 3 is disconnected too
        network_1.disconnect(peer_3).unwrap();

        // THEN no peers remain
        assert_network_peers(&metrics, 0).await;
        assert_eq!(status_of(peer_3), ConnectionStatus::Disconnected);
    }

    /// Polls the `network_peers` gauge every 500ms until it equals `value`,
    /// panicking if that does not happen within 5 seconds.
    async fn assert_network_peers(metrics: &QuinnConnectionMetrics, value: i64) {
        let reached = async move {
            loop {
                if metrics.network_peers.get() == value {
                    break;
                }
                sleep(Duration::from_millis(500)).await;
            }
        };

        if timeout(Duration::from_secs(5), reached).await.is_err() {
            panic!(
                "Timeout while waiting for connectivity results for value {}",
                value
            );
        }

        assert_eq!(metrics.network_peers.get(), value);
    }

    /// Starts a network on an ephemeral localhost port with a random identity,
    /// serving `echo_service`.
    fn build_network() -> anyhow::Result<Network> {
        let started = Network::bind("localhost:0")
            .private_key(random_private_key())
            .server_name("test")
            .start(echo_service())?;
        Ok(started)
    }

    /// A service that answers every request with a response carrying the same body.
    fn echo_service() -> BoxCloneService<Request<Bytes>, Response<Bytes>, Infallible> {
        let echo = |request: Request<Bytes>| async move {
            Ok::<Response<Bytes>, Infallible>(Response::new(request.into_body()))
        };

        tower::ServiceExt::boxed_clone(tower::service_fn(echo))
    }

    /// 32 random bytes for use as a network private key.
    fn random_private_key() -> [u8; 32] {
        let mut key = [0u8; 32];
        rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut key[..]);
        key
    }
}
388}