consensus_core/network/mod.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the network interface, and provides network implementations for the
5//! consensus protocol.
6//!
7//! Having an abstract network interface allows
8//! - simplifying the semantics of sending data and serving requests over the network
9//! - hiding implementation specific types and semantics from the consensus protocol
10//! - allowing easy swapping of network implementations, for better performance or testing
11//!
12//! When modifying the client and server interfaces, the principle is to keep the interfaces
13//! low level, close to underlying implementations in semantics. For example, the client interface
14//! exposes sending messages to a specific peer, instead of broadcasting to all peers. Subscribing
15//! to a stream of blocks gets back the stream via response, instead of delivering the stream
16//! directly to the server. This keeps the logic agnostics to the underlying network outside of
17//! this module, so they can be reused easily across network implementations.
18
19use std::{pin::Pin, sync::Arc, time::Duration};
20
21use async_trait::async_trait;
22use bytes::Bytes;
23use consensus_config::{AuthorityIndex, NetworkKeyPair, NetworkPublicKey};
24use consensus_types::block::{BlockRef, Round};
25use futures::Stream;
26use mysten_network::Multiaddr;
27
28use crate::{
29 block::{ExtendedBlock, VerifiedBlock},
30 commit::{CommitRange, TrustedCommit},
31 context::Context,
32 error::ConsensusResult,
33};
34
35/// Identifies an observer node by its network public key.
36#[allow(unused)]
37pub(crate) type NodeId = NetworkPublicKey;
38
39/// Identifies a peer in the network, which can be either a validator or an observer.
40#[derive(Debug, Clone, PartialEq, Eq)]
41pub(crate) enum PeerId {
42 /// A validator node identified by its authority index.
43 Validator(AuthorityIndex),
44 /// An observer node identified by its network public key.
45 #[allow(dead_code)]
46 Observer(NodeId),
47}
48
49// Tonic generated RPC stubs.
50mod tonic_gen {
51 include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusService.rs"));
52 include!(concat!(env!("OUT_DIR"), "/consensus.ObserverService.rs"));
53}
54
55mod clients;
56pub(crate) mod metrics;
57mod metrics_layer;
58#[cfg(all(test, not(msim)))]
59mod network_tests;
60#[cfg(not(msim))]
61pub(crate) mod observer;
62#[cfg(msim)]
63pub mod observer;
64#[cfg(test)]
65pub(crate) mod test_network;
66#[cfg(not(msim))]
67pub(crate) mod tonic_network;
68#[cfg(msim)]
69pub mod tonic_network;
70mod tonic_tls;
71
72/// A stream of serialized filtered blocks returned over the network.
73pub(crate) type BlockStream = Pin<Box<dyn Stream<Item = ExtendedSerializedBlock> + Send>>;
74
75/// Validator network client for communicating with validator peers.
76///
77/// NOTE: the timeout parameters help saving resources at client and potentially server.
78/// But it is up to the server implementation if the timeout is honored.
79/// - To bound server resources, server should implement own timeout for incoming requests.
80#[async_trait]
81pub(crate) trait ValidatorNetworkClient: Send + Sync + Sized + 'static {
82 /// Subscribes to blocks from a peer after last_received round.
83 async fn subscribe_blocks(
84 &self,
85 peer: AuthorityIndex,
86 last_received: Round,
87 timeout: Duration,
88 ) -> ConsensusResult<BlockStream>;
89
90 // TODO: add a parameter for maximum total size of blocks returned.
91 /// Fetches serialized `SignedBlock`s from a peer. It also might return additional ancestor blocks
92 /// of the requested blocks according to the provided `highest_accepted_rounds`. The `highest_accepted_rounds`
93 /// length should be equal to the committee size. If `highest_accepted_rounds` is empty then it will
94 /// be simply ignored.
95 async fn fetch_blocks(
96 &self,
97 peer: AuthorityIndex,
98 block_refs: Vec<BlockRef>,
99 highest_accepted_rounds: Vec<Round>,
100 breadth_first: bool,
101 timeout: Duration,
102 ) -> ConsensusResult<Vec<Bytes>>;
103
104 /// Fetches serialized commits in the commit range from a peer.
105 /// Returns a tuple of both the serialized commits, and serialized blocks that contain
106 /// votes certifying the last commit.
107 async fn fetch_commits(
108 &self,
109 peer: AuthorityIndex,
110 commit_range: CommitRange,
111 timeout: Duration,
112 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;
113
114 /// Fetches the latest block from `peer` for the requested `authorities`. The latest blocks
115 /// are returned in the serialised format of `SignedBlocks`. The method can return multiple
116 /// blocks per peer as its possible to have equivocations.
117 async fn fetch_latest_blocks(
118 &self,
119 peer: AuthorityIndex,
120 authorities: Vec<AuthorityIndex>,
121 timeout: Duration,
122 ) -> ConsensusResult<Vec<Bytes>>;
123
124 /// Gets the latest received & accepted rounds of all authorities from the peer.
125 async fn get_latest_rounds(
126 &self,
127 peer: AuthorityIndex,
128 timeout: Duration,
129 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
130
131 /// Sends a serialized SignedBlock to a peer.
132 #[cfg(test)]
133 async fn send_block(
134 &self,
135 peer: AuthorityIndex,
136 block: &VerifiedBlock,
137 timeout: Duration,
138 ) -> ConsensusResult<()>;
139}
140
141/// Validator network service for handling requests from validator peers.
142#[async_trait]
143pub(crate) trait ValidatorNetworkService: Send + Sync + 'static {
144 /// Handles the block sent from the peer via either unicast RPC or subscription stream.
145 /// Peer value can be trusted to be a valid authority index.
146 /// But serialized_block must be verified before its contents are trusted.
147 /// Excluded ancestors are also included as part of an effort to further propagate
148 /// blocks to peers despite the current exclusion.
149 async fn handle_send_block(
150 &self,
151 peer: AuthorityIndex,
152 block: ExtendedSerializedBlock,
153 ) -> ConsensusResult<()>;
154
155 /// Handles the subscription request from the peer.
156 /// A stream of newly proposed blocks is returned to the peer.
157 /// The stream continues until the end of epoch, peer unsubscribes, or a network error / crash
158 /// occurs.
159 async fn handle_subscribe_blocks(
160 &self,
161 peer: AuthorityIndex,
162 last_received: Round,
163 ) -> ConsensusResult<BlockStream>;
164
165 /// Handles the request to fetch blocks by references from the peer.
166 async fn handle_fetch_blocks(
167 &self,
168 peer: AuthorityIndex,
169 block_refs: Vec<BlockRef>,
170 highest_accepted_rounds: Vec<Round>,
171 breadth_first: bool,
172 ) -> ConsensusResult<Vec<Bytes>>;
173
174 /// Handles the request to fetch commits by index range from the peer.
175 async fn handle_fetch_commits(
176 &self,
177 peer: AuthorityIndex,
178 commit_range: CommitRange,
179 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;
180
181 /// Handles the request to fetch the latest block for the provided `authorities`.
182 async fn handle_fetch_latest_blocks(
183 &self,
184 peer: AuthorityIndex,
185 authorities: Vec<AuthorityIndex>,
186 ) -> ConsensusResult<Vec<Bytes>>;
187
188 /// Handles the request to get the latest received & accepted rounds of all authorities.
189 async fn handle_get_latest_rounds(
190 &self,
191 peer: AuthorityIndex,
192 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
193}
194
195/// A stream item for observer block streaming that includes both the block and highest commit index.
196#[allow(dead_code)]
197pub(crate) struct ObserverBlockStreamItem {
198 pub(crate) block: Bytes,
199 pub(crate) highest_commit_index: u64,
200}
201
202/// Observer block stream type.
203#[allow(dead_code)]
204pub(crate) type ObserverBlockStream = Pin<Box<dyn Stream<Item = ObserverBlockStreamItem> + Send>>;
205
206/// Observer block request stream type for bidirectional streaming.
207#[allow(dead_code)]
208pub(crate) type BlockRequestStream =
209 Pin<Box<dyn Stream<Item = crate::network::observer::BlockStreamRequest> + Send>>;
210
211/// Observer network service for handling requests from observer nodes.
212/// Unlike ValidatorNetworkService which uses AuthorityIndex, this uses NodeId (NetworkPublicKey)
213/// to identify peers since observers are not part of the committee.
214#[async_trait]
215#[allow(dead_code)]
216pub(crate) trait ObserverNetworkService: Send + Sync + 'static {
217 /// Handles the bidirectional block streaming request from an observer peer.
218 async fn handle_stream_blocks(
219 &self,
220 peer: NodeId,
221 request_stream: BlockRequestStream,
222 ) -> ConsensusResult<ObserverBlockStream>;
223
224 /// Handles the request to fetch blocks by references from an observer peer.
225 #[allow(unused)]
226 async fn handle_fetch_blocks(
227 &self,
228 peer: NodeId,
229 block_refs: Vec<BlockRef>,
230 ) -> ConsensusResult<Vec<Bytes>>;
231
232 /// Handles the request to fetch commits by index range from an observer peer.
233 #[allow(unused)]
234 /// Returns serialized commits and certifier blocks.
235 async fn handle_fetch_commits(
236 &self,
237 peer: NodeId,
238 commit_range: CommitRange,
239 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;
240}
241
242/// Observer network client for communicating with validators' observer ports or other observers.
243/// Unlike ValidatorNetworkClient which uses AuthorityIndex, this uses PeerId to identify peers
244/// since the observer server can serve both validators and observer nodes.
245#[async_trait]
246#[allow(dead_code)]
247pub(crate) trait ObserverNetworkClient: Send + Sync + Sized + 'static {
248 /// Initiates bidirectional block streaming with a peer.
249 async fn stream_blocks(
250 &self,
251 peer: PeerId,
252 request_stream: BlockRequestStream,
253 timeout: Duration,
254 ) -> ConsensusResult<ObserverBlockStream>;
255
256 /// Fetches serialized blocks by references from a peer.
257 async fn fetch_blocks(
258 &self,
259 peer: PeerId,
260 block_refs: Vec<BlockRef>,
261 timeout: Duration,
262 ) -> ConsensusResult<Vec<Bytes>>;
263
264 /// Fetches serialized commits in the commit range from a peer.
265 /// Returns a tuple of both the serialized commits, and serialized blocks that contain
266 /// votes certifying the last commit.
267 async fn fetch_commits(
268 &self,
269 peer: PeerId,
270 commit_range: CommitRange,
271 timeout: Duration,
272 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;
273}
274
275/// An `AuthorityNode` holds a `NetworkManager` until shutdown.
276/// Dropping `NetworkManager` will shutdown the network service.
277pub(crate) trait NetworkManager: Send + Sync {
278 type ValidatorClient: ValidatorNetworkClient;
279 type ObserverClient: ObserverNetworkClient;
280
281 /// Creates a new network manager.
282 fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self;
283
284 /// Returns the validator network client.
285 fn validator_client(&self) -> Arc<Self::ValidatorClient>;
286
287 /// Returns the observer network client.
288 #[allow(dead_code)]
289 fn observer_client(&self) -> Arc<Self::ObserverClient>;
290
291 /// Starts the validator network server with the provided service.
292 async fn start_validator_server<V>(&mut self, service: Arc<V>)
293 where
294 V: ValidatorNetworkService;
295
296 /// Starts the observer network server with the provided service.
297 async fn start_observer_server<O>(&mut self, service: Arc<O>)
298 where
299 O: ObserverNetworkService;
300
301 /// Stops the network service.
302 async fn stop(&mut self);
303
304 /// Updates the network address for a peer identified by their authority index.
305 /// If address is None, the override is cleared and the committee address will be used.
306 fn update_peer_address(&self, peer: AuthorityIndex, address: Option<Multiaddr>);
307}
308
309// Re-export the concrete client implementations.
310pub(crate) use clients::{CommitSyncerClient, SynchronizerClient};
311
312/// Serialized block with extended information from the proposing authority.
313#[derive(Clone, PartialEq, Eq, Debug)]
314pub(crate) struct ExtendedSerializedBlock {
315 pub(crate) block: Bytes,
316 // Serialized BlockRefs that are excluded from the blocks ancestors.
317 pub(crate) excluded_ancestors: Vec<Vec<u8>>,
318}
319
320impl From<ExtendedBlock> for ExtendedSerializedBlock {
321 fn from(extended_block: ExtendedBlock) -> Self {
322 Self {
323 block: extended_block.block.serialized().clone(),
324 excluded_ancestors: extended_block
325 .excluded_ancestors
326 .iter()
327 .filter_map(|r| match bcs::to_bytes(r) {
328 Ok(serialized) => Some(serialized),
329 Err(e) => {
330 tracing::debug!("Failed to serialize block ref {:?}: {e:?}", r);
331 None
332 }
333 })
334 .collect(),
335 }
336 }
337}