consensus_core/network/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the network interface, and provides network implementations for the
5//! consensus protocol.
6//!
7//! Having an abstract network interface allows
8//! - simplifying the semantics of sending data and serving requests over the network
9//! - hiding implementation specific types and semantics from the consensus protocol
10//! - allowing easy swapping of network implementations, for better performance or testing
11//!
12//! When modifying the client and server interfaces, the principle is to keep the interfaces
13//! low level, close to underlying implementations in semantics. For example, the client interface
14//! exposes sending messages to a specific peer, instead of broadcasting to all peers. Subscribing
15//! to a stream of blocks gets back the stream via response, instead of delivering the stream
//! directly to the server. This keeps the logic outside of this module agnostic to the
//! underlying network, so it can be reused easily across network implementations.
18
19use std::{pin::Pin, sync::Arc, time::Duration};
20
21use async_trait::async_trait;
22use bytes::Bytes;
23use consensus_config::{AuthorityIndex, NetworkKeyPair, NetworkPublicKey};
24use consensus_types::block::{BlockRef, Round};
25use futures::Stream;
26use mysten_network::Multiaddr;
27
28use crate::{
29    block::{ExtendedBlock, VerifiedBlock},
30    commit::{CommitRange, TrustedCommit},
31    context::Context,
32    error::ConsensusResult,
33};
34
/// Identifies an observer node by its network public key.
///
/// This is a plain alias of [`NetworkPublicKey`]; it does not introduce a distinct type.
// NOTE(review): the reason for the lint suppression below is not stated — presumably the
// alias is not referenced in every build configuration; confirm before removing it.
#[allow(unused)]
pub(crate) type NodeId = NetworkPublicKey;
38
/// Identifies a peer in the network, which can be either a validator or an observer.
///
/// Validators are addressed by their [`AuthorityIndex`], while observers are addressed by
/// their [`NodeId`], i.e. their network public key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum PeerId {
    /// A validator node identified by its authority index.
    Validator(AuthorityIndex),
    /// An observer node identified by its network public key.
    // NOTE(review): presumably this variant is not constructed anywhere yet, hence the
    // `dead_code` suppression — confirm before removing it.
    #[allow(dead_code)]
    Observer(NodeId),
}
48
// Tonic generated RPC stubs.
// The stub sources are not checked in next to this file: they are textually included from
// the build output directory (`OUT_DIR`), one file per service (`ConsensusService` and
// `ObserverService`).
mod tonic_gen {
    include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusService.rs"));
    include!(concat!(env!("OUT_DIR"), "/consensus.ObserverService.rs"));
}
54
mod clients;
pub(crate) mod metrics;
mod metrics_layer;
// Network tests: compiled only for test builds that are not `msim` builds.
#[cfg(all(test, not(msim)))]
mod network_tests;
// `observer` is declared twice under mutually exclusive `cfg`s: it is crate-visible in
// regular builds and fully public when built with `msim`.
#[cfg(not(msim))]
pub(crate) mod observer;
#[cfg(msim)]
pub mod observer;
// Test-only network module.
#[cfg(test)]
pub(crate) mod test_network;
// `tonic_network` uses the same visibility split as `observer`: crate-visible in regular
// builds, fully public when built with `msim`.
#[cfg(not(msim))]
pub(crate) mod tonic_network;
#[cfg(msim)]
pub mod tonic_network;
mod tonic_tls;
71
/// A stream of serialized filtered blocks returned over the network.
///
/// Each item is an [`ExtendedSerializedBlock`]. The stream is pinned and boxed behind a
/// trait object, so any `Send` stream implementation can be returned under this one type.
pub(crate) type BlockStream = Pin<Box<dyn Stream<Item = ExtendedSerializedBlock> + Send>>;
74
/// Validator network client for communicating with validator peers.
///
/// Every method targets a single peer, identified by its `AuthorityIndex`, and takes a
/// `timeout` for the request.
///
/// NOTE: the timeout parameters help save resources at the client and potentially at the
/// server, but it is up to the server implementation whether the timeout is honored.
/// - To bound server resources, the server should implement its own timeout for incoming
///   requests.
#[async_trait]
pub(crate) trait ValidatorNetworkClient: Send + Sync + Sized + 'static {
    /// Subscribes to blocks from `peer` after the `last_received` round.
    ///
    /// On success, returns a [`BlockStream`] whose items are serialized blocks.
    async fn subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
        timeout: Duration,
    ) -> ConsensusResult<BlockStream>;

    // TODO: add a parameter for maximum total size of blocks returned.
    /// Fetches serialized `SignedBlock`s from a peer, identified by `block_refs`.
    ///
    /// The response might also contain additional ancestor blocks of the requested blocks,
    /// selected according to the provided `highest_accepted_rounds`. The length of
    /// `highest_accepted_rounds` should be equal to the committee size. If
    /// `highest_accepted_rounds` is empty, it is simply ignored.
    ///
    /// NOTE(review): the meaning of `breadth_first` is not documented here — presumably it
    /// selects the order in which the server traverses blocks; confirm against the server
    /// implementation.
    async fn fetch_blocks(
        &self,
        peer: AuthorityIndex,
        block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
        breadth_first: bool,
        timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Fetches serialized commits in the commit range from a peer.
    ///
    /// Returns a tuple of both the serialized commits, and serialized blocks that contain
    /// votes certifying the last commit.
    async fn fetch_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
        timeout: Duration,
    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;

    /// Fetches the latest block from `peer` for the requested `authorities`.
    ///
    /// The latest blocks are returned in the serialized format of `SignedBlock`s. The method
    /// can return multiple blocks per authority, as it is possible to have equivocations.
    async fn fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
        timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Gets the latest received & accepted rounds of all authorities from the peer.
    ///
    /// NOTE(review): the tuple presumably holds the received rounds first and the accepted
    /// rounds second, matching the order in the sentence above — confirm against the
    /// implementations.
    async fn get_latest_rounds(
        &self,
        peer: AuthorityIndex,
        timeout: Duration,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;

    /// Sends a serialized SignedBlock to a peer.
    ///
    /// This method only exists in test builds.
    #[cfg(test)]
    async fn send_block(
        &self,
        peer: AuthorityIndex,
        block: &VerifiedBlock,
        timeout: Duration,
    ) -> ConsensusResult<()>;
}
140
/// Validator network service for handling requests from validator peers.
///
/// Each handler receives the index of the requesting `peer` followed by the request
/// parameters. Unlike the client trait, no timeout is passed to the handlers.
#[async_trait]
pub(crate) trait ValidatorNetworkService: Send + Sync + 'static {
    /// Handles the block sent from the peer via either unicast RPC or subscription stream.
    ///
    /// The `peer` value can be trusted to be a valid authority index, but the serialized
    /// block must be verified before its contents are trusted.
    /// Excluded ancestors are also included as part of an effort to further propagate
    /// blocks to peers despite the current exclusion.
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()>;

    /// Handles the subscription request from the peer.
    ///
    /// A stream of newly proposed blocks is returned to the peer.
    /// The stream continues until the end of epoch, peer unsubscribes, or a network error /
    /// crash occurs.
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream>;

    /// Handles the request to fetch blocks by references from the peer.
    ///
    /// The parameters mirror `ValidatorNetworkClient::fetch_blocks` without the timeout;
    /// the blocks are returned as serialized bytes.
    async fn handle_fetch_blocks(
        &self,
        peer: AuthorityIndex,
        block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
        breadth_first: bool,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Handles the request to fetch commits by index range from the peer.
    ///
    /// Returns typed values (`TrustedCommit`s and `VerifiedBlock`s) rather than serialized
    /// bytes.
    // NOTE(review): the returned blocks are presumably the ones carrying votes that certify
    // the last commit, as described on the client side — confirm.
    async fn handle_fetch_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;

    /// Handles the request to fetch the latest block for the provided `authorities`.
    ///
    /// The blocks are returned as serialized bytes.
    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Handles the request to get the latest received & accepted rounds of all authorities.
    ///
    /// NOTE(review): the tuple presumably holds the received rounds first and the accepted
    /// rounds second — confirm against the implementations.
    async fn handle_get_latest_rounds(
        &self,
        peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
}
194
/// A stream item for observer block streaming that includes both the block and highest commit
/// index.
// NOTE(review): the `dead_code` suppression has no stated reason — presumably the observer
// streaming path is not fully wired up yet; confirm before removing it.
#[allow(dead_code)]
pub(crate) struct ObserverBlockStreamItem {
    /// The block carried by this item, as raw bytes (presumably the serialized block —
    /// confirm against the producer of the stream).
    pub(crate) block: Bytes,
    /// The highest commit index reported alongside the block.
    pub(crate) highest_commit_index: u64,
}
201
/// Observer block stream type.
///
/// A pinned, boxed, `Send` stream whose items are [`ObserverBlockStreamItem`]s.
#[allow(dead_code)]
pub(crate) type ObserverBlockStream = Pin<Box<dyn Stream<Item = ObserverBlockStreamItem> + Send>>;
205
/// Observer block request stream type for bidirectional streaming.
///
/// A pinned, boxed, `Send` stream whose items are `BlockStreamRequest`s from the `observer`
/// module.
#[allow(dead_code)]
pub(crate) type BlockRequestStream =
    Pin<Box<dyn Stream<Item = crate::network::observer::BlockStreamRequest> + Send>>;
210
/// Observer network service for handling requests from observer nodes.
///
/// Unlike ValidatorNetworkService which uses AuthorityIndex, this uses NodeId
/// (NetworkPublicKey) to identify peers since observers are not part of the committee.
#[async_trait]
#[allow(dead_code)]
pub(crate) trait ObserverNetworkService: Send + Sync + 'static {
    /// Handles the bidirectional block streaming request from an observer peer.
    ///
    /// `request_stream` carries the requests sent by the peer; on success, the stream of
    /// blocks to send back is returned.
    async fn handle_stream_blocks(
        &self,
        peer: NodeId,
        request_stream: BlockRequestStream,
    ) -> ConsensusResult<ObserverBlockStream>;

    /// Handles the request to fetch blocks by references from an observer peer.
    ///
    /// The blocks are returned as serialized bytes.
    #[allow(unused)]
    async fn handle_fetch_blocks(
        &self,
        peer: NodeId,
        block_refs: Vec<BlockRef>,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Handles the request to fetch commits by index range from an observer peer.
    ///
    /// Returns the commits and the certifier blocks as typed values (`TrustedCommit`s and
    /// `VerifiedBlock`s), not as serialized bytes.
    #[allow(unused)]
    async fn handle_fetch_commits(
        &self,
        peer: NodeId,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;
}
241
/// Observer network client for communicating with validators' observer ports or other
/// observers.
///
/// Unlike ValidatorNetworkClient which uses AuthorityIndex, this uses PeerId to identify peers
/// since the observer server can serve both validators and observer nodes.
/// Every method takes a `timeout` for the request.
#[async_trait]
#[allow(dead_code)]
pub(crate) trait ObserverNetworkClient: Send + Sync + Sized + 'static {
    /// Initiates bidirectional block streaming with a peer.
    ///
    /// `request_stream` carries the requests sent to the peer; on success, the stream of
    /// items received from the peer is returned.
    async fn stream_blocks(
        &self,
        peer: PeerId,
        request_stream: BlockRequestStream,
        timeout: Duration,
    ) -> ConsensusResult<ObserverBlockStream>;

    /// Fetches serialized blocks by references from a peer.
    async fn fetch_blocks(
        &self,
        peer: PeerId,
        block_refs: Vec<BlockRef>,
        timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Fetches serialized commits in the commit range from a peer.
    ///
    /// Returns a tuple of both the serialized commits, and serialized blocks that contain
    /// votes certifying the last commit.
    async fn fetch_commits(
        &self,
        peer: PeerId,
        commit_range: CommitRange,
        timeout: Duration,
    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;
}
274
/// Owns the network clients and servers of a node.
///
/// An `AuthorityNode` holds a `NetworkManager` until shutdown.
/// Dropping `NetworkManager` will shutdown the network service.
pub(crate) trait NetworkManager: Send + Sync {
    /// Concrete client type used to talk to validator peers.
    type ValidatorClient: ValidatorNetworkClient;
    /// Concrete client type used to talk to observer servers.
    type ObserverClient: ObserverNetworkClient;

    /// Creates a new network manager.
    fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self;

    /// Returns the validator network client.
    fn validator_client(&self) -> Arc<Self::ValidatorClient>;

    /// Returns the observer network client.
    #[allow(dead_code)]
    fn observer_client(&self) -> Arc<Self::ObserverClient>;

    /// Starts the validator network server with the provided service.
    ///
    /// Incoming validator requests are handled by `service`.
    async fn start_validator_server<V>(&mut self, service: Arc<V>)
    where
        V: ValidatorNetworkService;

    /// Starts the observer network server with the provided service.
    ///
    /// Incoming observer requests are handled by `service`.
    async fn start_observer_server<O>(&mut self, service: Arc<O>)
    where
        O: ObserverNetworkService;

    /// Stops the network service.
    async fn stop(&mut self);

    /// Updates the network address for a peer identified by their authority index.
    ///
    /// If `address` is `None`, the override is cleared and the committee address will be used.
    fn update_peer_address(&self, peer: AuthorityIndex, address: Option<Multiaddr>);
}
308
309// Re-export the concrete client implementations.
310pub(crate) use clients::{CommitSyncerClient, SynchronizerClient};
311
/// Serialized block with extended information from the proposing authority.
#[derive(Clone, PartialEq, Eq, Debug)]
pub(crate) struct ExtendedSerializedBlock {
    /// The serialized block.
    pub(crate) block: Bytes,
    /// Serialized `BlockRef`s that are excluded from the block's ancestors.
    /// Each inner `Vec<u8>` holds one serialized reference.
    pub(crate) excluded_ancestors: Vec<Vec<u8>>,
}
319
320impl From<ExtendedBlock> for ExtendedSerializedBlock {
321    fn from(extended_block: ExtendedBlock) -> Self {
322        Self {
323            block: extended_block.block.serialized().clone(),
324            excluded_ancestors: extended_block
325                .excluded_ancestors
326                .iter()
327                .filter_map(|r| match bcs::to_bytes(r) {
328                    Ok(serialized) => Some(serialized),
329                    Err(e) => {
330                        tracing::debug!("Failed to serialize block ref {:?}: {e:?}", r);
331                        None
332                    }
333                })
334                .collect(),
335        }
336    }
337}