consensus_core/network/mod.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the network interface, and provides network implementations for the
5//! consensus protocol.
6//!
7//! Having an abstract network interface allows
8//! - simplifying the semantics of sending data and serving requests over the network
9//! - hiding implementation specific types and semantics from the consensus protocol
10//! - allowing easy swapping of network implementations, for better performance or testing
11//!
12//! When modifying the client and server interfaces, the principle is to keep the interfaces
13//! low level, close to underlying implementations in semantics. For example, the client interface
14//! exposes sending messages to a specific peer, instead of broadcasting to all peers. Subscribing
15//! to a stream of blocks gets back the stream via response, instead of delivering the stream
16//! directly to the server. This keeps the logic agnostics to the underlying network outside of
17//! this module, so they can be reused easily across network implementations.
18
19use std::{pin::Pin, sync::Arc, time::Duration};
20
21use async_trait::async_trait;
22use bytes::Bytes;
23use consensus_config::{AuthorityIndex, NetworkKeyPair};
24use consensus_types::block::{BlockRef, Round};
25use futures::Stream;
26
27use crate::{
28 block::{ExtendedBlock, VerifiedBlock},
29 commit::{CommitRange, TrustedCommit},
30 context::Context,
31 error::ConsensusResult,
32};
33
34// Tonic generated RPC stubs.
35mod tonic_gen {
36 include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusService.rs"));
37}
38
39pub(crate) mod metrics;
40mod metrics_layer;
41#[cfg(all(test, not(msim)))]
42mod network_tests;
43#[cfg(test)]
44pub(crate) mod test_network;
45#[cfg(not(msim))]
46pub(crate) mod tonic_network;
47#[cfg(msim)]
48pub mod tonic_network;
49mod tonic_tls;
50
51/// A stream of serialized filtered blocks returned over the network.
52pub(crate) type BlockStream = Pin<Box<dyn Stream<Item = ExtendedSerializedBlock> + Send>>;
53
54/// Network client for communicating with peers.
55///
56/// NOTE: the timeout parameters help saving resources at client and potentially server.
57/// But it is up to the server implementation if the timeout is honored.
58/// - To bound server resources, server should implement own timeout for incoming requests.
59#[async_trait]
60pub(crate) trait NetworkClient: Send + Sync + Sized + 'static {
61 /// Subscribes to blocks from a peer after last_received round.
62 async fn subscribe_blocks(
63 &self,
64 peer: AuthorityIndex,
65 last_received: Round,
66 timeout: Duration,
67 ) -> ConsensusResult<BlockStream>;
68
69 // TODO: add a parameter for maximum total size of blocks returned.
70 /// Fetches serialized `SignedBlock`s from a peer. It also might return additional ancestor blocks
71 /// of the requested blocks according to the provided `highest_accepted_rounds`. The `highest_accepted_rounds`
72 /// length should be equal to the committee size. If `highest_accepted_rounds` is empty then it will
73 /// be simply ignored.
74 async fn fetch_blocks(
75 &self,
76 peer: AuthorityIndex,
77 block_refs: Vec<BlockRef>,
78 highest_accepted_rounds: Vec<Round>,
79 breadth_first: bool,
80 timeout: Duration,
81 ) -> ConsensusResult<Vec<Bytes>>;
82
83 /// Fetches serialized commits in the commit range from a peer.
84 /// Returns a tuple of both the serialized commits, and serialized blocks that contain
85 /// votes certifying the last commit.
86 async fn fetch_commits(
87 &self,
88 peer: AuthorityIndex,
89 commit_range: CommitRange,
90 timeout: Duration,
91 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;
92
93 /// Fetches the latest block from `peer` for the requested `authorities`. The latest blocks
94 /// are returned in the serialised format of `SignedBlocks`. The method can return multiple
95 /// blocks per peer as its possible to have equivocations.
96 async fn fetch_latest_blocks(
97 &self,
98 peer: AuthorityIndex,
99 authorities: Vec<AuthorityIndex>,
100 timeout: Duration,
101 ) -> ConsensusResult<Vec<Bytes>>;
102
103 /// Gets the latest received & accepted rounds of all authorities from the peer.
104 async fn get_latest_rounds(
105 &self,
106 peer: AuthorityIndex,
107 timeout: Duration,
108 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
109
110 /// Sends a serialized SignedBlock to a peer.
111 #[cfg(test)]
112 async fn send_block(
113 &self,
114 peer: AuthorityIndex,
115 block: &VerifiedBlock,
116 timeout: Duration,
117 ) -> ConsensusResult<()>;
118}
119
120/// Network service for handling requests from peers.
121#[async_trait]
122pub(crate) trait NetworkService: Send + Sync + 'static {
123 /// Handles the block sent from the peer via either unicast RPC or subscription stream.
124 /// Peer value can be trusted to be a valid authority index.
125 /// But serialized_block must be verified before its contents are trusted.
126 /// Excluded ancestors are also included as part of an effort to further propagate
127 /// blocks to peers despite the current exclusion.
128 async fn handle_send_block(
129 &self,
130 peer: AuthorityIndex,
131 block: ExtendedSerializedBlock,
132 ) -> ConsensusResult<()>;
133
134 /// Handles the subscription request from the peer.
135 /// A stream of newly proposed blocks is returned to the peer.
136 /// The stream continues until the end of epoch, peer unsubscribes, or a network error / crash
137 /// occurs.
138 async fn handle_subscribe_blocks(
139 &self,
140 peer: AuthorityIndex,
141 last_received: Round,
142 ) -> ConsensusResult<BlockStream>;
143
144 /// Handles the request to fetch blocks by references from the peer.
145 async fn handle_fetch_blocks(
146 &self,
147 peer: AuthorityIndex,
148 block_refs: Vec<BlockRef>,
149 highest_accepted_rounds: Vec<Round>,
150 breadth_first: bool,
151 ) -> ConsensusResult<Vec<Bytes>>;
152
153 /// Handles the request to fetch commits by index range from the peer.
154 async fn handle_fetch_commits(
155 &self,
156 peer: AuthorityIndex,
157 commit_range: CommitRange,
158 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;
159
160 /// Handles the request to fetch the latest block for the provided `authorities`.
161 async fn handle_fetch_latest_blocks(
162 &self,
163 peer: AuthorityIndex,
164 authorities: Vec<AuthorityIndex>,
165 ) -> ConsensusResult<Vec<Bytes>>;
166
167 /// Handles the request to get the latest received & accepted rounds of all authorities.
168 async fn handle_get_latest_rounds(
169 &self,
170 peer: AuthorityIndex,
171 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
172}
173
174/// An `AuthorityNode` holds a `NetworkManager` until shutdown.
175/// Dropping `NetworkManager` will shutdown the network service.
176pub(crate) trait NetworkManager<S>: Send + Sync
177where
178 S: NetworkService,
179{
180 type Client: NetworkClient;
181
182 /// Creates a new network manager.
183 fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self;
184
185 /// Returns the network client.
186 fn client(&self) -> Arc<Self::Client>;
187
188 /// Installs network service.
189 async fn install_service(&mut self, service: Arc<S>);
190
191 /// Stops the network service.
192 async fn stop(&mut self);
193}
194
195/// Serialized block with extended information from the proposing authority.
196#[derive(Clone, PartialEq, Eq, Debug)]
197pub(crate) struct ExtendedSerializedBlock {
198 pub(crate) block: Bytes,
199 // Serialized BlockRefs that are excluded from the blocks ancestors.
200 pub(crate) excluded_ancestors: Vec<Vec<u8>>,
201}
202
203impl From<ExtendedBlock> for ExtendedSerializedBlock {
204 fn from(extended_block: ExtendedBlock) -> Self {
205 Self {
206 block: extended_block.block.serialized().clone(),
207 excluded_ancestors: extended_block
208 .excluded_ancestors
209 .iter()
210 .filter_map(|r| match bcs::to_bytes(r) {
211 Ok(serialized) => Some(serialized),
212 Err(e) => {
213 tracing::debug!("Failed to serialize block ref {:?}: {e:?}", r);
214 None
215 }
216 })
217 .collect(),
218 }
219 }
220}