consensus_core/network/mod.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
//! This module defines the network interface, and provides network implementations for the
//! consensus protocol.
//!
//! Having an abstract network interface allows
//! - simplifying the semantics of sending data and serving requests over the network
//! - hiding implementation specific types and semantics from the consensus protocol
//! - allowing easy swapping of network implementations, for better performance or testing
//!
//! When modifying the client and server interfaces, the principle is to keep the interfaces
//! low level, close to underlying implementations in semantics. For example, the client interface
//! exposes sending messages to a specific peer, instead of broadcasting to all peers. Subscribing
//! to a stream of blocks gets back the stream via response, instead of delivering the stream
//! directly to the server. This keeps the logic agnostic to the underlying network outside of
//! this module, so they can be reused easily across network implementations.
use std::{pin::Pin, sync::Arc, time::Duration};
use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::{AuthorityIndex, NetworkKeyPair};
use futures::Stream;
use crate::{
block::{BlockRef, ExtendedBlock, VerifiedBlock},
commit::{CommitRange, TrustedCommit},
context::Context,
error::ConsensusResult,
Round,
};
// Anemo generated RPC stubs.
// The stub source is not checked in here: it is textually included at compile time from the
// `consensus.ConsensusRpc.rs` file located in the directory named by the `OUT_DIR` env variable.
mod anemo_gen {
include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusRpc.rs"));
}
// Tonic generated RPC stubs.
// The stub source is not checked in here: it is textually included at compile time from the
// `consensus.ConsensusService.rs` file located in the directory named by the `OUT_DIR` env variable.
mod tonic_gen {
include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusService.rs"));
}
// Submodules of the network layer. Visibility differs per module: only `connection_monitor`
// (and `tonic_network` under `cfg(msim)`) is exported outside the crate.
pub mod connection_monitor;
pub(crate) mod anemo_network;
pub(crate) mod epoch_filter;
pub(crate) mod metrics;
mod metrics_layer;
// Compiled only for test builds that are not built with `cfg(msim)`.
#[cfg(all(test, not(msim)))]
mod network_tests;
// Compiled only for test builds.
#[cfg(test)]
pub(crate) mod test_network;
// `tonic_network` is declared twice with mutually exclusive `cfg`s: it is crate-private in
// regular builds and fully public when built with `cfg(msim)`.
#[cfg(not(msim))]
pub(crate) mod tonic_network;
#[cfg(msim)]
pub mod tonic_network;
mod tonic_tls;
/// A stream of serialized filtered blocks returned over the network.
///
/// Each item is an [`ExtendedSerializedBlock`]. The stream is a pinned, boxed trait object so
/// that different stream implementations can share this one type, and it is required to be `Send`.
pub(crate) type BlockStream = Pin<Box<dyn Stream<Item = ExtendedSerializedBlock> + Send>>;
/// Network client for communicating with peers.
///
/// Every method addresses a single `peer` and takes a `timeout` for that request.
///
/// NOTE: the timeout parameters help save resources at the client and potentially the server.
/// But it is up to the server implementation whether the timeout is honored.
/// - To bound server resources, the server should implement its own timeout for incoming requests.
#[async_trait]
pub(crate) trait NetworkClient: Send + Sync + Sized + 'static {
/// Whether the network client streams blocks to subscribed peers.
const SUPPORT_STREAMING: bool;
/// Sends a serialized SignedBlock to a peer.
///
/// The block is supplied as a reference to a `VerifiedBlock`.
async fn send_block(
&self,
peer: AuthorityIndex,
block: &VerifiedBlock,
timeout: Duration,
) -> ConsensusResult<()>;
/// Subscribes to blocks from a peer after last_received round.
///
/// On success, the blocks are delivered through the returned [`BlockStream`].
async fn subscribe_blocks(
&self,
peer: AuthorityIndex,
last_received: Round,
timeout: Duration,
) -> ConsensusResult<BlockStream>;
// TODO: add a parameter for maximum total size of blocks returned.
/// Fetches serialized `SignedBlock`s from a peer. It also might return additional ancestor blocks
/// of the requested blocks according to the provided `highest_accepted_rounds`. The `highest_accepted_rounds`
/// length should be equal to the committee size. If `highest_accepted_rounds` is empty then it will
/// be simply ignored.
///
/// On success, each returned `Bytes` is one serialized block.
async fn fetch_blocks(
&self,
peer: AuthorityIndex,
block_refs: Vec<BlockRef>,
highest_accepted_rounds: Vec<Round>,
timeout: Duration,
) -> ConsensusResult<Vec<Bytes>>;
/// Fetches serialized commits in the commit range from a peer.
/// Returns a tuple of both the serialized commits, and serialized blocks that contain
/// votes certifying the last commit.
async fn fetch_commits(
&self,
peer: AuthorityIndex,
commit_range: CommitRange,
timeout: Duration,
) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;
/// Fetches the latest block from `peer` for the requested `authorities`. The latest blocks
/// are returned in the serialized format of `SignedBlocks`. The method can return multiple
/// blocks per authority as it is possible to have equivocations.
// NOTE(review): the original wording said "multiple blocks per peer"; since the request is
// keyed by `authorities`, "per authority" looks like the intended meaning - confirm.
async fn fetch_latest_blocks(
&self,
peer: AuthorityIndex,
authorities: Vec<AuthorityIndex>,
timeout: Duration,
) -> ConsensusResult<Vec<Bytes>>;
/// Gets the latest received & accepted rounds of all authorities from the peer.
///
/// On success, returns two `Vec<Round>`s.
// NOTE(review): presumably the tuple is ordered (received, accepted), matching the wording
// above - confirm against the implementations.
async fn get_latest_rounds(
&self,
peer: AuthorityIndex,
timeout: Duration,
) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
}
/// Network service for handling requests from peers.
///
/// Each `handle_*` method mirrors the `NetworkClient` method of the same name (without the
/// `handle_` prefix), and receives the index of the requesting `peer` in place of a timeout.
///
/// NOTE: using `async_trait` macro because `NetworkService` methods are called in the trait impl
/// of `anemo_gen::ConsensusRpc`, which itself is annotated with `async_trait`.
#[async_trait]
pub(crate) trait NetworkService: Send + Sync + 'static {
/// Handles the block sent from the peer via either unicast RPC or subscription stream.
/// Peer value can be trusted to be a valid authority index.
/// But the serialized `block` must be verified before its contents are trusted.
/// Excluded ancestors are also included as part of an effort to further propagate
/// blocks to peers despite the current exclusion.
async fn handle_send_block(
&self,
peer: AuthorityIndex,
block: ExtendedSerializedBlock,
) -> ConsensusResult<()>;
/// Handles the subscription request from the peer.
/// A stream of newly proposed blocks is returned to the peer.
/// The stream continues until the end of epoch, peer unsubscribes, or a network error / crash
/// occurs.
async fn handle_subscribe_blocks(
&self,
peer: AuthorityIndex,
last_received: Round,
) -> ConsensusResult<BlockStream>;
/// Handles the request to fetch blocks by references from the peer.
///
/// On success, returns the blocks as `Bytes`, one entry per block.
async fn handle_fetch_blocks(
&self,
peer: AuthorityIndex,
block_refs: Vec<BlockRef>,
highest_accepted_rounds: Vec<Round>,
) -> ConsensusResult<Vec<Bytes>>;
/// Handles the request to fetch commits by index range from the peer.
///
/// Unlike the other fetch handlers, this returns typed `TrustedCommit`s and `VerifiedBlock`s
/// rather than `Bytes`.
// NOTE(review): presumably the network implementation serializes these before replying,
// since the client side receives `Bytes` - confirm.
async fn handle_fetch_commits(
&self,
peer: AuthorityIndex,
commit_range: CommitRange,
) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;
/// Handles the request to fetch the latest block for the provided `authorities`.
async fn handle_fetch_latest_blocks(
&self,
peer: AuthorityIndex,
authorities: Vec<AuthorityIndex>,
) -> ConsensusResult<Vec<Bytes>>;
/// Handles the request to get the latest received & accepted rounds of all authorities.
// NOTE(review): presumably the tuple is ordered (received, accepted), matching the wording
// above - confirm against the implementations.
async fn handle_get_latest_rounds(
&self,
peer: AuthorityIndex,
) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
}
/// An `AuthorityNode` holds a `NetworkManager` until shutdown.
/// Dropping `NetworkManager` will shutdown the network service.
///
/// The trait is generic over the `NetworkService` implementation `S` that gets installed.
/// Its `async fn`s are declared directly in the trait; no `#[async_trait]` attribute is applied
/// here, unlike `NetworkClient` and `NetworkService`.
pub(crate) trait NetworkManager<S>: Send + Sync
where
S: NetworkService,
{
/// The `NetworkClient` implementation handed out by this manager.
type Client: NetworkClient;
/// Creates a new network manager.
///
/// Takes the shared `Context` and the `NetworkKeyPair` of this node.
fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self;
/// Returns the network client.
///
/// The client is returned behind an `Arc`, so it can be shared by multiple holders.
fn client(&self) -> Arc<Self::Client>;
/// Installs network service.
async fn install_service(&mut self, service: Arc<S>);
/// Stops the network service.
async fn stop(&mut self);
}
/// Serialized block with extended information from the proposing authority.
///
/// This is the item type of `BlockStream` and the payload of `NetworkService::handle_send_block`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub(crate) struct ExtendedSerializedBlock {
/// The serialized block.
pub(crate) block: Bytes,
/// Serialized BlockRefs that are excluded from the block's ancestors.
/// Each inner `Vec<u8>` holds one serialized block ref.
pub(crate) excluded_ancestors: Vec<Vec<u8>>,
}
impl From<ExtendedBlock> for ExtendedSerializedBlock {
    /// Builds the wire representation of an `ExtendedBlock`.
    ///
    /// The block's already-serialized bytes are reused via a clone, and every excluded ancestor
    /// ref is encoded with BCS. Encoding is best-effort: a ref that fails to serialize is logged
    /// at debug level and left out of the result rather than failing the conversion.
    fn from(extended_block: ExtendedBlock) -> Self {
        let block = extended_block.block.serialized().clone();

        let mut excluded_ancestors = Vec::new();
        for block_ref in extended_block.excluded_ancestors.iter() {
            match bcs::to_bytes(block_ref) {
                Ok(encoded) => excluded_ancestors.push(encoded),
                Err(e) => {
                    // Skip this ref only; the remaining refs are still forwarded.
                    tracing::debug!("Failed to serialize block ref {:?}: {e:?}", block_ref);
                }
            }
        }

        Self {
            block,
            excluded_ancestors,
        }
    }
}