consensus_core/network/mod.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the network interface, and provides network implementations for the
5//! consensus protocol.
6//!
7//! Having an abstract network interface allows
8//! - simplifying the semantics of sending data and serving requests over the network
9//! - hiding implementation specific types and semantics from the consensus protocol
10//! - allowing easy swapping of network implementations, for better performance or testing
11//!
12//! When modifying the client and server interfaces, the principle is to keep the interfaces
13//! low level, close to underlying implementations in semantics. For example, the client interface
14//! exposes sending messages to a specific peer, instead of broadcasting to all peers. Subscribing
15//! to a stream of blocks gets back the stream via response, instead of delivering the stream
//! directly to the server. This keeps the logic outside of this module agnostic to the underlying
//! network, so it can be reused easily across network implementations.
18
19use std::{
20 fmt::{Display, Formatter},
21 net::SocketAddrV6,
22 pin::Pin,
23 sync::Arc,
24 time::Duration,
25};
26
27use async_trait::async_trait;
28use bytes::Bytes;
29use consensus_config::{AuthorityIndex, NetworkKeyPair, NetworkPublicKey};
30use consensus_types::block::{BlockRef, Round};
31use fastcrypto::encoding::{Encoding, Hex};
32use futures::Stream;
33use mysten_network::{Multiaddr, multiaddr::Protocol};
34
35use crate::{
36 block::{ExtendedBlock, VerifiedBlock},
37 commit::{CommitRange, TrustedCommit},
38 context::Context,
39 error::ConsensusResult,
40};
41
/// Identifies an observer node by its network public key.
///
/// This is a plain type alias, so any `NetworkPublicKey` can be used wherever a `NodeId`
/// is expected; no separate newtype check distinguishes observer keys.
pub type NodeId = NetworkPublicKey;
44
/// Identifies a peer in the network, which can be either a validator or an observer.
///
/// The `Observer` variant is boxed to keep the enum small, since a `NodeId` (a full network
/// public key) is much larger than an `AuthorityIndex`.
///
/// The derived `Ord` compares variants in declaration order, so every `Validator` sorts before
/// every `Observer`; peers of the same variant compare by their wrapped value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum PeerId {
    /// A validator node identified by its authority index.
    Validator(AuthorityIndex),
    /// An observer node identified by its network public key.
    Observer(Box<NodeId>),
}
55
56impl Display for PeerId {
57 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
58 match self {
59 PeerId::Validator(authority) => write!(f, "[{}]", authority),
60 PeerId::Observer(node_id) => {
61 let bytes = node_id.to_bytes();
62 let s = Hex::encode(bytes.get(0..4).ok_or(std::fmt::Error)?);
63 write!(f, "o#{}..", s)
64 }
65 }
66 }
67}
68
69impl PeerId {
70 /// Returns a human-readable name suitable for logging. For observers, prints
71 /// the first 8 hex digits of the public key.
72 pub(crate) fn hostname(&self, context: &Context) -> String {
73 match self {
74 PeerId::Validator(index) => context.committee.authority(*index).hostname.to_string(),
75 PeerId::Observer(node_id) => {
76 let bytes = node_id.to_bytes();
77 format!(
78 "[Observer]{:02x}{:02x}{:02x}{:02x}",
79 bytes[0], bytes[1], bytes[2], bytes[3]
80 )
81 }
82 }
83 }
84
85 /// Returns a short label suitable for use in metrics. Does not include
86 /// full public keys to avoid high-cardinality metric labels.
87 pub(crate) fn labelname(&self, context: &Context) -> String {
88 match self {
89 PeerId::Validator(index) => context.committee.authority(*index).hostname.to_string(),
90 PeerId::Observer(_) => "observer".to_string(),
91 }
92 }
93}
94
// Tonic generated RPC stubs.
// The included files are generated at build time into `OUT_DIR` (presumably by this crate's
// build script — NOTE(review): confirm). One file covers the validator-to-validator
// `ConsensusService`, and the other covers the `ObserverService`.
mod tonic_gen {
    include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusService.rs"));
    include!(concat!(env!("OUT_DIR"), "/consensus.ObserverService.rs"));
}
100
// Concrete client wrappers; `CommitSyncerClient` and `SynchronizerClient` are re-exported from
// this module further down.
mod clients;
pub(crate) mod metrics;
mod metrics_layer;
// Compiled only for unit-test builds that are not running under the `msim` simulator.
#[cfg(all(test, not(msim)))]
mod network_tests;
// `observer` and `tonic_network` are crate-private in normal builds but `pub` under `msim`,
// presumably so simulation code outside this crate can reach them — NOTE(review): confirm.
#[cfg(not(msim))]
pub(crate) mod observer;
#[cfg(msim)]
pub mod observer;
// Test-only network support; not compiled into non-test builds.
#[cfg(test)]
pub(crate) mod test_network;
#[cfg(not(msim))]
pub(crate) mod tonic_network;
#[cfg(msim)]
pub mod tonic_network;
mod tonic_tls;
117
118/// A stream of serialized filtered blocks returned over the network.
119pub(crate) type BlockStream = Pin<Box<dyn Stream<Item = ExtendedSerializedBlock> + Send>>;
120
/// Validator network client for communicating with validator peers.
///
/// NOTE: the timeout parameters help save resources at the client and potentially at the
/// server, but it is up to the server implementation whether the timeout is honored.
/// - To bound server resources, the server should implement its own timeout for incoming
///   requests.
#[async_trait]
pub(crate) trait ValidatorNetworkClient: Send + Sync + Sized + 'static {
    /// Subscribes to blocks from `peer` after the `last_received` round, returning the
    /// resulting block stream.
    async fn subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
        timeout: Duration,
    ) -> ConsensusResult<BlockStream>;

    // TODO: add a parameter for maximum total size of blocks returned.
    /// Fetches serialized `SignedBlock`s from a peer. It might also return additional ancestor
    /// blocks of the requested blocks according to the provided `fetch_after_rounds`. The
    /// `fetch_after_rounds` length should be equal to the committee size. If
    /// `fetch_after_rounds` is empty, it is simply ignored.
    async fn fetch_blocks(
        &self,
        peer: AuthorityIndex,
        block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
        timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Fetches serialized commits in the commit range from a peer.
    /// Returns a tuple of both the serialized commits, and serialized blocks that contain
    /// votes certifying the last commit.
    async fn fetch_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
        timeout: Duration,
    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;

    /// Fetches from `peer` the latest block of each of the requested `authorities`. The latest
    /// blocks are returned in the serialized format of `SignedBlock`s. The method can return
    /// multiple blocks per requested authority, since equivocations are possible.
    async fn fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
        timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Gets the latest received & accepted rounds of all authorities from the peer.
    /// NOTE(review): the tuple is presumably `(received, accepted)` in that order — confirm
    /// against the implementation.
    async fn get_latest_rounds(
        &self,
        peer: AuthorityIndex,
        timeout: Duration,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;

    /// Sends a serialized SignedBlock to a peer. Only available in test builds.
    #[cfg(test)]
    async fn send_block(
        &self,
        peer: AuthorityIndex,
        block: &VerifiedBlock,
        timeout: Duration,
    ) -> ConsensusResult<()>;
}
186
/// Validator network service for handling requests from validator peers.
///
/// Each handler receives the `AuthorityIndex` of the requesting peer.
#[async_trait]
pub(crate) trait ValidatorNetworkService: Send + Sync + 'static {
    /// Handles the block sent from the peer via either unicast RPC or subscription stream.
    /// The `peer` value can be trusted to be a valid authority index,
    /// but `block` must be verified before its contents are trusted.
    /// Excluded ancestors are also included as part of an effort to further propagate
    /// blocks to peers despite the current exclusion.
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()>;

    /// Handles the subscription request from the peer.
    /// A stream of newly proposed blocks is returned to the peer.
    /// The stream continues until the end of epoch, the peer unsubscribes, or a network
    /// error / crash occurs.
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream>;

    /// Handles the request to fetch blocks by references from the peer.
    /// The parameters carry the same names as `ValidatorNetworkClient::fetch_blocks`, and
    /// presumably the same meaning — NOTE(review): confirm.
    async fn handle_fetch_blocks(
        &self,
        peer: AuthorityIndex,
        block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Handles the request to fetch commits by index range from the peer.
    /// Unlike the client side, results are returned as `TrustedCommit`s and
    /// `VerifiedBlock`s rather than raw bytes; serialization presumably happens in the
    /// network layer — NOTE(review): confirm.
    async fn handle_fetch_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;

    /// Handles the request to fetch the latest block for the provided `authorities`.
    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Handles the request to get the latest received & accepted rounds of all authorities.
    async fn handle_get_latest_rounds(
        &self,
        peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
}
240
/// Handler for randomness round signatures exchanged between validators and
/// observer nodes via the consensus block stream.
///
/// Implementors must be `Send + Sync + 'static`, so a single handler can be shared
/// across tasks.
pub trait RandomnessSignatureHandler: Send + Sync + 'static {
    /// Called by the observer subscriber for each randomness round signature
    /// received from the block stream. `data` is the raw, still-serialized payload.
    fn handle_randomness_signature(&self, data: Bytes);

    /// Returns a receiver for broadcast randomness signatures. Called by
    /// the observer service to merge signatures into the outgoing block stream.
    /// Each call returns a fresh `tokio::sync::broadcast::Receiver`.
    fn subscribe_randomness_signatures(&self) -> tokio::sync::broadcast::Receiver<Bytes>;
}
252
/// A single item in the observer block stream, carrying both blocks and auxiliary data.
pub(crate) struct ObserverStreamItem {
    /// Serialized blocks carried by this stream item.
    pub(crate) blocks: Vec<Bytes>,
    /// Extra data sent alongside the blocks; its contents are defined in the `observer` module.
    pub(crate) auxiliary_data: observer::AuxiliaryData,
}
258
/// Observer block stream type: a pinned, boxed, `Send` stream of [`ObserverStreamItem`]s,
/// as returned by the observer streaming APIs.
pub(crate) type ObserverBlockStream =
    Pin<Box<dyn Stream<Item = ObserverStreamItem> + Send + 'static>>;
262
/// Observer network service for handling requests from observer nodes.
///
/// Unlike ValidatorNetworkService, which uses AuthorityIndex, the request handlers here
/// identify the requesting peer by NodeId (NetworkPublicKey), since observers are not part of
/// the committee. `handle_block` takes a `PeerId` instead, because streamed blocks may come
/// from either validators or observers.
#[async_trait]
pub(crate) trait ObserverNetworkService: Send + Sync + 'static {
    /// Handles a block received from a peer subscription. Used by ObserverSubscriber to process
    /// blocks streamed from validators or other observers.
    async fn handle_block(&self, peer: PeerId, block: Bytes) -> ConsensusResult<()>;

    /// Handles the block streaming request from an observer peer.
    /// Returns a stream of `ObserverStreamItem`s: blocks plus auxiliary data (described by the
    /// original author as the highest commit index for each block).
    /// Blocks with rounds higher than the highest_round_per_authority will be streamed.
    ///
    /// NOTE(review): the rounds are `u64` here while `Round` is used elsewhere in this module;
    /// the vector is presumably indexed by authority index — confirm.
    async fn handle_stream_blocks(
        &self,
        peer: NodeId,
        highest_round_per_authority: Vec<u64>,
    ) -> ConsensusResult<ObserverBlockStream>;

    /// Handles the request to fetch blocks by references from an observer peer.
    async fn handle_fetch_blocks(
        &self,
        peer: NodeId,
        block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Handles the request to fetch commits by index range from an observer peer.
    /// Returns the commits and their certifier blocks as `TrustedCommit`s and
    /// `VerifiedBlock`s.
    async fn handle_fetch_commits(
        &self,
        peer: NodeId,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;
}
298
/// Observer network client for communicating with validators' observer ports or other observers.
///
/// Unlike ValidatorNetworkClient, which uses AuthorityIndex, this uses PeerId to identify peers,
/// since the observer server can serve both validators and observer nodes.
#[async_trait]
pub(crate) trait ObserverNetworkClient: Send + Sync + Sized + 'static {
    /// Initiates block streaming with a peer (validator or observer).
    /// Returns a stream of blocks with the highest commit index.
    /// Blocks with rounds higher than the highest_round_per_authority will be streamed.
    async fn stream_blocks(
        &self,
        peer: PeerId,
        highest_round_per_authority: Vec<u64>,
        timeout: Duration,
    ) -> ConsensusResult<ObserverBlockStream>;

    /// Fetches serialized blocks by references from a peer.
    async fn fetch_blocks(
        &self,
        peer: PeerId,
        block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
        timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Fetches serialized commits in the commit range from a peer.
    /// Returns a tuple of both the serialized commits, and serialized blocks that contain
    /// votes certifying the last commit.
    async fn fetch_commits(
        &self,
        peer: PeerId,
        commit_range: CommitRange,
        timeout: Duration,
    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;
}
334
/// An `AuthorityNode` holds a `NetworkManager` until shutdown.
/// Dropping `NetworkManager` will shut down the network service.
///
/// A manager owns one validator client and one observer client, and can host a validator
/// server and an observer server.
pub(crate) trait NetworkManager: Send + Sync {
    type ValidatorClient: ValidatorNetworkClient;
    type ObserverClient: ObserverNetworkClient;

    /// Creates a new network manager from the node context and its network keypair.
    fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self;

    /// Returns the validator network client.
    fn validator_client(&self) -> Arc<Self::ValidatorClient>;

    /// Returns the observer network client.
    fn observer_client(&self) -> Arc<Self::ObserverClient>;

    /// Starts the validator network server with the provided service.
    async fn start_validator_server<V>(&mut self, service: Arc<V>)
    where
        V: ValidatorNetworkService;

    /// Starts the observer network server with the provided service.
    async fn start_observer_server<O>(&mut self, service: Arc<O>)
    where
        O: ObserverNetworkService;

    /// Stops the network service.
    async fn stop(&mut self);

    /// Updates the network address for a peer identified by their authority index.
    /// If address is None, the override is cleared and the committee address will be used.
    fn update_peer_address(&self, peer: AuthorityIndex, address: Option<Multiaddr>);
}
367
368// Re-export the concrete client implementations.
369pub(crate) use clients::{CommitSyncerClient, SynchronizerClient};
370
/// Serialized block with extended information from the proposing authority.
#[derive(Clone, PartialEq, Eq, Debug)]
pub(crate) struct ExtendedSerializedBlock {
    /// The serialized block bytes.
    pub(crate) block: Bytes,
    /// Serialized BlockRefs that are excluded from the block's ancestors. The
    /// `From<ExtendedBlock>` conversion in this module produces each entry with `bcs::to_bytes`.
    pub(crate) excluded_ancestors: Vec<Vec<u8>>,
}
378
379impl From<ExtendedBlock> for ExtendedSerializedBlock {
380 fn from(extended_block: ExtendedBlock) -> Self {
381 Self {
382 block: extended_block.block.serialized().clone(),
383 excluded_ancestors: extended_block
384 .excluded_ancestors
385 .iter()
386 .filter_map(|r| match bcs::to_bytes(r) {
387 Ok(serialized) => Some(serialized),
388 Err(e) => {
389 tracing::debug!("Failed to serialize block ref {:?}: {e:?}", r);
390 None
391 }
392 })
393 .collect(),
394 }
395 }
396}
397
398/// Attempts to convert a multiaddr of the form `/[ip4,ip6,dns]/{}/udp/{port}` into
399/// a host:port string.
400pub(crate) fn to_host_port_str(addr: &Multiaddr) -> Result<String, String> {
401 let mut iter = addr.iter();
402
403 match (iter.next(), iter.next()) {
404 (Some(Protocol::Ip4(ipaddr)), Some(Protocol::Udp(port))) => {
405 Ok(format!("{}:{}", ipaddr, port))
406 }
407 (Some(Protocol::Ip6(ipaddr)), Some(Protocol::Udp(port))) => {
408 Ok(format!("{}", SocketAddrV6::new(ipaddr, port, 0, 0)))
409 }
410 (Some(Protocol::Dns(hostname)), Some(Protocol::Udp(port))) => {
411 Ok(format!("{}:{}", hostname, port))
412 }
413
414 _ => Err(format!("unsupported multiaddr: {addr}")),
415 }
416}