consensus_core/network/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the network interface, and provides network implementations for the
5//! consensus protocol.
6//!
7//! Having an abstract network interface allows
8//! - simplifying the semantics of sending data and serving requests over the network
//! - hiding implementation-specific types and semantics from the consensus protocol
10//! - allowing easy swapping of network implementations, for better performance or testing
11//!
12//! When modifying the client and server interfaces, the principle is to keep the interfaces
13//! low level, close to underlying implementations in semantics. For example, the client interface
14//! exposes sending messages to a specific peer, instead of broadcasting to all peers. Subscribing
15//! to a stream of blocks gets back the stream via response, instead of delivering the stream
//! directly to the server. This keeps logic that is agnostic to the underlying network outside
//! of this module, so it can be reused easily across network implementations.
18
19use std::{
20    fmt::{Display, Formatter},
21    net::SocketAddrV6,
22    pin::Pin,
23    sync::Arc,
24    time::Duration,
25};
26
27use async_trait::async_trait;
28use bytes::Bytes;
29use consensus_config::{AuthorityIndex, NetworkKeyPair, NetworkPublicKey};
30use consensus_types::block::{BlockRef, Round};
31use fastcrypto::encoding::{Encoding, Hex};
32use futures::Stream;
33use mysten_network::{Multiaddr, multiaddr::Protocol};
34
35use crate::{
36    block::{ExtendedBlock, VerifiedBlock},
37    commit::{CommitRange, TrustedCommit},
38    context::Context,
39    error::ConsensusResult,
40};
41
/// Identifies an observer node by its network public key.
pub type NodeId = NetworkPublicKey;

/// Identifies a peer in the network, which can be either a validator or an observer.
///
/// The `Observer` variant is boxed to keep the enum small, since a `NodeId` is much larger
/// than an `AuthorityIndex`.
///
/// The derived `Ord` follows variant declaration order, so every `Validator` sorts before
/// every `Observer`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum PeerId {
    /// A validator node identified by its authority index.
    Validator(AuthorityIndex),
    /// An observer node identified by its network public key.
    // NOTE(review): `dead_code` is presumably allowed because some build configurations never
    // construct observers yet — confirm, and drop the allow once the variant is always used.
    #[allow(dead_code)]
    Observer(Box<NodeId>),
}
56
57impl Display for PeerId {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        match self {
60            PeerId::Validator(authority) => write!(f, "[{}]", authority),
61            PeerId::Observer(node_id) => {
62                let bytes = node_id.to_bytes();
63                let s = Hex::encode(bytes.get(0..4).ok_or(std::fmt::Error)?);
64                write!(f, "o#{}..", s)
65            }
66        }
67    }
68}
69
70impl PeerId {
71    /// Returns a human-readable name suitable for logging. For observers, prints
72    /// the first 8 hex digits of the public key.
73    pub(crate) fn hostname(&self, context: &Context) -> String {
74        match self {
75            PeerId::Validator(index) => context.committee.authority(*index).hostname.to_string(),
76            PeerId::Observer(node_id) => {
77                let bytes = node_id.to_bytes();
78                format!(
79                    "[Observer]{:02x}{:02x}{:02x}{:02x}",
80                    bytes[0], bytes[1], bytes[2], bytes[3]
81                )
82            }
83        }
84    }
85
86    /// Returns a short label suitable for use in metrics. Does not include
87    /// full public keys to avoid high-cardinality metric labels.
88    pub(crate) fn labelname(&self, context: &Context) -> String {
89        match self {
90            PeerId::Validator(index) => context.committee.authority(*index).hostname.to_string(),
91            PeerId::Observer(_) => "observer".to_string(),
92        }
93    }
94}
95
// Tonic generated RPC stubs. The files are emitted into `OUT_DIR` at build time (presumably by
// this crate's build script — confirm).
mod tonic_gen {
    // Stubs generated for the `consensus.ConsensusService` service.
    include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusService.rs"));
    // Stubs generated for the `consensus.ObserverService` service.
    include!(concat!(env!("OUT_DIR"), "/consensus.ObserverService.rs"));
}

