consensus_core/network/mod.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the network interface, and provides network implementations for the
5//! consensus protocol.
6//!
7//! Having an abstract network interface allows
8//! - simplifying the semantics of sending data and serving requests over the network
9//! - hiding implementation specific types and semantics from the consensus protocol
10//! - allowing easy swapping of network implementations, for better performance or testing
11//!
12//! When modifying the client and server interfaces, the principle is to keep the interfaces
13//! low level, close to underlying implementations in semantics. For example, the client interface
14//! exposes sending messages to a specific peer, instead of broadcasting to all peers. Subscribing
15//! to a stream of blocks gets back the stream via response, instead of delivering the stream
16//! directly to the server. This keeps the logic agnostics to the underlying network outside of
17//! this module, so they can be reused easily across network implementations.
18
19use std::{
20 fmt::{Display, Formatter},
21 net::SocketAddrV6,
22 pin::Pin,
23 sync::Arc,
24 time::Duration,
25};
26
27use async_trait::async_trait;
28use bytes::Bytes;
29use consensus_config::{AuthorityIndex, NetworkKeyPair, NetworkPublicKey};
30use consensus_types::block::{BlockRef, Round};
31use futures::Stream;
32use mysten_network::{Multiaddr, multiaddr::Protocol};
33
34use crate::{
35 block::{ExtendedBlock, VerifiedBlock},
36 commit::{CommitRange, TrustedCommit},
37 context::Context,
38 error::ConsensusResult,
39};
40
41/// Identifies an observer node by its network public key.
42#[allow(unused)]
43pub(crate) type NodeId = NetworkPublicKey;
44
45/// Identifies a peer in the network, which can be either a validator or an observer.
46#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
47pub(crate) enum PeerId {
48 /// A validator node identified by its authority index.
49 Validator(AuthorityIndex),
50 /// An observer node identified by its network public key.
51 #[allow(dead_code)]
52 Observer(NodeId),
53}
54
55impl Display for PeerId {
56 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
57 match self {
58 PeerId::Validator(authority) => write!(f, "[{}]", authority),
59 PeerId::Observer(node_id) => write!(f, "[{:?}]", node_id),
60 }
61 }
62}
63
64// Tonic generated RPC stubs.
65mod tonic_gen {
66 include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusService.rs"));
67 include!(concat!(env!("OUT_DIR"), "/consensus.ObserverService.rs"));
68}
69
70mod clients;
71pub(crate) mod metrics;
72mod metrics_layer;
73#[cfg(all(test, not(msim)))]
74mod network_tests;
75#[cfg(not(msim))]
76pub(crate) mod observer;
77#[cfg(msim)]
78pub mod observer;
79#[cfg(test)]
80pub(crate) mod test_network;
81#[cfg(not(msim))]
82pub(crate) mod tonic_network;
83#[cfg(msim)]
84pub mod tonic_network;
85mod tonic_tls;
86
87/// A stream of serialized filtered blocks returned over the network.
88pub(crate) type BlockStream = Pin<Box<dyn Stream<Item = ExtendedSerializedBlock> + Send>>;
89
90/// Validator network client for communicating with validator peers.
91///
92/// NOTE: the timeout parameters help saving resources at client and potentially server.
93/// But it is up to the server implementation if the timeout is honored.
94/// - To bound server resources, server should implement own timeout for incoming requests.
95#[async_trait]
96pub(crate) trait ValidatorNetworkClient: Send + Sync + Sized + 'static {
97 /// Subscribes to blocks from a peer after last_received round.
98 async fn subscribe_blocks(
99 &self,
100 peer: AuthorityIndex,
101 last_received: Round,
102 timeout: Duration,
103 ) -> ConsensusResult<BlockStream>;
104
105 // TODO: add a parameter for maximum total size of blocks returned.
106 /// Fetches serialized `SignedBlock`s from a peer. It also might return additional ancestor blocks
107 /// of the requested blocks according to the provided `fetch_after_rounds`. The `fetch_after_rounds`
108 /// length should be equal to the committee size. If `fetch_after_rounds` is empty then it will
109 /// be simply ignored.
110 async fn fetch_blocks(
111 &self,
112 peer: AuthorityIndex,
113 block_refs: Vec<BlockRef>,
114 fetch_after_rounds: Vec<Round>,
115 fetch_missing_ancestors: bool,
116 timeout: Duration,
117 ) -> ConsensusResult<Vec<Bytes>>;
118
119 /// Fetches serialized commits in the commit range from a peer.
120 /// Returns a tuple of both the serialized commits, and serialized blocks that contain
121 /// votes certifying the last commit.
122 async fn fetch_commits(
123 &self,
124 peer: AuthorityIndex,
125 commit_range: CommitRange,
126 timeout: Duration,
127 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;
128
129 /// Fetches the latest block from `peer` for the requested `authorities`. The latest blocks
130 /// are returned in the serialised format of `SignedBlocks`. The method can return multiple
131 /// blocks per peer as its possible to have equivocations.
132 async fn fetch_latest_blocks(
133 &self,
134 peer: AuthorityIndex,
135 authorities: Vec<AuthorityIndex>,
136 timeout: Duration,
137 ) -> ConsensusResult<Vec<Bytes>>;
138
139 /// Gets the latest received & accepted rounds of all authorities from the peer.
140 async fn get_latest_rounds(
141 &self,
142 peer: AuthorityIndex,
143 timeout: Duration,
144 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
145
146 /// Sends a serialized SignedBlock to a peer.
147 #[cfg(test)]
148 async fn send_block(
149 &self,
150 peer: AuthorityIndex,
151 block: &VerifiedBlock,
152 timeout: Duration,
153 ) -> ConsensusResult<()>;
154}
155
156/// Validator network service for handling requests from validator peers.
157#[async_trait]
158pub(crate) trait ValidatorNetworkService: Send + Sync + 'static {
159 /// Handles the block sent from the peer via either unicast RPC or subscription stream.
160 /// Peer value can be trusted to be a valid authority index.
161 /// But serialized_block must be verified before its contents are trusted.
162 /// Excluded ancestors are also included as part of an effort to further propagate
163 /// blocks to peers despite the current exclusion.
164 async fn handle_send_block(
165 &self,
166 peer: AuthorityIndex,
167 block: ExtendedSerializedBlock,
168 ) -> ConsensusResult<()>;
169
170 /// Handles the subscription request from the peer.
171 /// A stream of newly proposed blocks is returned to the peer.
172 /// The stream continues until the end of epoch, peer unsubscribes, or a network error / crash
173 /// occurs.
174 async fn handle_subscribe_blocks(
175 &self,
176 peer: AuthorityIndex,
177 last_received: Round,
178 ) -> ConsensusResult<BlockStream>;
179
180 /// Handles the request to fetch blocks by references from the peer.
181 async fn handle_fetch_blocks(
182 &self,
183 peer: AuthorityIndex,
184 block_refs: Vec<BlockRef>,
185 fetch_after_rounds: Vec<Round>,
186 fetch_missing_ancestors: bool,
187 ) -> ConsensusResult<Vec<Bytes>>;
188
189 /// Handles the request to fetch commits by index range from the peer.
190 async fn handle_fetch_commits(
191 &self,
192 peer: AuthorityIndex,
193 commit_range: CommitRange,
194 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;
195
196 /// Handles the request to fetch the latest block for the provided `authorities`.
197 async fn handle_fetch_latest_blocks(
198 &self,
199 peer: AuthorityIndex,
200 authorities: Vec<AuthorityIndex>,
201 ) -> ConsensusResult<Vec<Bytes>>;
202
203 /// Handles the request to get the latest received & accepted rounds of all authorities.
204 async fn handle_get_latest_rounds(
205 &self,
206 peer: AuthorityIndex,
207 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
208}
209
210/// A stream item for observer block streaming that includes both the block and highest commit index.
211pub(crate) struct ObserverBlockStreamItem {
212 pub(crate) block: Bytes,
213 pub(crate) highest_commit_index: u64,
214}
215
216/// Observer block stream type.
217pub(crate) type ObserverBlockStream =
218 Pin<Box<dyn Stream<Item = ObserverBlockStreamItem> + Send + 'static>>;
219
220/// Observer block request stream type for bidirectional streaming.
221#[allow(dead_code)]
222pub(crate) type BlockRequestStream =
223 Pin<Box<dyn Stream<Item = crate::network::observer::BlockStreamRequest> + Send + 'static>>;
224
225/// Observer network service for handling requests from observer nodes.
226/// Unlike ValidatorNetworkService which uses AuthorityIndex, this uses NodeId (NetworkPublicKey)
227/// to identify peers since observers are not part of the committee.
228#[async_trait]
229#[allow(dead_code)]
230pub(crate) trait ObserverNetworkService: Send + Sync + 'static {
231 /// Handles the block streaming request from an observer peer.
232 /// Returns a stream of blocks with the highest commit index for each block.
233 /// Blocks with rounds higher than the highest_round_per_authority will be streamed.
234 async fn handle_stream_blocks(
235 &self,
236 peer: NodeId,
237 highest_round_per_authority: Vec<u64>,
238 ) -> ConsensusResult<ObserverBlockStream>;
239
240 /// Handles the request to fetch blocks by references from an observer peer.
241 #[allow(unused)]
242 async fn handle_fetch_blocks(
243 &self,
244 peer: NodeId,
245 block_refs: Vec<BlockRef>,
246 ) -> ConsensusResult<Vec<Bytes>>;
247
248 /// Handles the request to fetch commits by index range from an observer peer.
249 #[allow(unused)]
250 /// Returns serialized commits and certifier blocks.
251 async fn handle_fetch_commits(
252 &self,
253 peer: NodeId,
254 commit_range: CommitRange,
255 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;
256}
257
258/// Observer network client for communicating with validators' observer ports or other observers.
259/// Unlike ValidatorNetworkClient which uses AuthorityIndex, this uses PeerId to identify peers
260/// since the observer server can serve both validators and observer nodes.
261#[async_trait]
262#[allow(dead_code)]
263pub(crate) trait ObserverNetworkClient: Send + Sync + Sized + 'static {
264 /// Initiates block streaming with a peer (validator or observer).
265 /// Returns a stream of blocks with the highest commit index.
266 /// Blocks with rounds higher than the highest_round_per_authority will be streamed.
267 async fn stream_blocks(
268 &self,
269 peer: PeerId,
270 highest_round_per_authority: Vec<u64>,
271 timeout: Duration,
272 ) -> ConsensusResult<ObserverBlockStream>;
273
274 /// Fetches serialized blocks by references from a peer.
275 async fn fetch_blocks(
276 &self,
277 peer: PeerId,
278 block_refs: Vec<BlockRef>,
279 timeout: Duration,
280 ) -> ConsensusResult<Vec<Bytes>>;
281
282 /// Fetches serialized commits in the commit range from a peer.
283 /// Returns a tuple of both the serialized commits, and serialized blocks that contain
284 /// votes certifying the last commit.
285 async fn fetch_commits(
286 &self,
287 peer: PeerId,
288 commit_range: CommitRange,
289 timeout: Duration,
290 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;
291}
292
293/// An `AuthorityNode` holds a `NetworkManager` until shutdown.
294/// Dropping `NetworkManager` will shutdown the network service.
295pub(crate) trait NetworkManager: Send + Sync {
296 type ValidatorClient: ValidatorNetworkClient;
297 type ObserverClient: ObserverNetworkClient;
298
299 /// Creates a new network manager.
300 fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self;
301
302 /// Returns the validator network client.
303 fn validator_client(&self) -> Arc<Self::ValidatorClient>;
304
305 /// Returns the observer network client.
306 #[allow(dead_code)]
307 fn observer_client(&self) -> Arc<Self::ObserverClient>;
308
309 /// Starts the validator network server with the provided service.
310 async fn start_validator_server<V>(&mut self, service: Arc<V>)
311 where
312 V: ValidatorNetworkService;
313
314 /// Starts the observer network server with the provided service.
315 async fn start_observer_server<O>(&mut self, service: Arc<O>)
316 where
317 O: ObserverNetworkService;
318
319 /// Stops the network service.
320 async fn stop(&mut self);
321
322 /// Updates the network address for a peer identified by their authority index.
323 /// If address is None, the override is cleared and the committee address will be used.
324 fn update_peer_address(&self, peer: AuthorityIndex, address: Option<Multiaddr>);
325}
326
327// Re-export the concrete client implementations.
328pub(crate) use clients::{CommitSyncerClient, SynchronizerClient};
329
330/// Serialized block with extended information from the proposing authority.
331#[derive(Clone, PartialEq, Eq, Debug)]
332pub(crate) struct ExtendedSerializedBlock {
333 pub(crate) block: Bytes,
334 // Serialized BlockRefs that are excluded from the blocks ancestors.
335 pub(crate) excluded_ancestors: Vec<Vec<u8>>,
336}
337
338impl From<ExtendedBlock> for ExtendedSerializedBlock {
339 fn from(extended_block: ExtendedBlock) -> Self {
340 Self {
341 block: extended_block.block.serialized().clone(),
342 excluded_ancestors: extended_block
343 .excluded_ancestors
344 .iter()
345 .filter_map(|r| match bcs::to_bytes(r) {
346 Ok(serialized) => Some(serialized),
347 Err(e) => {
348 tracing::debug!("Failed to serialize block ref {:?}: {e:?}", r);
349 None
350 }
351 })
352 .collect(),
353 }
354 }
355}
356
357/// Attempts to convert a multiaddr of the form `/[ip4,ip6,dns]/{}/udp/{port}` into
358/// a host:port string.
359pub(crate) fn to_host_port_str(addr: &Multiaddr) -> Result<String, String> {
360 let mut iter = addr.iter();
361
362 match (iter.next(), iter.next()) {
363 (Some(Protocol::Ip4(ipaddr)), Some(Protocol::Udp(port))) => {
364 Ok(format!("{}:{}", ipaddr, port))
365 }
366 (Some(Protocol::Ip6(ipaddr)), Some(Protocol::Udp(port))) => {
367 Ok(format!("{}", SocketAddrV6::new(ipaddr, port, 0, 0)))
368 }
369 (Some(Protocol::Dns(hostname)), Some(Protocol::Udp(port))) => {
370 Ok(format!("{}:{}", hostname, port))
371 }
372
373 _ => Err(format!("unsupported multiaddr: {addr}")),
374 }
375}