consensus_core/network/mod.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the network interface, and provides network implementations for the
5//! consensus protocol.
6//!
7//! Having an abstract network interface allows
8//! - simplifying the semantics of sending data and serving requests over the network
9//! - hiding implementation specific types and semantics from the consensus protocol
10//! - allowing easy swapping of network implementations, for better performance or testing
11//!
//! When modifying the client and server interfaces, the principle is to keep the interfaces
//! low level, close to underlying implementations in semantics. For example, the client interface
//! exposes sending messages to a specific peer, instead of broadcasting to all peers. Subscribing
//! to a stream of blocks gets back the stream via response, instead of delivering the stream
//! directly to the server. This keeps the logic outside of this module agnostic to the underlying
//! network, so that it can be reused easily across network implementations.
18
19use std::{pin::Pin, sync::Arc, time::Duration};
20
21use async_trait::async_trait;
22use bytes::Bytes;
23use consensus_config::{AuthorityIndex, NetworkKeyPair};
24use consensus_types::block::{BlockRef, Round};
25use futures::Stream;
26use mysten_network::Multiaddr;
27
28use crate::{
29 block::{ExtendedBlock, VerifiedBlock},
30 commit::{CommitRange, TrustedCommit},
31 context::Context,
32 error::ConsensusResult,
33};
34
// Tonic generated RPC stubs.
//
// The module body is the file `consensus.ConsensusService.rs`, textually included from the
// directory named by the `OUT_DIR` environment variable at compile time (`OUT_DIR` is set by
// Cargo for crates with a build script). The file is presumably produced by this crate's build
// script — confirm against `build.rs`.
mod tonic_gen {
    include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusService.rs"));
}
39
40pub(crate) mod metrics;
41mod metrics_layer;
42#[cfg(all(test, not(msim)))]
43mod network_tests;
44#[cfg(test)]
45pub(crate) mod test_network;
46#[cfg(not(msim))]
47pub(crate) mod tonic_network;
48#[cfg(msim)]
49pub mod tonic_network;
50mod tonic_tls;
51
/// A stream of serialized filtered blocks returned over the network.
///
/// This is a pinned, boxed, `Send` trait object yielding [`ExtendedSerializedBlock`] items, so
/// the concrete stream type of a network implementation is hidden from callers.
pub(crate) type BlockStream = Pin<Box<dyn Stream<Item = ExtendedSerializedBlock> + Send>>;
54
/// Network client for communicating with peers.
///
/// Every method targets a single `peer`, identified by its [`AuthorityIndex`], and takes a
/// `timeout` for the request.
///
/// NOTE: the timeout parameters help save resources at the client and potentially at the server,
/// but it is up to the server implementation whether the timeout is honored.
/// - To bound server resources, the server should implement its own timeout for incoming
///   requests.
///
/// The `Sized` supertrait means this trait cannot be used as a trait object
/// (`dyn NetworkClient`); implementations are reached through generics or associated types,
/// e.g. `NetworkManager::Client`.
#[async_trait]
pub(crate) trait NetworkClient: Send + Sync + Sized + 'static {
    /// Subscribes to blocks from a peer after the `last_received` round.
    ///
    /// On success, returns a [`BlockStream`] of the peer's blocks.
    async fn subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
        timeout: Duration,
    ) -> ConsensusResult<BlockStream>;

    // TODO: add a parameter for maximum total size of blocks returned.
    /// Fetches serialized `SignedBlock`s from a peer, identified by `block_refs`.
    ///
    /// The response might also contain additional ancestor blocks of the requested blocks,
    /// selected according to the provided `highest_accepted_rounds`. The length of
    /// `highest_accepted_rounds` should be equal to the committee size. If
    /// `highest_accepted_rounds` is empty, it is simply ignored.
    ///
    /// NOTE(review): `breadth_first` is not described here; it presumably controls the order
    /// in which the peer traverses ancestors when collecting blocks — confirm against the
    /// service implementation.
    async fn fetch_blocks(
        &self,
        peer: AuthorityIndex,
        block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
        breadth_first: bool,
        timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Fetches serialized commits in the commit range from a peer.
    ///
    /// Returns a tuple of the serialized commits, and the serialized blocks that contain
    /// votes certifying the last commit.
    async fn fetch_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
        timeout: Duration,
    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;

    /// Fetches the latest block from `peer` for the requested `authorities`.
    ///
    /// The latest blocks are returned in the serialized format of `SignedBlock`s. The method
    /// can return multiple blocks per authority, as it is possible to have equivocations.
    async fn fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
        timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Gets the latest received & accepted rounds of all authorities from the peer.
    ///
    /// NOTE(review): the tuple is presumably ordered as (received rounds, accepted rounds),
    /// matching the wording above — confirm against the implementations.
    async fn get_latest_rounds(
        &self,
        peer: AuthorityIndex,
        timeout: Duration,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;

    /// Sends a serialized `SignedBlock` to a peer.
    ///
    /// Only compiled in test builds (`#[cfg(test)]`).
    #[cfg(test)]
    async fn send_block(
        &self,
        peer: AuthorityIndex,
        block: &VerifiedBlock,
        timeout: Duration,
    ) -> ConsensusResult<()>;
}
120
/// Network service for handling requests from peers.
///
/// Each handler receives the index of the requesting `peer` together with the request
/// parameters, and mirrors one of the request kinds exposed by `NetworkClient`.
#[async_trait]
pub(crate) trait NetworkService: Send + Sync + 'static {
    /// Handles the block sent from the peer via either unicast RPC or subscription stream.
    ///
    /// The `peer` value can be trusted to be a valid authority index, but the serialized
    /// `block` must be verified before its contents are trusted.
    /// Excluded ancestors are also included, as part of an effort to further propagate
    /// blocks to peers despite the current exclusion.
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()>;

    /// Handles the subscription request from the peer.
    ///
    /// A stream of newly proposed blocks is returned to the peer.
    /// The stream continues until the end of epoch, the peer unsubscribes, or a network
    /// error / crash occurs.
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream>;

    /// Handles the request to fetch blocks by references from the peer.
    ///
    /// The parameters correspond to those of `NetworkClient::fetch_blocks`, minus the timeout.
    async fn handle_fetch_blocks(
        &self,
        peer: AuthorityIndex,
        block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
        breadth_first: bool,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Handles the request to fetch commits by index range from the peer.
    ///
    /// Unlike `NetworkClient::fetch_commits`, which yields serialized bytes, this handler
    /// returns typed `TrustedCommit`s and `VerifiedBlock`s.
    /// NOTE(review): serialization presumably happens in the network implementation — confirm.
    async fn handle_fetch_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;

    /// Handles the request to fetch the latest block for the provided `authorities`.
    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Handles the request to get the latest received & accepted rounds of all authorities.
    async fn handle_get_latest_rounds(
        &self,
        peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
}
174
/// An `AuthorityNode` holds a `NetworkManager` until shutdown.
/// Dropping `NetworkManager` will shutdown the network service.
///
/// The manager is generic over the `NetworkService` implementation `S` that it serves, and
/// names the matching client type through the `Client` associated type.
///
/// The `async fn` methods here are declared without `#[async_trait]`, i.e. they are native
/// async trait methods.
pub(crate) trait NetworkManager<S>: Send + Sync
where
    S: NetworkService,
{
    /// The `NetworkClient` implementation paired with this manager.
    type Client: NetworkClient;

    /// Creates a new network manager.
    fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self;

    /// Returns the network client.
    fn client(&self) -> Arc<Self::Client>;

    /// Installs network service.
    async fn install_service(&mut self, service: Arc<S>);

    /// Stops the network service.
    async fn stop(&mut self);

    /// Updates the network address for a peer identified by their authority index.
    /// If `address` is `None`, the override is cleared and the committee address will be used.
    fn update_peer_address(&self, peer: AuthorityIndex, address: Option<Multiaddr>);
}
199
/// Serialized block with extended information from the proposing authority.
///
/// This is the item type of `BlockStream` and the payload accepted by
/// `NetworkService::handle_send_block`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub(crate) struct ExtendedSerializedBlock {
    /// The serialized block.
    pub(crate) block: Bytes,
    /// Serialized `BlockRef`s that are excluded from the block's ancestors, one entry per
    /// reference. The `From<ExtendedBlock>` conversion in this file produces each entry with
    /// `bcs::to_bytes`.
    pub(crate) excluded_ancestors: Vec<Vec<u8>>,
}
207
208impl From<ExtendedBlock> for ExtendedSerializedBlock {
209 fn from(extended_block: ExtendedBlock) -> Self {
210 Self {
211 block: extended_block.block.serialized().clone(),
212 excluded_ancestors: extended_block
213 .excluded_ancestors
214 .iter()
215 .filter_map(|r| match bcs::to_bytes(r) {
216 Ok(serialized) => Some(serialized),
217 Err(e) => {
218 tracing::debug!("Failed to serialize block ref {:?}: {e:?}", r);
219 None
220 }
221 })
222 .collect(),
223 }
224 }
225}