mod clients;
pub(crate) mod metrics;
mod metrics_layer;
#[cfg(all(test, not(msim)))]
mod network_tests;
// `observer` and `tonic_network` are crate-private in normal builds but public under `msim`,
// presumably so simulation tests outside this crate can reach them — confirm.
#[cfg(not(msim))]
pub(crate) mod observer;
#[cfg(msim)]
pub mod observer;
#[cfg(test)]
pub(crate) mod test_network;
#[cfg(not(msim))]
pub(crate) mod tonic_network;
#[cfg(msim)]
pub mod tonic_network;
mod tonic_tls;
118
/// A type-erased (pinned, boxed, `Send`) stream of `ExtendedSerializedBlock`s exchanged over
/// the network, i.e. serialized blocks together with their serialized excluded ancestors.
pub(crate) type BlockStream = Pin<Box<dyn Stream<Item = ExtendedSerializedBlock> + Send>>;
121
/// Validator network client for communicating with validator peers.
///
/// NOTE: the timeout parameters help save resources at the client and potentially the server,
/// but it is up to the server implementation whether the timeout is honored.
/// - To bound server resources, the server should implement its own timeout for incoming
///   requests.
#[async_trait]
pub(crate) trait ValidatorNetworkClient: Send + Sync + Sized + 'static {
    /// Subscribes to blocks from a peer after the `last_received` round.
    async fn subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
        timeout: Duration,
    ) -> ConsensusResult<BlockStream>;

    // TODO: add a parameter for maximum total size of blocks returned.
    /// Fetches serialized `SignedBlock`s from a peer. It might also return additional ancestor
    /// blocks of the requested blocks, according to the provided `fetch_after_rounds`. The
    /// length of `fetch_after_rounds` should equal the committee size. If `fetch_after_rounds`
    /// is empty, it is ignored.
    ///
    /// NOTE(review): `fetch_missing_ancestors` presumably asks the peer to also return missing
    /// ancestors of the requested blocks — confirm against the server implementation.
    async fn fetch_blocks(
        &self,
        peer: AuthorityIndex,
        block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
        timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Fetches serialized commits in the commit range from a peer.
    /// Returns a tuple of the serialized commits and the serialized blocks that contain
    /// votes certifying the last commit.
    async fn fetch_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
        timeout: Duration,
    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;

    /// Fetches from `peer` the latest blocks of the requested `authorities`. The latest blocks
    /// are returned in the serialized format of `SignedBlock`s. The method can return multiple
    /// blocks per requested authority, since an authority may equivocate.
    async fn fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
        timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Gets from the peer the latest received and accepted rounds of all authorities.
    /// Returns `(received_rounds, accepted_rounds)`.
    async fn get_latest_rounds(
        &self,
        peer: AuthorityIndex,
        timeout: Duration,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;

    /// Sends a serialized `SignedBlock` to a peer. Only available in test builds.
    #[cfg(test)]
    async fn send_block(
        &self,
        peer: AuthorityIndex,
        block: &VerifiedBlock,
        timeout: Duration,
    ) -> ConsensusResult<()>;
}
187
/// Validator network service for handling requests from validator peers.
#[async_trait]
pub(crate) trait ValidatorNetworkService: Send + Sync + 'static {
    /// Handles the block sent from the peer via either unicast RPC or subscription stream.
    /// The `peer` value can be trusted to be a valid authority index, but `block` must be
    /// verified before its contents are trusted.
    /// Excluded ancestors are also included, as part of an effort to further propagate
    /// blocks to peers despite the current exclusion.
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()>;

    /// Handles the subscription request from the peer.
    /// A stream of newly proposed blocks is returned to the peer.
    /// The stream continues until the end of the epoch, the peer unsubscribes, or a network
    /// error or crash occurs.
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream>;

    /// Handles the request from the peer to fetch blocks by reference.
    async fn handle_fetch_blocks(
        &self,
        peer: AuthorityIndex,
        block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Handles the request from the peer to fetch commits by index range.
    /// Unlike the client side, this returns deserialized `TrustedCommit`s and
    /// `VerifiedBlock`s; serialization is presumably left to the network layer — confirm.
    async fn handle_fetch_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;

    /// Handles the request to fetch the latest blocks of the provided `authorities`.
    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Handles the request to get the latest received and accepted rounds of all authorities.
    /// Returns `(received_rounds, accepted_rounds)`.
    async fn handle_get_latest_rounds(
        &self,
        peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
}
241
/// Observer block stream type: a type-erased (pinned, boxed, `Send + 'static`) stream returned
/// by `ObserverNetworkService::handle_stream_blocks` and `ObserverNetworkClient::stream_blocks`.
/// Each item is a `Vec<Bytes>`; its exact layout is defined by the implementations
/// (presumably serialized block data — confirm).
pub(crate) type ObserverBlockStream = Pin<Box<dyn Stream<Item = Vec<Bytes>> + Send + 'static>>;
244
/// Observer network service for handling requests from observer nodes.
/// Unlike `ValidatorNetworkService`, which uses `AuthorityIndex`, request handlers here identify
/// the requester by `NodeId` (`NetworkPublicKey`), since observers are not part of the committee.
/// `handle_block` takes a `PeerId` instead, because streamed blocks may come from either a
/// validator or another observer.
// NOTE(review): the `dead_code`/`unused` allows are presumably temporary while observer support
// is being wired up — confirm and remove once all methods are used.
#[async_trait]
#[allow(dead_code)]
pub(crate) trait ObserverNetworkService: Send + Sync + 'static {
    /// Handles a block received from a peer subscription. Used by `ObserverSubscriber` to
    /// process blocks streamed from validators or other observers.
    async fn handle_block(&self, peer: PeerId, block: Bytes) -> ConsensusResult<()>;

    /// Handles the block streaming request from an observer peer.
    /// Returns a stream of blocks with the highest commit index for each block.
    /// Blocks with rounds higher than those in `highest_round_per_authority` are streamed.
    async fn handle_stream_blocks(
        &self,
        peer: NodeId,
        highest_round_per_authority: Vec<u64>,
    ) -> ConsensusResult<ObserverBlockStream>;

    /// Handles the request from an observer peer to fetch blocks by reference.
    #[allow(unused)]
    async fn handle_fetch_blocks(
        &self,
        peer: NodeId,
        block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Handles the request from an observer peer to fetch commits by index range.
    /// Returns the commits (as `TrustedCommit`s) together with the certifier blocks
    /// (as `VerifiedBlock`s).
    #[allow(unused)]
    async fn handle_fetch_commits(
        &self,
        peer: NodeId,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;
}
283
/// Observer network client for communicating with validators' observer ports or with other
/// observers.
/// Unlike `ValidatorNetworkClient`, which uses `AuthorityIndex`, this uses `PeerId` to identify
/// peers, since the observer server can run on both validators and observer nodes.
// NOTE(review): `dead_code` is presumably allowed while observer support is being wired up —
// confirm.
#[async_trait]
#[allow(dead_code)]
pub(crate) trait ObserverNetworkClient: Send + Sync + Sized + 'static {
    /// Initiates block streaming with a peer (validator or observer).
    /// Returns a stream of blocks with the highest commit index.
    /// Blocks with rounds higher than those in `highest_round_per_authority` are streamed.
    async fn stream_blocks(
        &self,
        peer: PeerId,
        highest_round_per_authority: Vec<u64>,
        timeout: Duration,
    ) -> ConsensusResult<ObserverBlockStream>;

    /// Fetches serialized blocks from a peer by reference.
    async fn fetch_blocks(
        &self,
        peer: PeerId,
        block_refs: Vec<BlockRef>,
        fetch_after_rounds: Vec<Round>,
        fetch_missing_ancestors: bool,
        timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Fetches serialized commits in the commit range from a peer.
    /// Returns a tuple of the serialized commits and the serialized blocks that contain
    /// votes certifying the last commit.
    async fn fetch_commits(
        &self,
        peer: PeerId,
        commit_range: CommitRange,
        timeout: Duration,
    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;
}
320
/// An `AuthorityNode` holds a `NetworkManager` until shutdown.
/// Dropping the `NetworkManager` shuts down the network service.
pub(crate) trait NetworkManager: Send + Sync {
    /// Client used to talk to validator peers.
    type ValidatorClient: ValidatorNetworkClient;
    /// Client used to talk to observer servers (on validators or observer nodes).
    type ObserverClient: ObserverNetworkClient;

    /// Creates a new network manager from the shared context and this node's network keypair.
    fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self;

    /// Returns the validator network client.
    fn validator_client(&self) -> Arc<Self::ValidatorClient>;

    /// Returns the observer network client.
    #[allow(dead_code)]
    fn observer_client(&self) -> Arc<Self::ObserverClient>;

    /// Starts the validator network server, dispatching incoming requests to `service`.
    async fn start_validator_server<V>(&mut self, service: Arc<V>)
    where
        V: ValidatorNetworkService;

    /// Starts the observer network server, dispatching incoming requests to `service`.
    async fn start_observer_server<O>(&mut self, service: Arc<O>)
    where
        O: ObserverNetworkService;

    /// Stops the network service.
    async fn stop(&mut self);

    /// Updates the network address for a peer identified by its authority index.
    /// If `address` is `None`, the override is cleared and the committee address is used.
    fn update_peer_address(&self, peer: AuthorityIndex, address: Option<Multiaddr>);
}
354
355// Re-export the concrete client implementations.
356pub(crate) use clients::{CommitSyncerClient, SynchronizerClient};
357
/// Serialized block with extended information from the proposing authority.
#[derive(Clone, PartialEq, Eq, Debug)]
pub(crate) struct ExtendedSerializedBlock {
    /// The serialized block itself.
    pub(crate) block: Bytes,
    /// Serialized (BCS) `BlockRef`s that are excluded from the block's ancestors.
    pub(crate) excluded_ancestors: Vec<Vec<u8>>,
}
365
366impl From<ExtendedBlock> for ExtendedSerializedBlock {
367    fn from(extended_block: ExtendedBlock) -> Self {
368        Self {
369            block: extended_block.block.serialized().clone(),
370            excluded_ancestors: extended_block
371                .excluded_ancestors
372                .iter()
373                .filter_map(|r| match bcs::to_bytes(r) {
374                    Ok(serialized) => Some(serialized),
375                    Err(e) => {
376                        tracing::debug!("Failed to serialize block ref {:?}: {e:?}", r);
377                        None
378                    }
379                })
380                .collect(),
381        }
382    }
383}
384
385/// Attempts to convert a multiaddr of the form `/[ip4,ip6,dns]/{}/udp/{port}` into
386/// a host:port string.
387pub(crate) fn to_host_port_str(addr: &Multiaddr) -> Result<String, String> {
388    let mut iter = addr.iter();
389
390    match (iter.next(), iter.next()) {
391        (Some(Protocol::Ip4(ipaddr)), Some(Protocol::Udp(port))) => {
392            Ok(format!("{}:{}", ipaddr, port))
393        }
394        (Some(Protocol::Ip6(ipaddr)), Some(Protocol::Udp(port))) => {
395            Ok(format!("{}", SocketAddrV6::new(ipaddr, port, 0, 0)))
396        }
397        (Some(Protocol::Dns(hostname)), Some(Protocol::Udp(port))) => {
398            Ok(format!("{}:{}", hostname, port))
399        }
400
401        _ => Err(format!("unsupported multiaddr: {addr}")),
402    }
403}