consensus_core/network/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the network interface, and provides network implementations for the
5//! consensus protocol.
6//!
7//! Having an abstract network interface allows
8//! - simplying the semantics of sending data and serving requests over the network
9//! - hiding implementation specific types and semantics from the consensus protocol
10//! - allowing easy swapping of network implementations, for better performance or testing
11//!
12//! When modifying the client and server interfaces, the principle is to keep the interfaces
13//! low level, close to underlying implementations in semantics. For example, the client interface
14//! exposes sending messages to a specific peer, instead of broadcasting to all peers. Subscribing
15//! to a stream of blocks gets back the stream via response, instead of delivering the stream
16//! directly to the server. This keeps the logic agnostics to the underlying network outside of
17//! this module, so they can be reused easily across network implementations.
18
19use std::{pin::Pin, sync::Arc, time::Duration};
20
21use async_trait::async_trait;
22use bytes::Bytes;
23use consensus_config::{AuthorityIndex, NetworkKeyPair};
24use consensus_types::block::{BlockRef, Round};
25use futures::Stream;
26
27use crate::{
28    block::{ExtendedBlock, VerifiedBlock},
29    commit::{CommitRange, TrustedCommit},
30    context::Context,
31    error::ConsensusResult,
32};
33
34// Anemo generated RPC stubs.
35mod anemo_gen {
36    include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusRpc.rs"));
37}
38
39// Tonic generated RPC stubs.
40mod tonic_gen {
41    include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusService.rs"));
42}
43
44pub mod connection_monitor;
45
46pub(crate) mod anemo_network;
47pub(crate) mod epoch_filter;
48pub(crate) mod metrics;
49mod metrics_layer;
50#[cfg(all(test, not(msim)))]
51mod network_tests;
52#[cfg(test)]
53pub(crate) mod test_network;
54#[cfg(not(msim))]
55pub(crate) mod tonic_network;
56#[cfg(msim)]
57pub mod tonic_network;
58mod tonic_tls;
59
60/// A stream of serialized filtered blocks returned over the network.
61pub(crate) type BlockStream = Pin<Box<dyn Stream<Item = ExtendedSerializedBlock> + Send>>;
62
63/// Network client for communicating with peers.
64///
65/// NOTE: the timeout parameters help saving resources at client and potentially server.
66/// But it is up to the server implementation if the timeout is honored.
67/// - To bound server resources, server should implement own timeout for incoming requests.
68#[async_trait]
69pub(crate) trait NetworkClient: Send + Sync + Sized + 'static {
70    // Whether the network client streams blocks to subscribed peers.
71    const SUPPORT_STREAMING: bool;
72
73    /// Sends a serialized SignedBlock to a peer.
74    async fn send_block(
75        &self,
76        peer: AuthorityIndex,
77        block: &VerifiedBlock,
78        timeout: Duration,
79    ) -> ConsensusResult<()>;
80
81    /// Subscribes to blocks from a peer after last_received round.
82    async fn subscribe_blocks(
83        &self,
84        peer: AuthorityIndex,
85        last_received: Round,
86        timeout: Duration,
87    ) -> ConsensusResult<BlockStream>;
88
89    // TODO: add a parameter for maximum total size of blocks returned.
90    /// Fetches serialized `SignedBlock`s from a peer. It also might return additional ancestor blocks
91    /// of the requested blocks according to the provided `highest_accepted_rounds`. The `highest_accepted_rounds`
92    /// length should be equal to the committee size. If `highest_accepted_rounds` is empty then it will
93    /// be simply ignored.
94    async fn fetch_blocks(
95        &self,
96        peer: AuthorityIndex,
97        block_refs: Vec<BlockRef>,
98        highest_accepted_rounds: Vec<Round>,
99        breadth_first: bool,
100        timeout: Duration,
101    ) -> ConsensusResult<Vec<Bytes>>;
102
103    /// Fetches serialized commits in the commit range from a peer.
104    /// Returns a tuple of both the serialized commits, and serialized blocks that contain
105    /// votes certifying the last commit.
106    async fn fetch_commits(
107        &self,
108        peer: AuthorityIndex,
109        commit_range: CommitRange,
110        timeout: Duration,
111    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;
112
113    /// Fetches the latest block from `peer` for the requested `authorities`. The latest blocks
114    /// are returned in the serialised format of `SignedBlocks`. The method can return multiple
115    /// blocks per peer as its possible to have equivocations.
116    async fn fetch_latest_blocks(
117        &self,
118        peer: AuthorityIndex,
119        authorities: Vec<AuthorityIndex>,
120        timeout: Duration,
121    ) -> ConsensusResult<Vec<Bytes>>;
122
123    /// Gets the latest received & accepted rounds of all authorities from the peer.
124    async fn get_latest_rounds(
125        &self,
126        peer: AuthorityIndex,
127        timeout: Duration,
128    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
129}
130
131/// Network service for handling requests from peers.
132/// NOTE: using `async_trait` macro because `NetworkService` methods are called in the trait impl
133/// of `anemo_gen::ConsensusRpc`, which itself is annotated with `async_trait`.
134#[async_trait]
135pub(crate) trait NetworkService: Send + Sync + 'static {
136    /// Handles the block sent from the peer via either unicast RPC or subscription stream.
137    /// Peer value can be trusted to be a valid authority index.
138    /// But serialized_block must be verified before its contents are trusted.
139    /// Excluded ancestors are also included as part of an effort to further propagate
140    /// blocks to peers despite the current exclusion.
141    async fn handle_send_block(
142        &self,
143        peer: AuthorityIndex,
144        block: ExtendedSerializedBlock,
145    ) -> ConsensusResult<()>;
146
147    /// Handles the subscription request from the peer.
148    /// A stream of newly proposed blocks is returned to the peer.
149    /// The stream continues until the end of epoch, peer unsubscribes, or a network error / crash
150    /// occurs.
151    async fn handle_subscribe_blocks(
152        &self,
153        peer: AuthorityIndex,
154        last_received: Round,
155    ) -> ConsensusResult<BlockStream>;
156
157    /// Handles the request to fetch blocks by references from the peer.
158    async fn handle_fetch_blocks(
159        &self,
160        peer: AuthorityIndex,
161        block_refs: Vec<BlockRef>,
162        highest_accepted_rounds: Vec<Round>,
163        breadth_first: bool,
164    ) -> ConsensusResult<Vec<Bytes>>;
165
166    /// Handles the request to fetch commits by index range from the peer.
167    async fn handle_fetch_commits(
168        &self,
169        peer: AuthorityIndex,
170        commit_range: CommitRange,
171    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;
172
173    /// Handles the request to fetch the latest block for the provided `authorities`.
174    async fn handle_fetch_latest_blocks(
175        &self,
176        peer: AuthorityIndex,
177        authorities: Vec<AuthorityIndex>,
178    ) -> ConsensusResult<Vec<Bytes>>;
179
180    /// Handles the request to get the latest received & accepted rounds of all authorities.
181    async fn handle_get_latest_rounds(
182        &self,
183        peer: AuthorityIndex,
184    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
185}
186
187/// An `AuthorityNode` holds a `NetworkManager` until shutdown.
188/// Dropping `NetworkManager` will shutdown the network service.
189pub(crate) trait NetworkManager<S>: Send + Sync
190where
191    S: NetworkService,
192{
193    type Client: NetworkClient;
194
195    /// Creates a new network manager.
196    fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self;
197
198    /// Returns the network client.
199    fn client(&self) -> Arc<Self::Client>;
200
201    /// Installs network service.
202    async fn install_service(&mut self, service: Arc<S>);
203
204    /// Stops the network service.
205    async fn stop(&mut self);
206}
207
208/// Serialized block with extended information from the proposing authority.
209#[derive(Clone, PartialEq, Eq, Debug)]
210pub(crate) struct ExtendedSerializedBlock {
211    pub(crate) block: Bytes,
212    // Serialized BlockRefs that are excluded from the blocks ancestors.
213    pub(crate) excluded_ancestors: Vec<Vec<u8>>,
214}
215
216impl From<ExtendedBlock> for ExtendedSerializedBlock {
217    fn from(extended_block: ExtendedBlock) -> Self {
218        Self {
219            block: extended_block.block.serialized().clone(),
220            excluded_ancestors: extended_block
221                .excluded_ancestors
222                .iter()
223                .filter_map(|r| match bcs::to_bytes(r) {
224                    Ok(serialized) => Some(serialized),
225                    Err(e) => {
226                        tracing::debug!("Failed to serialize block ref {:?}: {e:?}", r);
227                        None
228                    }
229                })
230                .collect(),
231        }
232    }
233}