consensus_core/network/mod.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the network interface, and provides network implementations for the
5//! consensus protocol.
6//!
7//! Having an abstract network interface allows
8//! - simplifying the semantics of sending data and serving requests over the network
9//! - hiding implementation specific types and semantics from the consensus protocol
10//! - allowing easy swapping of network implementations, for better performance or testing
11//!
12//! When modifying the client and server interfaces, the principle is to keep the interfaces
13//! low level, close to underlying implementations in semantics. For example, the client interface
14//! exposes sending messages to a specific peer, instead of broadcasting to all peers. Subscribing
15//! to a stream of blocks gets back the stream via response, instead of delivering the stream
16//! directly to the server. This keeps the logic agnostics to the underlying network outside of
17//! this module, so they can be reused easily across network implementations.
18
19use std::{
20 fmt::{Display, Formatter},
21 net::SocketAddrV6,
22 pin::Pin,
23 sync::Arc,
24 time::Duration,
25};
26
27use async_trait::async_trait;
28use bytes::Bytes;
29use consensus_config::{AuthorityIndex, NetworkKeyPair, NetworkPublicKey};
30use consensus_types::block::{BlockRef, Round};
31use fastcrypto::encoding::{Encoding, Hex};
32use futures::Stream;
33use mysten_network::{Multiaddr, multiaddr::Protocol};
34
35use crate::{
36 block::{ExtendedBlock, VerifiedBlock},
37 commit::{CommitRange, TrustedCommit},
38 context::Context,
39 error::ConsensusResult,
40};
41
42/// Identifies an observer node by its network public key.
43pub type NodeId = NetworkPublicKey;
44
45/// Identifies a peer in the network, which can be either a validator or an observer.
46/// The Observer variant is boxed to keep the enum small, since `NodeId` (32 bytes) is
47/// much larger than `AuthorityIndex` (4 bytes).
48#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
49pub enum PeerId {
50 /// A validator node identified by its authority index.
51 Validator(AuthorityIndex),
52 /// An observer node identified by its network public key.
53 #[allow(dead_code)]
54 Observer(Box<NodeId>),
55}
56
57impl Display for PeerId {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 match self {
60 PeerId::Validator(authority) => write!(f, "[{}]", authority),
61 PeerId::Observer(node_id) => {
62 let bytes = node_id.to_bytes();
63 let s = Hex::encode(bytes.get(0..4).ok_or(std::fmt::Error)?);
64 write!(f, "o#{}..", s)
65 }
66 }
67 }
68}
69
70impl PeerId {
71 /// Returns a human-readable name suitable for logging. For observers, prints
72 /// the first 8 hex digits of the public key.
73 pub(crate) fn hostname(&self, context: &Context) -> String {
74 match self {
75 PeerId::Validator(index) => context.committee.authority(*index).hostname.to_string(),
76 PeerId::Observer(node_id) => {
77 let bytes = node_id.to_bytes();
78 format!(
79 "[Observer]{:02x}{:02x}{:02x}{:02x}",
80 bytes[0], bytes[1], bytes[2], bytes[3]
81 )
82 }
83 }
84 }
85
86 /// Returns a short label suitable for use in metrics. Does not include
87 /// full public keys to avoid high-cardinality metric labels.
88 pub(crate) fn labelname(&self, context: &Context) -> String {
89 match self {
90 PeerId::Validator(index) => context.committee.authority(*index).hostname.to_string(),
91 PeerId::Observer(_) => "observer".to_string(),
92 }
93 }
94}
95
96// Tonic generated RPC stubs.
97mod tonic_gen {
98 include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusService.rs"));
99 include!(concat!(env!("OUT_DIR"), "/consensus.ObserverService.rs"));
100}
101
102mod clients;
103pub(crate) mod metrics;
104mod metrics_layer;
105#[cfg(all(test, not(msim)))]
106mod network_tests;
107#[cfg(not(msim))]
108pub(crate) mod observer;
109#[cfg(msim)]
110pub mod observer;
111#[cfg(test)]
112pub(crate) mod test_network;
113#[cfg(not(msim))]
114pub(crate) mod tonic_network;
115#[cfg(msim)]
116pub mod tonic_network;
117mod tonic_tls;
118
119/// A stream of serialized filtered blocks returned over the network.
120pub(crate) type BlockStream = Pin<Box<dyn Stream<Item = ExtendedSerializedBlock> + Send>>;
121
122/// Validator network client for communicating with validator peers.
123///
124/// NOTE: the timeout parameters help saving resources at client and potentially server.
125/// But it is up to the server implementation if the timeout is honored.
126/// - To bound server resources, server should implement own timeout for incoming requests.
127#[async_trait]
128pub(crate) trait ValidatorNetworkClient: Send + Sync + Sized + 'static {
129 /// Subscribes to blocks from a peer after last_received round.
130 async fn subscribe_blocks(
131 &self,
132 peer: AuthorityIndex,
133 last_received: Round,
134 timeout: Duration,
135 ) -> ConsensusResult<BlockStream>;
136
137 // TODO: add a parameter for maximum total size of blocks returned.
138 /// Fetches serialized `SignedBlock`s from a peer. It also might return additional ancestor blocks
139 /// of the requested blocks according to the provided `fetch_after_rounds`. The `fetch_after_rounds`
140 /// length should be equal to the committee size. If `fetch_after_rounds` is empty then it will
141 /// be simply ignored.
142 async fn fetch_blocks(
143 &self,
144 peer: AuthorityIndex,
145 block_refs: Vec<BlockRef>,
146 fetch_after_rounds: Vec<Round>,
147 fetch_missing_ancestors: bool,
148 timeout: Duration,
149 ) -> ConsensusResult<Vec<Bytes>>;
150
151 /// Fetches serialized commits in the commit range from a peer.
152 /// Returns a tuple of both the serialized commits, and serialized blocks that contain
153 /// votes certifying the last commit.
154 async fn fetch_commits(
155 &self,
156 peer: AuthorityIndex,
157 commit_range: CommitRange,
158 timeout: Duration,
159 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;
160
161 /// Fetches the latest block from `peer` for the requested `authorities`. The latest blocks
162 /// are returned in the serialised format of `SignedBlocks`. The method can return multiple
163 /// blocks per peer as its possible to have equivocations.
164 async fn fetch_latest_blocks(
165 &self,
166 peer: AuthorityIndex,
167 authorities: Vec<AuthorityIndex>,
168 timeout: Duration,
169 ) -> ConsensusResult<Vec<Bytes>>;
170
171 /// Gets the latest received & accepted rounds of all authorities from the peer.
172 async fn get_latest_rounds(
173 &self,
174 peer: AuthorityIndex,
175 timeout: Duration,
176 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
177
178 /// Sends a serialized SignedBlock to a peer.
179 #[cfg(test)]
180 async fn send_block(
181 &self,
182 peer: AuthorityIndex,
183 block: &VerifiedBlock,
184 timeout: Duration,
185 ) -> ConsensusResult<()>;
186}
187
188/// Validator network service for handling requests from validator peers.
189#[async_trait]
190pub(crate) trait ValidatorNetworkService: Send + Sync + 'static {
191 /// Handles the block sent from the peer via either unicast RPC or subscription stream.
192 /// Peer value can be trusted to be a valid authority index.
193 /// But serialized_block must be verified before its contents are trusted.
194 /// Excluded ancestors are also included as part of an effort to further propagate
195 /// blocks to peers despite the current exclusion.
196 async fn handle_send_block(
197 &self,
198 peer: AuthorityIndex,
199 block: ExtendedSerializedBlock,
200 ) -> ConsensusResult<()>;
201
202 /// Handles the subscription request from the peer.
203 /// A stream of newly proposed blocks is returned to the peer.
204 /// The stream continues until the end of epoch, peer unsubscribes, or a network error / crash
205 /// occurs.
206 async fn handle_subscribe_blocks(
207 &self,
208 peer: AuthorityIndex,
209 last_received: Round,
210 ) -> ConsensusResult<BlockStream>;
211
212 /// Handles the request to fetch blocks by references from the peer.
213 async fn handle_fetch_blocks(
214 &self,
215 peer: AuthorityIndex,
216 block_refs: Vec<BlockRef>,
217 fetch_after_rounds: Vec<Round>,
218 fetch_missing_ancestors: bool,
219 ) -> ConsensusResult<Vec<Bytes>>;
220
221 /// Handles the request to fetch commits by index range from the peer.
222 async fn handle_fetch_commits(
223 &self,
224 peer: AuthorityIndex,
225 commit_range: CommitRange,
226 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;
227
228 /// Handles the request to fetch the latest block for the provided `authorities`.
229 async fn handle_fetch_latest_blocks(
230 &self,
231 peer: AuthorityIndex,
232 authorities: Vec<AuthorityIndex>,
233 ) -> ConsensusResult<Vec<Bytes>>;
234
235 /// Handles the request to get the latest received & accepted rounds of all authorities.
236 async fn handle_get_latest_rounds(
237 &self,
238 peer: AuthorityIndex,
239 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
240}
241
242/// Observer block stream type.
243pub(crate) type ObserverBlockStream = Pin<Box<dyn Stream<Item = Vec<Bytes>> + Send + 'static>>;
244
245/// Observer network service for handling requests from observer nodes.
246/// Unlike ValidatorNetworkService which uses AuthorityIndex, this uses NodeId (NetworkPublicKey)
247/// to identify peers since observers are not part of the committee.
248#[async_trait]
249#[allow(dead_code)]
250pub(crate) trait ObserverNetworkService: Send + Sync + 'static {
251 /// Handles a block received from a peer subscription. Used by ObserverSubscriber to process
252 /// blocks streamed from validators or other observers.
253 async fn handle_block(&self, peer: PeerId, block: Bytes) -> ConsensusResult<()>;
254
255 /// Handles the block streaming request from an observer peer.
256 /// Returns a stream of blocks with the highest commit index for each block.
257 /// Blocks with rounds higher than the highest_round_per_authority will be streamed.
258 async fn handle_stream_blocks(
259 &self,
260 peer: NodeId,
261 highest_round_per_authority: Vec<u64>,
262 ) -> ConsensusResult<ObserverBlockStream>;
263
264 /// Handles the request to fetch blocks by references from an observer peer.
265 #[allow(unused)]
266 async fn handle_fetch_blocks(
267 &self,
268 peer: NodeId,
269 block_refs: Vec<BlockRef>,
270 fetch_after_rounds: Vec<Round>,
271 fetch_missing_ancestors: bool,
272 ) -> ConsensusResult<Vec<Bytes>>;
273
274 /// Handles the request to fetch commits by index range from an observer peer.
275 #[allow(unused)]
276 /// Returns serialized commits and certifier blocks.
277 async fn handle_fetch_commits(
278 &self,
279 peer: NodeId,
280 commit_range: CommitRange,
281 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;
282}
283
284/// Observer network client for communicating with validators' observer ports or other observers.
285/// Unlike ValidatorNetworkClient which uses AuthorityIndex, this uses PeerId to identify peers
286/// since the observer server can serve both validators and observer nodes.
287#[async_trait]
288#[allow(dead_code)]
289pub(crate) trait ObserverNetworkClient: Send + Sync + Sized + 'static {
290 /// Initiates block streaming with a peer (validator or observer).
291 /// Returns a stream of blocks with the highest commit index.
292 /// Blocks with rounds higher than the highest_round_per_authority will be streamed.
293 async fn stream_blocks(
294 &self,
295 peer: PeerId,
296 highest_round_per_authority: Vec<u64>,
297 timeout: Duration,
298 ) -> ConsensusResult<ObserverBlockStream>;
299
300 /// Fetches serialized blocks by references from a peer.
301 async fn fetch_blocks(
302 &self,
303 peer: PeerId,
304 block_refs: Vec<BlockRef>,
305 fetch_after_rounds: Vec<Round>,
306 fetch_missing_ancestors: bool,
307 timeout: Duration,
308 ) -> ConsensusResult<Vec<Bytes>>;
309
310 /// Fetches serialized commits in the commit range from a peer.
311 /// Returns a tuple of both the serialized commits, and serialized blocks that contain
312 /// votes certifying the last commit.
313 async fn fetch_commits(
314 &self,
315 peer: PeerId,
316 commit_range: CommitRange,
317 timeout: Duration,
318 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;
319}
320
321/// An `AuthorityNode` holds a `NetworkManager` until shutdown.
322/// Dropping `NetworkManager` will shutdown the network service.
323pub(crate) trait NetworkManager: Send + Sync {
324 type ValidatorClient: ValidatorNetworkClient;
325 type ObserverClient: ObserverNetworkClient;
326
327 /// Creates a new network manager.
328 fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self;
329
330 /// Returns the validator network client.
331 fn validator_client(&self) -> Arc<Self::ValidatorClient>;
332
333 /// Returns the observer network client.
334 #[allow(dead_code)]
335 fn observer_client(&self) -> Arc<Self::ObserverClient>;
336
337 /// Starts the validator network server with the provided service.
338 async fn start_validator_server<V>(&mut self, service: Arc<V>)
339 where
340 V: ValidatorNetworkService;
341
342 /// Starts the observer network server with the provided service.
343 async fn start_observer_server<O>(&mut self, service: Arc<O>)
344 where
345 O: ObserverNetworkService;
346
347 /// Stops the network service.
348 async fn stop(&mut self);
349
350 /// Updates the network address for a peer identified by their authority index.
351 /// If address is None, the override is cleared and the committee address will be used.
352 fn update_peer_address(&self, peer: AuthorityIndex, address: Option<Multiaddr>);
353}
354
355// Re-export the concrete client implementations.
356pub(crate) use clients::{CommitSyncerClient, SynchronizerClient};
357
358/// Serialized block with extended information from the proposing authority.
359#[derive(Clone, PartialEq, Eq, Debug)]
360pub(crate) struct ExtendedSerializedBlock {
361 pub(crate) block: Bytes,
362 // Serialized BlockRefs that are excluded from the blocks ancestors.
363 pub(crate) excluded_ancestors: Vec<Vec<u8>>,
364}
365
366impl From<ExtendedBlock> for ExtendedSerializedBlock {
367 fn from(extended_block: ExtendedBlock) -> Self {
368 Self {
369 block: extended_block.block.serialized().clone(),
370 excluded_ancestors: extended_block
371 .excluded_ancestors
372 .iter()
373 .filter_map(|r| match bcs::to_bytes(r) {
374 Ok(serialized) => Some(serialized),
375 Err(e) => {
376 tracing::debug!("Failed to serialize block ref {:?}: {e:?}", r);
377 None
378 }
379 })
380 .collect(),
381 }
382 }
383}
384
385/// Attempts to convert a multiaddr of the form `/[ip4,ip6,dns]/{}/udp/{port}` into
386/// a host:port string.
387pub(crate) fn to_host_port_str(addr: &Multiaddr) -> Result<String, String> {
388 let mut iter = addr.iter();
389
390 match (iter.next(), iter.next()) {
391 (Some(Protocol::Ip4(ipaddr)), Some(Protocol::Udp(port))) => {
392 Ok(format!("{}:{}", ipaddr, port))
393 }
394 (Some(Protocol::Ip6(ipaddr)), Some(Protocol::Udp(port))) => {
395 Ok(format!("{}", SocketAddrV6::new(ipaddr, port, 0, 0)))
396 }
397 (Some(Protocol::Dns(hostname)), Some(Protocol::Udp(port))) => {
398 Ok(format!("{}:{}", hostname, port))
399 }
400
401 _ => Err(format!("unsupported multiaddr: {addr}")),
402 }
403}