consensus_core/network/
anemo_network.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap},
6    panic,
7    sync::Arc,
8    time::Duration,
9};
10
11use anemo::{
12    PeerId, Response,
13    rpc::Status,
14    types::{PeerInfo, response::StatusCode},
15};
16use anemo_tower::{
17    auth::{AllowedPeers, RequireAuthorizationLayer},
18    callback::{CallbackLayer, MakeCallbackHandler, ResponseHandler},
19    set_header::{SetRequestHeaderLayer, SetResponseHeaderLayer},
20    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
21};
22use arc_swap::ArcSwapOption;
23use async_trait::async_trait;
24use bytes::Bytes;
25use consensus_config::{AuthorityIndex, NetworkKeyPair};
26use consensus_types::block::{BlockRef, Round};
27use serde::{Deserialize, Serialize};
28use tokio::sync::broadcast::error::RecvError;
29use tracing::{debug, error, warn};
30
31use super::{
32    BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkManager, NetworkService,
33    anemo_gen::{
34        consensus_rpc_client::ConsensusRpcClient,
35        consensus_rpc_server::{ConsensusRpc, ConsensusRpcServer},
36    },
37    connection_monitor::{AnemoConnectionMonitor, ConnectionMonitorHandle},
38    epoch_filter::{AllowedEpoch, EPOCH_HEADER_KEY},
39    metrics_layer::{MetricsCallbackMaker, MetricsResponseCallback, SizedRequest, SizedResponse},
40};
41use crate::{
42    CommitIndex,
43    block::VerifiedBlock,
44    commit::CommitRange,
45    context::Context,
46    error::{ConsensusError, ConsensusResult},
47};
48
49/// Implements Anemo RPC client for Consensus.
50pub(crate) struct AnemoClient {
51    context: Arc<Context>,
52    network: Arc<ArcSwapOption<anemo::Network>>,
53}
54
55impl AnemoClient {
56    const GET_CLIENT_INTERVAL: Duration = Duration::from_millis(10);
57
58    pub(crate) fn new(context: Arc<Context>) -> Self {
59        Self {
60            context,
61            network: Arc::new(ArcSwapOption::default()),
62        }
63    }
64
65    pub(crate) fn set_network(&self, network: anemo::Network) {
66        self.network.store(Some(Arc::new(network)));
67    }
68
69    async fn get_client(
70        &self,
71        peer: AuthorityIndex,
72        timeout: Duration,
73    ) -> ConsensusResult<ConsensusRpcClient<anemo::Peer>> {
74        let network = loop {
75            if let Some(network) = self.network.load_full() {
76                break network;
77            } else {
78                tokio::time::sleep(Self::GET_CLIENT_INTERVAL).await;
79            }
80        };
81
82        let authority = self.context.committee.authority(peer);
83        let peer_id = PeerId(authority.network_key.to_bytes());
84        if let Some(peer) = network.peer(peer_id) {
85            return Ok(ConsensusRpcClient::new(peer));
86        };
87
88        // If we're not connected we'll need to check to see if the Peer is a KnownPeer
89        if network.known_peers().get(&peer_id).is_none() {
90            return Err(ConsensusError::UnknownNetworkPeer(format!("{}", peer_id)));
91        }
92
93        let (mut subscriber, _) = network.subscribe().map_err(|e| {
94            ConsensusError::NetworkClientConnection(format!(
95                "Cannot subscribe to AnemoNetwork updates: {e:?}"
96            ))
97        })?;
98
99        let sleep = tokio::time::sleep(timeout);
100        tokio::pin!(sleep);
101        loop {
102            tokio::select! {
103                recv = subscriber.recv() => match recv {
104                    Ok(anemo::types::PeerEvent::NewPeer(pid)) if pid == peer_id => {
105                        // We're now connected with the peer, lets try to make a network request
106                        if let Some(peer) = network.peer(peer_id) {
107                            return Ok(ConsensusRpcClient::new(peer));
108                        }
109                        warn!("Peer {} should be connected.", peer_id)
110                    }
111                    Err(RecvError::Closed) => return Err(ConsensusError::Shutdown),
112                    Err(RecvError::Lagged(_)) => {
113                        subscriber = subscriber.resubscribe();
114                        // We lagged behind so we may have missed the connection event
115                        if let Some(peer) = network.peer(peer_id) {
116                            return Ok(ConsensusRpcClient::new(peer));
117                        }
118                    }
119                    // Just do another iteration
120                    _ => {}
121                },
122                _ = &mut sleep => {
123                    return Err(ConsensusError::PeerDisconnected(format!("{}", peer_id)));
124                },
125            }
126        }
127    }
128}
129
130#[async_trait]
131impl NetworkClient for AnemoClient {
132    const SUPPORT_STREAMING: bool = false;
133
134    async fn send_block(
135        &self,
136        peer: AuthorityIndex,
137        block: &VerifiedBlock,
138        timeout: Duration,
139    ) -> ConsensusResult<()> {
140        let mut client = self.get_client(peer, timeout).await?;
141        let request = SendBlockRequest {
142            block: block.serialized().clone(),
143        };
144        client
145            .send_block(anemo::Request::new(request).with_timeout(timeout))
146            .await
147            .map_err(|e| ConsensusError::NetworkRequest(format!("send_block failed: {e:?}")))?;
148        Ok(())
149    }
150
151    async fn subscribe_blocks(
152        &self,
153        _peer: AuthorityIndex,
154        _last_received: Round,
155        _timeout: Duration,
156    ) -> ConsensusResult<BlockStream> {
157        unimplemented!("Unimplemented")
158    }
159
160    async fn fetch_blocks(
161        &self,
162        peer: AuthorityIndex,
163        block_refs: Vec<BlockRef>,
164        highest_accepted_rounds: Vec<Round>,
165        breadth_first: bool,
166        timeout: Duration,
167    ) -> ConsensusResult<Vec<Bytes>> {
168        let mut client = self.get_client(peer, timeout).await?;
169        let request = FetchBlocksRequest {
170            block_refs: block_refs
171                .iter()
172                .filter_map(|r| match bcs::to_bytes(r) {
173                    Ok(serialized) => Some(serialized),
174                    Err(e) => {
175                        debug!("Failed to serialize block ref {:?}: {e:?}", r);
176                        None
177                    }
178                })
179                .collect(),
180            highest_accepted_rounds,
181            breadth_first,
182        };
183        let response = client
184            .fetch_blocks(anemo::Request::new(request).with_timeout(timeout))
185            .await
186            .map_err(|e: Status| {
187                if e.status() == StatusCode::RequestTimeout {
188                    ConsensusError::NetworkRequestTimeout(format!("fetch_blocks timeout: {e:?}"))
189                } else {
190                    ConsensusError::NetworkRequest(format!("fetch_blocks failed: {e:?}"))
191                }
192            })?;
193        let body = response.into_body();
194        Ok(body.blocks)
195    }
196
197    async fn fetch_commits(
198        &self,
199        peer: AuthorityIndex,
200        commit_range: CommitRange,
201        timeout: Duration,
202    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
203        let mut client = self.get_client(peer, timeout).await?;
204        let request = FetchCommitsRequest {
205            start: commit_range.start(),
206            end: commit_range.end(),
207        };
208        let response = client
209            .fetch_commits(anemo::Request::new(request).with_timeout(timeout))
210            .await
211            .map_err(|e| ConsensusError::NetworkRequest(format!("fetch_blocks failed: {e:?}")))?;
212        let response = response.into_body();
213        Ok((response.commits, response.certifier_blocks))
214    }
215
216    async fn fetch_latest_blocks(
217        &self,
218        peer: AuthorityIndex,
219        authorities: Vec<AuthorityIndex>,
220        timeout: Duration,
221    ) -> ConsensusResult<Vec<Bytes>> {
222        let mut client = self.get_client(peer, timeout).await?;
223        let request = FetchLatestBlocksRequest { authorities };
224        let response = client
225            .fetch_latest_blocks(anemo::Request::new(request).with_timeout(timeout))
226            .await
227            .map_err(|e: Status| {
228                if e.status() == StatusCode::RequestTimeout {
229                    ConsensusError::NetworkRequestTimeout(format!(
230                        "fetch_latest_blocks timeout: {e:?}"
231                    ))
232                } else {
233                    ConsensusError::NetworkRequest(format!("fetch_latest_blocks failed: {e:?}"))
234                }
235            })?;
236        let body = response.into_body();
237        Ok(body.blocks)
238    }
239
240    async fn get_latest_rounds(
241        &self,
242        peer: AuthorityIndex,
243        timeout: Duration,
244    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
245        let mut client = self.get_client(peer, timeout).await?;
246        let request = GetLatestRoundsRequest {};
247        let response = client
248            .get_latest_rounds(anemo::Request::new(request).with_timeout(timeout))
249            .await
250            .map_err(|e: Status| {
251                if e.status() == StatusCode::RequestTimeout {
252                    ConsensusError::NetworkRequestTimeout(format!(
253                        "get_latest_rounds timeout: {e:?}"
254                    ))
255                } else {
256                    ConsensusError::NetworkRequest(format!("get_latest_rounds failed: {e:?}"))
257                }
258            })?;
259        let body = response.into_body();
260        Ok((body.highest_received, body.highest_accepted))
261    }
262}
263
264/// Proxies Anemo requests to NetworkService with actual handler implementation.
265struct AnemoServiceProxy<S: NetworkService> {
266    peer_map: BTreeMap<PeerId, AuthorityIndex>,
267    service: Arc<S>,
268}
269
270impl<S: NetworkService> AnemoServiceProxy<S> {
271    fn new(context: Arc<Context>, service: Arc<S>) -> Self {
272        let peer_map = context
273            .committee
274            .authorities()
275            .map(|(index, authority)| {
276                let peer_id = PeerId(authority.network_key.to_bytes());
277                (peer_id, index)
278            })
279            .collect();
280        Self { peer_map, service }
281    }
282}
283
284#[async_trait]
285impl<S: NetworkService> ConsensusRpc for AnemoServiceProxy<S> {
286    async fn send_block(
287        &self,
288        request: anemo::Request<SendBlockRequest>,
289    ) -> Result<anemo::Response<SendBlockResponse>, anemo::rpc::Status> {
290        let Some(peer_id) = request.peer_id() else {
291            return Err(anemo::rpc::Status::new_with_message(
292                anemo::types::response::StatusCode::BadRequest,
293                "peer_id not found",
294            ));
295        };
296        let index = *self.peer_map.get(peer_id).ok_or_else(|| {
297            anemo::rpc::Status::new_with_message(
298                anemo::types::response::StatusCode::BadRequest,
299                "peer not found",
300            )
301        })?;
302        let block = request.into_body().block;
303        let block = ExtendedSerializedBlock {
304            block,
305            excluded_ancestors: vec![],
306        };
307        self.service
308            .handle_send_block(index, block)
309            .await
310            .map_err(|e| {
311                anemo::rpc::Status::new_with_message(
312                    anemo::types::response::StatusCode::BadRequest,
313                    format!("{e}"),
314                )
315            })?;
316        Ok(Response::new(SendBlockResponse {}))
317    }
318
319    async fn fetch_blocks(
320        &self,
321        request: anemo::Request<FetchBlocksRequest>,
322    ) -> Result<anemo::Response<FetchBlocksResponse>, anemo::rpc::Status> {
323        let Some(peer_id) = request.peer_id() else {
324            return Err(anemo::rpc::Status::new_with_message(
325                anemo::types::response::StatusCode::BadRequest,
326                "peer_id not found",
327            ));
328        };
329        let index = *self.peer_map.get(peer_id).ok_or_else(|| {
330            anemo::rpc::Status::new_with_message(
331                anemo::types::response::StatusCode::BadRequest,
332                "peer not found",
333            )
334        })?;
335        let body = request.into_body();
336        let block_refs = body
337            .block_refs
338            .into_iter()
339            .filter_map(|serialized| match bcs::from_bytes(&serialized) {
340                Ok(r) => Some(r),
341                Err(e) => {
342                    debug!("Failed to deserialize block ref {:?}: {e:?}", serialized);
343                    None
344                }
345            })
346            .collect();
347
348        let highest_accepted_rounds = body.highest_accepted_rounds;
349        let breadth_first = body.breadth_first;
350        let blocks = self
351            .service
352            .handle_fetch_blocks(index, block_refs, highest_accepted_rounds, breadth_first)
353            .await
354            .map_err(|e| {
355                anemo::rpc::Status::new_with_message(
356                    anemo::types::response::StatusCode::BadRequest,
357                    format!("{e}"),
358                )
359            })?;
360        Ok(Response::new(FetchBlocksResponse { blocks }))
361    }
362
363    async fn fetch_commits(
364        &self,
365        request: anemo::Request<FetchCommitsRequest>,
366    ) -> Result<anemo::Response<FetchCommitsResponse>, anemo::rpc::Status> {
367        let Some(peer_id) = request.peer_id() else {
368            return Err(anemo::rpc::Status::new_with_message(
369                anemo::types::response::StatusCode::BadRequest,
370                "peer_id not found",
371            ));
372        };
373        let index = *self.peer_map.get(peer_id).ok_or_else(|| {
374            anemo::rpc::Status::new_with_message(
375                anemo::types::response::StatusCode::BadRequest,
376                "peer not found",
377            )
378        })?;
379        let request = request.into_body();
380        let (commits, certifier_blocks) = self
381            .service
382            .handle_fetch_commits(index, (request.start..=request.end).into())
383            .await
384            .map_err(|e| {
385                anemo::rpc::Status::new_with_message(
386                    anemo::types::response::StatusCode::InternalServerError,
387                    format!("{e}"),
388                )
389            })?;
390        let commits = commits
391            .into_iter()
392            .map(|c| c.serialized().clone())
393            .collect();
394        let certifier_blocks = certifier_blocks
395            .into_iter()
396            .map(|b| b.serialized().clone())
397            .collect();
398        Ok(Response::new(FetchCommitsResponse {
399            commits,
400            certifier_blocks,
401        }))
402    }
403
404    async fn fetch_latest_blocks(
405        &self,
406        request: anemo::Request<FetchLatestBlocksRequest>,
407    ) -> Result<anemo::Response<FetchLatestBlocksResponse>, anemo::rpc::Status> {
408        let Some(peer_id) = request.peer_id() else {
409            return Err(anemo::rpc::Status::new_with_message(
410                anemo::types::response::StatusCode::BadRequest,
411                "peer_id not found",
412            ));
413        };
414        let index = *self.peer_map.get(peer_id).ok_or_else(|| {
415            anemo::rpc::Status::new_with_message(
416                anemo::types::response::StatusCode::BadRequest,
417                "peer not found",
418            )
419        })?;
420        let body = request.into_body();
421        let blocks = self
422            .service
423            .handle_fetch_latest_blocks(index, body.authorities)
424            .await
425            .map_err(|e| {
426                anemo::rpc::Status::new_with_message(
427                    anemo::types::response::StatusCode::BadRequest,
428                    format!("{e}"),
429                )
430            })?;
431        Ok(Response::new(FetchLatestBlocksResponse { blocks }))
432    }
433
434    async fn get_latest_rounds(
435        &self,
436        request: anemo::Request<GetLatestRoundsRequest>,
437    ) -> Result<anemo::Response<GetLatestRoundsResponse>, anemo::rpc::Status> {
438        let Some(peer_id) = request.peer_id() else {
439            return Err(anemo::rpc::Status::new_with_message(
440                anemo::types::response::StatusCode::BadRequest,
441                "peer_id not found",
442            ));
443        };
444        let index = *self.peer_map.get(peer_id).ok_or_else(|| {
445            anemo::rpc::Status::new_with_message(
446                anemo::types::response::StatusCode::BadRequest,
447                "peer not found",
448            )
449        })?;
450        let (highest_received, highest_accepted) = self
451            .service
452            .handle_get_latest_rounds(index)
453            .await
454            .map_err(|e| {
455                anemo::rpc::Status::new_with_message(
456                    anemo::types::response::StatusCode::InternalServerError,
457                    format!("{e}"),
458                )
459            })?;
460        Ok(Response::new(GetLatestRoundsResponse {
461            highest_received,
462            highest_accepted,
463        }))
464    }
465}
466
467/// Manages the lifecycle of Anemo network. Typical usage during initialization:
468/// 1. Create a new `AnemoManager`.
469/// 2. Take `AnemoClient` from `AnemoManager::client()`.
470/// 3. Create consensus components.
471/// 4. Create `AnemoService` for consensus RPC handler.
472/// 5. Install `AnemoService` to `AnemoManager` with `AnemoManager::install_service()`.
473pub(crate) struct AnemoManager {
474    context: Arc<Context>,
475    network_keypair: Option<NetworkKeyPair>,
476    client: Arc<AnemoClient>,
477    network: Arc<ArcSwapOption<anemo::Network>>,
478    connection_monitor_handle: Option<ConnectionMonitorHandle>,
479}
480
481impl AnemoManager {
482    pub(crate) fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self {
483        Self {
484            context: context.clone(),
485            network_keypair: Some(network_keypair),
486            client: Arc::new(AnemoClient::new(context)),
487            network: Arc::new(ArcSwapOption::default()),
488            connection_monitor_handle: None,
489        }
490    }
491}
492
493impl<S: NetworkService> NetworkManager<S> for AnemoManager {
494    type Client = AnemoClient;
495
496    fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self {
497        AnemoManager::new(context, network_keypair)
498    }
499
500    fn client(&self) -> Arc<Self::Client> {
501        self.client.clone()
502    }
503
504    async fn install_service(&mut self, service: Arc<S>) {
505        self.context
506            .metrics
507            .network_metrics
508            .network_type
509            .with_label_values(&["anemo"])
510            .set(1);
511
512        debug!("Starting anemo service");
513
514        let server = ConsensusRpcServer::new(AnemoServiceProxy::new(self.context.clone(), service));
515        let authority = self.context.committee.authority(self.context.own_index);
516        // By default, bind to the unspecified address to allow the actual address to be assigned.
517        // But bind to localhost if it is requested.
518        let own_address = if authority.address.is_localhost_ip() {
519            authority.address.clone()
520        } else {
521            authority.address.with_zero_ip()
522        };
523        let epoch_string: String = self.context.committee.epoch().to_string();
524        let inbound_network_metrics = self.context.metrics.network_metrics.inbound.clone();
525        let outbound_network_metrics = self.context.metrics.network_metrics.outbound.clone();
526        let quinn_connection_metrics = self
527            .context
528            .metrics
529            .network_metrics
530            .quinn_connection_metrics
531            .clone();
532        let all_peer_ids = self
533            .context
534            .committee
535            .authorities()
536            .map(|(_i, authority)| PeerId(authority.network_key.to_bytes()));
537
538        let routes = anemo::Router::new()
539            .route_layer(RequireAuthorizationLayer::new(AllowedPeers::new(
540                all_peer_ids,
541            )))
542            .route_layer(RequireAuthorizationLayer::new(AllowedEpoch::new(
543                epoch_string.clone(),
544            )))
545            .add_rpc_service(server);
546
547        // TODO: instrument with failpoints.
548        let service = tower::ServiceBuilder::new()
549            .layer(
550                TraceLayer::new_for_server_errors()
551                    .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
552                    .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
553            )
554            .layer(CallbackLayer::new(MetricsCallbackMaker::new(
555                inbound_network_metrics,
556                self.context.parameters.anemo.excessive_message_size,
557            )))
558            .layer(SetResponseHeaderLayer::overriding(
559                EPOCH_HEADER_KEY.parse().unwrap(),
560                epoch_string.clone(),
561            ))
562            .service(routes);
563
564        let outbound_layer = tower::ServiceBuilder::new()
565            .layer(
566                TraceLayer::new_for_client_and_server_errors()
567                    .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
568                    .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
569            )
570            .layer(CallbackLayer::new(MetricsCallbackMaker::new(
571                outbound_network_metrics,
572                self.context.parameters.anemo.excessive_message_size,
573            )))
574            .layer(SetRequestHeaderLayer::overriding(
575                EPOCH_HEADER_KEY.parse().unwrap(),
576                epoch_string,
577            ))
578            .into_inner();
579
580        let anemo_config = {
581            let mut quic_config = anemo::QuicConfig::default();
582            // Allow more concurrent streams for burst activity.
583            quic_config.max_concurrent_bidi_streams = Some(10_000);
584            // Increase send and receive buffer sizes on the primary, since the primary also
585            // needs to fetch payloads.
586            // With 200MiB buffer size and ~500ms RTT, the max throughput ~400MiB/s.
587            quic_config.stream_receive_window = Some(100 << 20);
588            quic_config.receive_window = Some(200 << 20);
589            quic_config.send_window = Some(200 << 20);
590            quic_config.crypto_buffer_size = Some(1 << 20);
591            quic_config.socket_receive_buffer_size = Some(20 << 20);
592            quic_config.socket_send_buffer_size = Some(20 << 20);
593            quic_config.allow_failed_socket_buffer_size_setting = true;
594            quic_config.max_idle_timeout_ms = Some(30_000);
595            // Enable keep alives every 5s
596            quic_config.keep_alive_interval_ms = Some(5_000);
597
598            let mut config = anemo::Config::default();
599            config.quic = Some(quic_config);
600            // Set the max_frame_size to be 1 GB to work around the issue of there being too many
601            // delegation events in the epoch change txn.
602            config.max_frame_size = Some(1 << 30);
603            // Set a default timeout of 300s for all RPC requests
604            config.inbound_request_timeout_ms = Some(300_000);
605            config.outbound_request_timeout_ms = Some(300_000);
606            config.shutdown_idle_timeout_ms = Some(1_000);
607            config.connectivity_check_interval_ms = Some(2_000);
608            config.connection_backoff_ms = Some(1_000);
609            config.max_connection_backoff_ms = Some(20_000);
610            config
611        };
612
613        let mut retries_left = 90;
614        let addr = own_address
615            .to_anemo_address()
616            .unwrap_or_else(|op| panic!("{op}: {own_address}"));
617        let private_key_bytes = self.network_keypair.take().unwrap().private_key_bytes();
618        let network = loop {
619            let network_result = anemo::Network::bind(addr.clone())
620                .server_name("consensus")
621                .private_key(private_key_bytes)
622                .config(anemo_config.clone())
623                .outbound_request_layer(outbound_layer.clone())
624                .start(service.clone());
625            match network_result {
626                Ok(n) => {
627                    break n;
628                }
629                Err(e) => {
630                    retries_left -= 1;
631
632                    if retries_left <= 0 {
633                        panic!("Failed to initialize AnemoNetwork at {addr}! Last error: {e:#?}");
634                    }
635                    warn!(
636                        "Address {addr} should be available for the Consensus service, retrying in one second: {e:#?}",
637                    );
638                    tokio::time::sleep(Duration::from_secs(1)).await;
639                }
640            }
641        };
642
643        let mut known_peer_ids = HashMap::new();
644        for (_i, authority) in self.context.committee.authorities() {
645            let peer_id = PeerId(authority.network_key.to_bytes());
646            let peer_address = match authority.address.to_anemo_address() {
647                Ok(addr) => addr,
648                // Validations are performed on addresses so this failure should not happen.
649                // But it is possible if supported anemo address formats are updated without a
650                // feature flag.
651                Err(e) => {
652                    error!(
653                        "Failed to convert {:?} to anemo address: {:?}",
654                        authority.address, e
655                    );
656                    continue;
657                }
658            };
659            let peer_info = PeerInfo {
660                peer_id,
661                affinity: anemo::types::PeerAffinity::High,
662                address: vec![peer_address.clone()],
663            };
664            network.known_peers().insert(peer_info);
665            known_peer_ids.insert(peer_id, authority.hostname.clone());
666        }
667
668        let connection_monitor_handle = AnemoConnectionMonitor::spawn(
669            network.downgrade(),
670            quinn_connection_metrics,
671            known_peer_ids,
672        );
673
674        self.connection_monitor_handle = Some(connection_monitor_handle);
675        self.client.set_network(network.clone());
676        self.network.store(Some(Arc::new(network)));
677    }
678
679    async fn stop(&mut self) {
680        if let Some(network) = self.network.load_full() {
681            if let Err(e) = network.shutdown().await {
682                warn!("Failure when shutting down AnemoNetwork: {e:?}");
683            }
684            self.network.store(None);
685        }
686
687        if let Some(connection_monitor_handle) = self.connection_monitor_handle.take() {
688            connection_monitor_handle.stop().await;
689        }
690
691        self.context
692            .metrics
693            .network_metrics
694            .network_type
695            .with_label_values(&["anemo"])
696            .set(0);
697    }
698}
699
700// Adapt MetricsCallbackMaker and MetricsResponseCallback to anemo.
701
702impl SizedRequest for anemo::Request<Bytes> {
703    fn size(&self) -> usize {
704        self.body().len()
705    }
706
707    fn route(&self) -> String {
708        self.route().to_string()
709    }
710}
711
712impl SizedResponse for anemo::Response<Bytes> {
713    fn size(&self) -> usize {
714        self.body().len()
715    }
716
717    fn error_type(&self) -> Option<String> {
718        if self.status().is_success() {
719            None
720        } else {
721            Some(self.status().to_string())
722        }
723    }
724}
725
726impl MakeCallbackHandler for MetricsCallbackMaker {
727    type Handler = MetricsResponseCallback;
728
729    fn make_handler(&self, request: &anemo::Request<bytes::Bytes>) -> Self::Handler {
730        self.handle_request(request)
731    }
732}
733
734impl ResponseHandler for MetricsResponseCallback {
735    fn on_response(mut self, response: &anemo::Response<bytes::Bytes>) {
736        MetricsResponseCallback::on_response(&mut self, response)
737    }
738
739    fn on_error<E>(mut self, err: &E) {
740        MetricsResponseCallback::on_error(&mut self, err)
741    }
742}
743
744/// Network message types.
745#[derive(Clone, Serialize, Deserialize)]
746pub(crate) struct SendBlockRequest {
747    // Serialized SignedBlock.
748    block: Bytes,
749}
750
751#[derive(Clone, Serialize, Deserialize, prost::Message)]
752pub(crate) struct SendBlockResponse {}
753
754#[derive(Clone, Serialize, Deserialize)]
755pub(crate) struct FetchBlocksRequest {
756    block_refs: Vec<Vec<u8>>,
757    highest_accepted_rounds: Vec<Round>,
758    breadth_first: bool,
759}
760
761#[derive(Clone, Serialize, Deserialize)]
762pub(crate) struct FetchBlocksResponse {
763    // Serialized SignedBlock.
764    blocks: Vec<Bytes>,
765}
766
767#[derive(Clone, Serialize, Deserialize)]
768pub(crate) struct FetchCommitsRequest {
769    start: CommitIndex,
770    end: CommitIndex,
771}
772
773#[derive(Clone, Serialize, Deserialize)]
774pub(crate) struct FetchCommitsResponse {
775    // Serialized consecutive Commit.
776    commits: Vec<Bytes>,
777    // Serialized SignedBlock that certify the last commit from above.
778    certifier_blocks: Vec<Bytes>,
779}
780
781#[derive(Clone, Serialize, Deserialize)]
782pub(crate) struct FetchLatestBlocksRequest {
783    authorities: Vec<AuthorityIndex>,
784}
785
786#[derive(Clone, Serialize, Deserialize)]
787pub(crate) struct FetchLatestBlocksResponse {
788    // Serialized SignedBlocks.
789    blocks: Vec<Bytes>,
790}
791
792#[derive(Clone, Serialize, Deserialize)]
793pub(crate) struct GetLatestRoundsRequest {}
794
795#[derive(Clone, Serialize, Deserialize)]
796pub(crate) struct GetLatestRoundsResponse {
797    // Highest received round per authority.
798    highest_received: Vec<Round>,
799    // Highest accepted round per authority.
800    highest_accepted: Vec<Round>,
801}