1use std::{collections::HashMap, sync::Arc, time::Duration};
5
6use anemo::{PeerId, types::PeerEvent};
7use dashmap::DashMap;
8use mysten_metrics::spawn_logged_monitored_task;
9use quinn_proto::ConnectionStats;
10use tokio::{
11 sync::oneshot::{Receiver, Sender},
12 task::JoinHandle,
13 time,
14};
15
16use super::metrics::QuinnConnectionMetrics;
17
/// Period of the timer in `AnemoConnectionMonitor::run` that samples the socket
/// buffer sizes and the per-peer connection statistics into the metrics.
const CONNECTION_STAT_COLLECTION_INTERVAL: Duration = Duration::from_secs(60);
19
/// Handle returned by `AnemoConnectionMonitor::spawn` for interacting with the
/// spawned monitor task.
pub struct ConnectionMonitorHandle {
    /// Join handle of the spawned monitor task; awaited when stopping.
    handle: JoinHandle<()>,
    /// Sending half of the oneshot channel that tells the task to shut down.
    stop: Sender<()>,
    /// Per-peer connection status map shared with the monitor task.
    connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
}
25
26impl ConnectionMonitorHandle {
27 pub async fn stop(self) {
28 self.stop.send(()).ok();
29 self.handle.await.ok();
30 }
31
32 pub fn connection_statuses(&self) -> Arc<DashMap<PeerId, ConnectionStatus>> {
33 self.connection_statuses.clone()
34 }
35}
36
/// Most recent connection state recorded for a peer by the connection monitor.
#[derive(Eq, PartialEq, Clone, Debug)]
pub enum ConnectionStatus {
    /// Recorded when a `PeerEvent::NewPeer` event is handled for the peer.
    Connected,
    /// Recorded when a `PeerEvent::LostPeer` event is handled for the peer.
    Disconnected,
}
42
/// State of the background task that follows anemo peer events and publishes
/// connectivity and connection-statistics metrics.
pub struct AnemoConnectionMonitor {
    /// Reference to the network that has to be upgraded before each use; the
    /// upgrade yields `None` when the network is unavailable.
    network: anemo::NetworkRef,
    /// Metrics updated with connectivity state and connection statistics.
    connection_metrics: Arc<QuinnConnectionMetrics>,
    /// Peers for which per-peer metrics are reported, mapped to the label
    /// attached to those metrics.
    known_peers: HashMap<PeerId, String>,
    /// Latest status per peer; the same map is exposed through
    /// `ConnectionMonitorHandle`.
    connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
    /// Receiving half of the stop channel; the task exits once it resolves.
    stop: Receiver<()>,
}
50
51impl AnemoConnectionMonitor {
52 #[must_use]
53 pub fn spawn(
54 network: anemo::NetworkRef,
55 connection_metrics: Arc<QuinnConnectionMetrics>,
56 known_peers: HashMap<PeerId, String>,
57 ) -> ConnectionMonitorHandle {
58 let connection_statuses_outer = Arc::new(DashMap::new());
59 let connection_statuses = connection_statuses_outer.clone();
60 let (stop_sender, stop) = tokio::sync::oneshot::channel();
61 let handle = spawn_logged_monitored_task!(
62 Self {
63 network,
64 connection_metrics,
65 known_peers,
66 connection_statuses,
67 stop
68 }
69 .run(),
70 "AnemoConnectionMonitor"
71 );
72
73 ConnectionMonitorHandle {
74 handle,
75 stop: stop_sender,
76 connection_statuses: connection_statuses_outer,
77 }
78 }
79
80 async fn run(mut self) {
81 let (mut subscriber, connected_peers) = {
82 if let Some(network) = self.network.upgrade() {
83 let Ok((subscriber, active_peers)) = network.subscribe() else {
84 return;
85 };
86 (subscriber, active_peers)
87 } else {
88 return;
89 }
90 };
91
92 for (peer_id, peer_label) in &self.known_peers {
95 self.connection_metrics
96 .network_peer_connected
97 .with_label_values(&[&format!("{peer_id}"), peer_label])
98 .set(0)
99 }
100
101 for peer_id in connected_peers.iter() {
103 self.handle_peer_event(PeerEvent::NewPeer(*peer_id)).await;
104 }
105
106 let mut connection_stat_collection_interval =
107 time::interval(CONNECTION_STAT_COLLECTION_INTERVAL);
108
109 loop {
110 tokio::select! {
111 _ = connection_stat_collection_interval.tick() => {
112 if let Some(network) = self.network.upgrade() {
113 self.connection_metrics.socket_receive_buffer_size.set(
114 network.socket_receive_buf_size() as i64
115 );
116 self.connection_metrics.socket_send_buffer_size.set(
117 network.socket_send_buf_size() as i64
118 );
119 for (peer_id, peer_label) in &self.known_peers {
120 if let Some(connection) = network.peer(*peer_id) {
121 let stats = connection.connection_stats();
122 self.update_quinn_metrics_for_peer(&format!("{peer_id}"), peer_label, &stats);
123 }
124 }
125 } else {
126 continue;
127 }
128 }
129 Ok(event) = subscriber.recv() => {
130 self.handle_peer_event(event).await;
131 }
132 _ = &mut self.stop => {
133 tracing::debug!("Stop signal has been received, now shutting down");
134 return;
135 }
136 }
137 }
138 }
139
140 async fn handle_peer_event(&self, peer_event: PeerEvent) {
141 if let Some(network) = self.network.upgrade() {
142 self.connection_metrics
143 .network_peers
144 .set(network.peers().len() as i64);
145 } else {
146 return;
147 }
148
149 let (peer_id, status, int_status) = match peer_event {
150 PeerEvent::NewPeer(peer_id) => (peer_id, ConnectionStatus::Connected, 1),
151 PeerEvent::LostPeer(peer_id, _) => (peer_id, ConnectionStatus::Disconnected, 0),
152 };
153 self.connection_statuses.insert(peer_id, status);
154
155 if self.known_peers.contains_key(&peer_id) {
157 let peer_id_str = format!("{peer_id}");
158 let peer_label = self.known_peers.get(&peer_id).unwrap();
159
160 self.connection_metrics
161 .network_peer_connected
162 .with_label_values(&[&peer_id_str, peer_label])
163 .set(int_status);
164
165 if let PeerEvent::LostPeer(_, reason) = peer_event {
166 self.connection_metrics
167 .network_peer_disconnects
168 .with_label_values(&[&peer_id_str, peer_label, &format!("{reason:?}")])
169 .inc();
170 }
171 }
172 }
173
174 fn update_quinn_metrics_for_peer(
176 &self,
177 peer_id: &str,
178 peer_label: &str,
179 stats: &ConnectionStats,
180 ) {
181 self.connection_metrics
183 .network_peer_rtt
184 .with_label_values(&[peer_id, peer_label])
185 .set(stats.path.rtt.as_millis() as i64);
186 self.connection_metrics
187 .network_peer_lost_packets
188 .with_label_values(&[peer_id, peer_label])
189 .set(stats.path.lost_packets as i64);
190 self.connection_metrics
191 .network_peer_lost_bytes
192 .with_label_values(&[peer_id, peer_label])
193 .set(stats.path.lost_bytes as i64);
194 self.connection_metrics
195 .network_peer_sent_packets
196 .with_label_values(&[peer_id, peer_label])
197 .set(stats.path.sent_packets as i64);
198 self.connection_metrics
199 .network_peer_congestion_events
200 .with_label_values(&[peer_id, peer_label])
201 .set(stats.path.congestion_events as i64);
202 self.connection_metrics
203 .network_peer_congestion_window
204 .with_label_values(&[peer_id, peer_label])
205 .set(stats.path.cwnd as i64);
206
207 self.connection_metrics
209 .network_peer_max_data
210 .with_label_values(&[peer_id, peer_label, "transmitted"])
211 .set(stats.frame_tx.max_data as i64);
212 self.connection_metrics
213 .network_peer_max_data
214 .with_label_values(&[peer_id, peer_label, "received"])
215 .set(stats.frame_rx.max_data as i64);
216 self.connection_metrics
217 .network_peer_closed_connections
218 .with_label_values(&[peer_id, peer_label, "transmitted"])
219 .set(stats.frame_tx.connection_close as i64);
220 self.connection_metrics
221 .network_peer_closed_connections
222 .with_label_values(&[peer_id, peer_label, "received"])
223 .set(stats.frame_rx.connection_close as i64);
224 self.connection_metrics
225 .network_peer_data_blocked
226 .with_label_values(&[peer_id, peer_label, "transmitted"])
227 .set(stats.frame_tx.data_blocked as i64);
228 self.connection_metrics
229 .network_peer_data_blocked
230 .with_label_values(&[peer_id, peer_label, "received"])
231 .set(stats.frame_rx.data_blocked as i64);
232
233 self.connection_metrics
235 .network_peer_udp_datagrams
236 .with_label_values(&[peer_id, peer_label, "transmitted"])
237 .set(stats.udp_tx.datagrams as i64);
238 self.connection_metrics
239 .network_peer_udp_datagrams
240 .with_label_values(&[peer_id, peer_label, "received"])
241 .set(stats.udp_rx.datagrams as i64);
242 self.connection_metrics
243 .network_peer_udp_bytes
244 .with_label_values(&[peer_id, peer_label, "transmitted"])
245 .set(stats.udp_tx.bytes as i64);
246 self.connection_metrics
247 .network_peer_udp_bytes
248 .with_label_values(&[peer_id, peer_label, "received"])
249 .set(stats.udp_rx.bytes as i64);
250 self.connection_metrics
251 .network_peer_udp_transmits
252 .with_label_values(&[peer_id, peer_label, "transmitted"])
253 .set(stats.udp_tx.ios as i64);
254 self.connection_metrics
255 .network_peer_udp_transmits
256 .with_label_values(&[peer_id, peer_label, "received"])
257 .set(stats.udp_rx.ios as i64);
258 }
259}
260
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, convert::Infallible};

    use anemo::{Network, Request, Response};
    use bytes::Bytes;
    use prometheus::Registry;
    use tokio::time::{sleep, timeout};
    use tower::util::BoxCloneService;

    use super::*;

    /// End-to-end check that connecting and disconnecting peers is reflected in
    /// the peer count gauge and in the shared connection status map.
    #[tokio::test]
    async fn test_connectivity() {
        // GIVEN three independent networks.
        let network_1 = build_network().unwrap();
        let network_2 = build_network().unwrap();
        let network_3 = build_network().unwrap();

        let registry = Registry::new();
        let metrics = Arc::new(QuinnConnectionMetrics::new("consensus", &registry));

        // AND network 1 is already connected to network 2 before the monitor starts.
        let peer_2 = network_1.connect(network_2.local_addr()).await.unwrap();

        let mut known_peers = HashMap::new();
        known_peers.insert(network_2.peer_id(), "peer_2".to_string());
        known_peers.insert(network_3.peer_id(), "peer_3".to_string());

        // WHEN the monitor is spawned for network 1.
        let handle =
            AnemoConnectionMonitor::spawn(network_1.downgrade(), metrics.clone(), known_peers);
        let connection_statuses = handle.connection_statuses();

        // THEN the pre-existing connection is reported.
        assert_network_peers(&metrics, 1).await;

        // AND connection stats have been collected for peer 2 (a non-zero RTT gauge).
        let mut labels = HashMap::new();
        let peer_2_str = peer_2.to_string();
        labels.insert("peer_id", peer_2_str.as_str());
        labels.insert("peer_label", "peer_2");
        assert_ne!(
            metrics
                .network_peer_rtt
                .get_metric_with(&labels)
                .unwrap()
                .get(),
            0
        );
        assert_eq!(
            *connection_statuses.get(&peer_2).unwrap().value(),
            ConnectionStatus::Connected
        );

        // WHEN network 1 also connects to network 3.
        let peer_3 = network_1.connect(network_3.local_addr()).await.unwrap();

        // THEN both peers are reported as connected.
        assert_network_peers(&metrics, 2).await;
        assert_eq!(
            *connection_statuses.get(&peer_3).unwrap().value(),
            ConnectionStatus::Connected
        );

        // WHEN peer 2 is disconnected.
        network_1.disconnect(peer_2).unwrap();

        // THEN only one peer remains and peer 2 is marked disconnected.
        assert_network_peers(&metrics, 1).await;
        assert_eq!(
            *connection_statuses.get(&peer_2).unwrap().value(),
            ConnectionStatus::Disconnected
        );

        // WHEN peer 3 is disconnected as well.
        network_1.disconnect(peer_3).unwrap();

        // THEN no peers remain and peer 3 is marked disconnected.
        assert_network_peers(&metrics, 0).await;
        assert_eq!(
            *connection_statuses.get(&peer_3).unwrap().value(),
            ConnectionStatus::Disconnected
        );
    }

    /// Polls the `network_peers` gauge every 500ms until it equals `value`,
    /// panicking if that does not happen within 5 seconds.
    async fn assert_network_peers(metrics: &QuinnConnectionMetrics, value: i64) {
        timeout(Duration::from_secs(5), async move {
            while metrics.network_peers.get() != value {
                sleep(Duration::from_millis(500)).await;
            }
        })
        .await
        .unwrap_or_else(|_| {
            panic!(
                "Timeout while waiting for connectivity results for value {}",
                value
            )
        });

        assert_eq!(metrics.network_peers.get(), value);
    }

    /// Starts an echo network bound to an OS-assigned port on localhost with a
    /// freshly generated private key.
    fn build_network() -> anyhow::Result<Network> {
        let network = Network::bind("localhost:0")
            .private_key(random_private_key())
            .server_name("test")
            .start(echo_service())?;
        Ok(network)
    }

    /// Service that answers every request with a response carrying the request body.
    fn echo_service() -> BoxCloneService<Request<Bytes>, Response<Bytes>, Infallible> {
        let handle = move |request: Request<Bytes>| async move {
            let response = Response::new(request.into_body());
            Result::<Response<Bytes>, Infallible>::Ok(response)
        };

        tower::ServiceExt::boxed_clone(tower::service_fn(handle))
    }

    /// Generates 32 random bytes to use as a network private key.
    fn random_private_key() -> [u8; 32] {
        let mut rng = rand::thread_rng();
        let mut bytes = [0u8; 32];
        rand::RngCore::fill_bytes(&mut rng, &mut bytes[..]);

        bytes
    }
}