consensus_core/network/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! This module defines the network interface, and provides network implementations for the
//! consensus protocol.
//!
//! Having an abstract network interface allows
//! - simplifying the semantics of sending data and serving requests over the network
//! - hiding implementation specific types and semantics from the consensus protocol
//! - allowing easy swapping of network implementations, for better performance or testing
//!
//! When modifying the client and server interfaces, the principle is to keep the interfaces
//! low level, close to underlying implementations in semantics. For example, the client interface
//! exposes sending messages to a specific peer, instead of broadcasting to all peers. Subscribing
//! to a stream of blocks gets back the stream via response, instead of delivering the stream
//! directly to the server. This keeps the logic outside of this module agnostic to the underlying
//! network, so that it can be reused easily across network implementations.
18
19use std::{
20    fmt::{Display, Formatter},
21    net::SocketAddrV6,
22    pin::Pin,
23    sync::Arc,
24    time::Duration,
25};
26
27use async_trait::async_trait;
28use bytes::Bytes;
29use consensus_config::{AuthorityIndex, NetworkKeyPair, NetworkPublicKey};
30use consensus_types::block::{BlockRef, Round};
31use fastcrypto::encoding::{Encoding, Hex};
32use futures::Stream;
33use mysten_network::{Multiaddr, multiaddr::Protocol};
34
35use crate::{
36    block::{ExtendedBlock, VerifiedBlock},
37    commit::{CommitRange, TrustedCommit},
38    context::Context,
39    error::ConsensusResult,
40};
41
/// Identifies an observer node by its network public key.
///
/// This is a plain alias of [`NetworkPublicKey`], so the two names are interchangeable.
pub type NodeId = NetworkPublicKey;
44
/// Identifies a peer in the network, which can be either a validator or an observer.
///
/// The `Observer` variant is boxed to keep the enum small, since `NodeId` (32 bytes) is
/// much larger than `AuthorityIndex` (4 bytes).
///
/// The derived ordering follows the variant declaration order, so every `Validator`
/// compares less than every `Observer`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum PeerId {
    /// A validator node identified by its authority index.
    Validator(AuthorityIndex),
    /// An observer node identified by its network public key.
    Observer(Box<NodeId>),
}
55
56impl Display for PeerId {
57    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
58        match self {
59            PeerId::Validator(authority) => write!(f, "[{}]", authority),
60            PeerId::Observer(node_id) => {
61                let bytes = node_id.to_bytes();
62                let s = Hex::encode(bytes.get(0..4).ok_or(std::fmt::Error)?);
63                write!(f, "o#{}..", s)
64            }
65        }
66    }
67}
68
69impl PeerId {
70    /// Returns a human-readable name suitable for logging. For observers, prints
71    /// the first 8 hex digits of the public key.
72    pub(crate) fn hostname(&self, context: &Context) -> String {
73        match self {
74            PeerId::Validator(index) => context.committee.authority(*index).hostname.to_string(),
75            PeerId::Observer(node_id) => {
76                let bytes = node_id.to_bytes();
77                format!(
78                    "[Observer]{:02x}{:02x}{:02x}{:02x}",
79                    bytes[0], bytes[1], bytes[2], bytes[3]
80                )
81            }
82        }
83    }
84
85    /// Returns a short label suitable for use in metrics. Does not include
86    /// full public keys to avoid high-cardinality metric labels.
87    pub(crate) fn labelname(&self, context: &Context) -> String {
88        match self {
89            PeerId::Validator(index) => context.committee.authority(*index).hostname.to_string(),
90            PeerId::Observer(_) => "observer".to_string(),
91        }
92    }
93}
94
// Tonic generated RPC stubs.
// The generated sources for the consensus and observer services are included from the build
// output directory (`OUT_DIR`) at compile time.
mod tonic_gen {
    include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusService.rs"));
    include!(concat!(env!("OUT_DIR"), "/consensus.ObserverService.rs"));
}
100
// Submodules of the network layer.
mod clients;
pub(crate) mod metrics;
mod metrics_layer;
// Network tests are compiled only for test builds that are not `msim` builds.
#[cfg(all(test, not(msim)))]
mod network_tests;
// `observer` is crate-private in regular builds and fully public in `msim` builds.
#[cfg(not(msim))]
pub(crate) mod observer;
#[cfg(msim)]
pub mod observer;
// Compiled only for test builds.
#[cfg(test)]
pub(crate) mod test_network;
// `tonic_network` follows the same visibility split as `observer`.
#[cfg(not(msim))]
pub(crate) mod tonic_network;
#[cfg(msim)]
pub mod tonic_network;
mod tonic_tls;
117
/// A stream of serialized filtered blocks returned over the network.
///
/// Each item is an [`ExtendedSerializedBlock`]. The stream is pinned, boxed and `Send`.
pub(crate) type BlockStream = Pin<Box<dyn Stream<Item = ExtendedSerializedBlock> + Send>>;
120
/// Validator network client for communicating with validator peers.
///
/// Every method targets a single peer, identified by its `AuthorityIndex`.
///
/// NOTE: the timeout parameters help save resources at the client and potentially the server.
/// But it is up to the server implementation whether the timeout is honored.
/// - To bound server resources, the server should implement its own timeout for incoming
///   requests.
#[async_trait]
pub(crate) trait ValidatorNetworkClient: Send + Sync + Sized + 'static {
    /// Subscribes to blocks from a peer after the `last_received` round.
    ///
    /// On success, returns the [`BlockStream`] obtained from the peer.
    async fn subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
        timeout: Duration,
    ) -> ConsensusResult<BlockStream>;

    // TODO: add a parameter for maximum total size of blocks returned.
    /// Fetches serialized `SignedBlock`s from a peer.
    ///
    /// The response might also contain additional ancestor blocks of the requested blocks,
    /// according to the provided `fetch_after_rounds`. The length of `fetch_after_rounds`
    /// should be equal to the committee size. If `fetch_after_rounds` is empty, it is simply
    /// ignored.
    async fn fetch_blocks(
        &self,
        peer: AuthorityIndex,
        block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
        timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Fetches serialized commits in the commit range from a peer.
    ///
    /// Returns a tuple of the serialized commits and the serialized blocks that contain
    /// votes certifying the last commit.
    async fn fetch_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
        timeout: Duration,
    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;

    /// Fetches the latest block from `peer` for the requested `authorities`.
    ///
    /// The latest blocks are returned in the serialized format of `SignedBlock`s. More than
    /// one block can be returned per authority, as it is possible to have equivocations.
    async fn fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
        timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Gets the latest received & accepted rounds of all authorities from the peer.
    ///
    /// NOTE(review): presumably the tuple is `(received, accepted)` in that order — confirm
    /// against the implementations.
    async fn get_latest_rounds(
        &self,
        peer: AuthorityIndex,
        timeout: Duration,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;

    /// Sends a serialized SignedBlock to a peer.
    ///
    /// Only available in test builds.
    #[cfg(test)]
    async fn send_block(
        &self,
        peer: AuthorityIndex,
        block: &VerifiedBlock,
        timeout: Duration,
    ) -> ConsensusResult<()>;
}
186
/// Validator network service for handling requests from validator peers.
///
/// Each handler receives the `AuthorityIndex` of the requesting peer as its first argument.
#[async_trait]
pub(crate) trait ValidatorNetworkService: Send + Sync + 'static {
    /// Handles the block sent from the peer via either unicast RPC or subscription stream.
    ///
    /// The peer value can be trusted to be a valid authority index, but the serialized block
    /// must be verified before its contents are trusted.
    /// Excluded ancestors are also included, as part of an effort to further propagate
    /// blocks to peers despite the current exclusion.
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()>;

    /// Handles the subscription request from the peer.
    ///
    /// A stream of newly proposed blocks is returned to the peer.
    /// The stream continues until the end of epoch, the peer unsubscribes, or a network
    /// error / crash occurs.
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream>;

    /// Handles the request to fetch blocks by references from the peer.
    ///
    /// The blocks are returned as `Bytes`, one entry per block.
    async fn handle_fetch_blocks(
        &self,
        peer: AuthorityIndex,
        block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Handles the request to fetch commits by index range from the peer.
    ///
    /// Returns `TrustedCommit`s together with `VerifiedBlock`s.
    /// NOTE(review): presumably the blocks are the ones certifying the last returned commit,
    /// mirroring the client-side `fetch_commits` — confirm.
    async fn handle_fetch_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;

    /// Handles the request to fetch the latest block for the provided `authorities`.
    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Handles the request to get the latest received & accepted rounds of all authorities.
    async fn handle_get_latest_rounds(
        &self,
        peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
}
240
/// Handler for randomness round signatures exchanged between validators and
/// observer nodes via the consensus block stream.
///
/// Signatures are passed around as opaque `Bytes`; this trait does not interpret them.
pub trait RandomnessSignatureHandler: Send + Sync + 'static {
    /// Called by the observer subscriber for each randomness round signature
    /// received from the block stream.
    fn handle_randomness_signature(&self, data: Bytes);

    /// Returns a receiver for broadcast randomness signatures.
    ///
    /// Called by the observer service to merge signatures into the outgoing block stream.
    /// The receiver is the receiving half of a `tokio::sync::broadcast` channel.
    fn subscribe_randomness_signatures(&self) -> tokio::sync::broadcast::Receiver<Bytes>;
}
252
/// A single item in the observer block stream, carrying both blocks and auxiliary data.
pub(crate) struct ObserverStreamItem {
    // Blocks carried by this item.
    // NOTE(review): presumably each entry is one serialized block — confirm.
    pub(crate) blocks: Vec<Bytes>,
    // Auxiliary data delivered alongside the blocks, as defined by the `observer` module.
    pub(crate) auxiliary_data: observer::AuxiliaryData,
}
258
/// Observer block stream type.
///
/// A pinned, boxed, `Send + 'static` stream whose items are [`ObserverStreamItem`]s.
pub(crate) type ObserverBlockStream =
    Pin<Box<dyn Stream<Item = ObserverStreamItem> + Send + 'static>>;
262
/// Observer network service for handling requests from observer nodes.
///
/// Unlike ValidatorNetworkService which uses AuthorityIndex, the request handlers here use
/// NodeId (NetworkPublicKey) to identify peers, since observers are not part of the committee.
/// `handle_block` is the exception: it takes a `PeerId`, because the sender of a streamed
/// block can be either a validator or an observer.
#[async_trait]
pub(crate) trait ObserverNetworkService: Send + Sync + 'static {
    /// Handles a block received from a peer subscription.
    ///
    /// Used by ObserverSubscriber to process blocks streamed from validators or other
    /// observers.
    async fn handle_block(&self, peer: PeerId, block: Bytes) -> ConsensusResult<()>;

    /// Handles the block streaming request from an observer peer.
    ///
    /// Returns a stream of blocks with the highest commit index for each block.
    /// Blocks with rounds higher than the `highest_round_per_authority` will be streamed.
    async fn handle_stream_blocks(
        &self,
        peer: NodeId,
        highest_round_per_authority: Vec<u64>,
    ) -> ConsensusResult<ObserverBlockStream>;

    /// Handles the request to fetch blocks by references from an observer peer.
    async fn handle_fetch_blocks(
        &self,
        peer: NodeId,
        block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Handles the request to fetch commits by index range from an observer peer.
    ///
    /// Returns the commits as `TrustedCommit`s and the certifier blocks as `VerifiedBlock`s.
    /// NOTE(review): presumably serialization for the wire is left to the caller — confirm.
    async fn handle_fetch_commits(
        &self,
        peer: NodeId,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;
}
298
/// Observer network client for communicating with validators' observer ports or other observers.
///
/// Unlike ValidatorNetworkClient which uses AuthorityIndex, this uses PeerId to identify peers
/// since the observer server can serve both validators and observer nodes.
#[async_trait]
pub(crate) trait ObserverNetworkClient: Send + Sync + Sized + 'static {
    /// Initiates block streaming with a peer (validator or observer).
    ///
    /// Returns a stream of blocks with the highest commit index.
    /// Blocks with rounds higher than the `highest_round_per_authority` will be streamed.
    async fn stream_blocks(
        &self,
        peer: PeerId,
        highest_round_per_authority: Vec<u64>,
        timeout: Duration,
    ) -> ConsensusResult<ObserverBlockStream>;

    /// Fetches serialized blocks by references from a peer.
    async fn fetch_blocks(
        &self,
        peer: PeerId,
        block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
        timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Fetches serialized commits in the commit range from a peer.
    ///
    /// Returns a tuple of the serialized commits and the serialized blocks that contain
    /// votes certifying the last commit.
    async fn fetch_commits(
        &self,
        peer: PeerId,
        commit_range: CommitRange,
        timeout: Duration,
    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;
}
334
335/// An `AuthorityNode` holds a `NetworkManager` until shutdown.
336/// Dropping `NetworkManager` will shutdown the network service.
337pub(crate) trait NetworkManager: Send + Sync {
338    type ValidatorClient: ValidatorNetworkClient;
339    type ObserverClient: ObserverNetworkClient;
340
341    /// Creates a new network manager.
342    fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self;
343
344    /// Returns the validator network client.
345    fn validator_client(&self) -> Arc<Self::ValidatorClient>;
346
347    /// Returns the observer network client.
348    fn observer_client(&self) -> Arc<Self::ObserverClient>;
349
350    /// Starts the validator network server with the provided service.
351    async fn start_validator_server<V>(&mut self, service: Arc<V>)
352    where
353        V: ValidatorNetworkService;
354
355    /// Starts the observer network server with the provided service.
356    async fn start_observer_server<O>(&mut self, service: Arc<O>)
357    where
358        O: ObserverNetworkService;
359
360    /// Stops the network service.
361    async fn stop(&mut self);
362
363    /// Updates the network address for a peer identified by their authority index.
364    /// If address is None, the override is cleared and the committee address will be used.
365    fn update_peer_address(&self, peer: AuthorityIndex, address: Option<Multiaddr>);
366}
367
368// Re-export the concrete client implementations.
369pub(crate) use clients::{CommitSyncerClient, SynchronizerClient};
370
/// Serialized block with extended information from the proposing authority.
#[derive(Clone, PartialEq, Eq, Debug)]
pub(crate) struct ExtendedSerializedBlock {
    // The serialized block itself.
    pub(crate) block: Bytes,
    // Serialized BlockRefs that are excluded from the block's ancestors.
    // Each inner `Vec<u8>` holds one serialized BlockRef.
    pub(crate) excluded_ancestors: Vec<Vec<u8>>,
}
378
379impl From<ExtendedBlock> for ExtendedSerializedBlock {
380    fn from(extended_block: ExtendedBlock) -> Self {
381        Self {
382            block: extended_block.block.serialized().clone(),
383            excluded_ancestors: extended_block
384                .excluded_ancestors
385                .iter()
386                .filter_map(|r| match bcs::to_bytes(r) {
387                    Ok(serialized) => Some(serialized),
388                    Err(e) => {
389                        tracing::debug!("Failed to serialize block ref {:?}: {e:?}", r);
390                        None
391                    }
392                })
393                .collect(),
394        }
395    }
396}
397
398/// Attempts to convert a multiaddr of the form `/[ip4,ip6,dns]/{}/udp/{port}` into
399/// a host:port string.
400pub(crate) fn to_host_port_str(addr: &Multiaddr) -> Result<String, String> {
401    let mut iter = addr.iter();
402
403    match (iter.next(), iter.next()) {
404        (Some(Protocol::Ip4(ipaddr)), Some(Protocol::Udp(port))) => {
405            Ok(format!("{}:{}", ipaddr, port))
406        }
407        (Some(Protocol::Ip6(ipaddr)), Some(Protocol::Udp(port))) => {
408            Ok(format!("{}", SocketAddrV6::new(ipaddr, port, 0, 0)))
409        }
410        (Some(Protocol::Dns(hostname)), Some(Protocol::Udp(port))) => {
411            Ok(format!("{}:{}", hostname, port))
412        }
413
414        _ => Err(format!("unsupported multiaddr: {addr}")),
415    }
416}