consensus_core/network/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the network interface, and provides network implementations for the
5//! consensus protocol.
6//!
7//! Having an abstract network interface allows
8//! - simplifying the semantics of sending data and serving requests over the network
9//! - hiding implementation specific types and semantics from the consensus protocol
10//! - allowing easy swapping of network implementations, for better performance or testing
11//!
12//! When modifying the client and server interfaces, the principle is to keep the interfaces
13//! low level, close to underlying implementations in semantics. For example, the client interface
14//! exposes sending messages to a specific peer, instead of broadcasting to all peers. Subscribing
15//! to a stream of blocks gets back the stream via response, instead of delivering the stream
16//! directly to the server. This keeps the logic agnostics to the underlying network outside of
17//! this module, so they can be reused easily across network implementations.
18
19use std::{
20    fmt::{Display, Formatter},
21    net::SocketAddrV6,
22    pin::Pin,
23    sync::Arc,
24    time::Duration,
25};
26
27use async_trait::async_trait;
28use bytes::Bytes;
29use consensus_config::{AuthorityIndex, NetworkKeyPair, NetworkPublicKey};
30use consensus_types::block::{BlockRef, Round};
31use futures::Stream;
32use mysten_network::{Multiaddr, multiaddr::Protocol};
33
34use crate::{
35    block::{ExtendedBlock, VerifiedBlock},
36    commit::{CommitRange, TrustedCommit},
37    context::Context,
38    error::ConsensusResult,
39};
40
41/// Identifies an observer node by its network public key.
42#[allow(unused)]
43pub(crate) type NodeId = NetworkPublicKey;
44
45/// Identifies a peer in the network, which can be either a validator or an observer.
46#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
47pub(crate) enum PeerId {
48    /// A validator node identified by its authority index.
49    Validator(AuthorityIndex),
50    /// An observer node identified by its network public key.
51    #[allow(dead_code)]
52    Observer(NodeId),
53}
54
55impl Display for PeerId {
56    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
57        match self {
58            PeerId::Validator(authority) => write!(f, "[{}]", authority),
59            PeerId::Observer(node_id) => write!(f, "[{:?}]", node_id),
60        }
61    }
62}
63
64// Tonic generated RPC stubs.
65mod tonic_gen {
66    include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusService.rs"));
67    include!(concat!(env!("OUT_DIR"), "/consensus.ObserverService.rs"));
68}
69
70mod clients;
71pub(crate) mod metrics;
72mod metrics_layer;
73#[cfg(all(test, not(msim)))]
74mod network_tests;
75#[cfg(not(msim))]
76pub(crate) mod observer;
77#[cfg(msim)]
78pub mod observer;
79#[cfg(test)]
80pub(crate) mod test_network;
81#[cfg(not(msim))]
82pub(crate) mod tonic_network;
83#[cfg(msim)]
84pub mod tonic_network;
85mod tonic_tls;
86
87/// A stream of serialized filtered blocks returned over the network.
88pub(crate) type BlockStream = Pin<Box<dyn Stream<Item = ExtendedSerializedBlock> + Send>>;
89
90/// Validator network client for communicating with validator peers.
91///
92/// NOTE: the timeout parameters help saving resources at client and potentially server.
93/// But it is up to the server implementation if the timeout is honored.
94/// - To bound server resources, server should implement own timeout for incoming requests.
95#[async_trait]
96pub(crate) trait ValidatorNetworkClient: Send + Sync + Sized + 'static {
97    /// Subscribes to blocks from a peer after last_received round.
98    async fn subscribe_blocks(
99        &self,
100        peer: AuthorityIndex,
101        last_received: Round,
102        timeout: Duration,
103    ) -> ConsensusResult<BlockStream>;
104
105    // TODO: add a parameter for maximum total size of blocks returned.
106    /// Fetches serialized `SignedBlock`s from a peer. It also might return additional ancestor blocks
107    /// of the requested blocks according to the provided `fetch_after_rounds`. The `fetch_after_rounds`
108    /// length should be equal to the committee size. If `fetch_after_rounds` is empty then it will
109    /// be simply ignored.
110    async fn fetch_blocks(
111        &self,
112        peer: AuthorityIndex,
113        block_refs: Vec<BlockRef>,
114        fetch_after_rounds: Vec<Round>,
115        fetch_missing_ancestors: bool,
116        timeout: Duration,
117    ) -> ConsensusResult<Vec<Bytes>>;
118
119    /// Fetches serialized commits in the commit range from a peer.
120    /// Returns a tuple of both the serialized commits, and serialized blocks that contain
121    /// votes certifying the last commit.
122    async fn fetch_commits(
123        &self,
124        peer: AuthorityIndex,
125        commit_range: CommitRange,
126        timeout: Duration,
127    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;
128
129    /// Fetches the latest block from `peer` for the requested `authorities`. The latest blocks
130    /// are returned in the serialised format of `SignedBlocks`. The method can return multiple
131    /// blocks per peer as its possible to have equivocations.
132    async fn fetch_latest_blocks(
133        &self,
134        peer: AuthorityIndex,
135        authorities: Vec<AuthorityIndex>,
136        timeout: Duration,
137    ) -> ConsensusResult<Vec<Bytes>>;
138
139    /// Gets the latest received & accepted rounds of all authorities from the peer.
140    async fn get_latest_rounds(
141        &self,
142        peer: AuthorityIndex,
143        timeout: Duration,
144    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
145
146    /// Sends a serialized SignedBlock to a peer.
147    #[cfg(test)]
148    async fn send_block(
149        &self,
150        peer: AuthorityIndex,
151        block: &VerifiedBlock,
152        timeout: Duration,
153    ) -> ConsensusResult<()>;
154}
155
156/// Validator network service for handling requests from validator peers.
157#[async_trait]
158pub(crate) trait ValidatorNetworkService: Send + Sync + 'static {
159    /// Handles the block sent from the peer via either unicast RPC or subscription stream.
160    /// Peer value can be trusted to be a valid authority index.
161    /// But serialized_block must be verified before its contents are trusted.
162    /// Excluded ancestors are also included as part of an effort to further propagate
163    /// blocks to peers despite the current exclusion.
164    async fn handle_send_block(
165        &self,
166        peer: AuthorityIndex,
167        block: ExtendedSerializedBlock,
168    ) -> ConsensusResult<()>;
169
170    /// Handles the subscription request from the peer.
171    /// A stream of newly proposed blocks is returned to the peer.
172    /// The stream continues until the end of epoch, peer unsubscribes, or a network error / crash
173    /// occurs.
174    async fn handle_subscribe_blocks(
175        &self,
176        peer: AuthorityIndex,
177        last_received: Round,
178    ) -> ConsensusResult<BlockStream>;
179
180    /// Handles the request to fetch blocks by references from the peer.
181    async fn handle_fetch_blocks(
182        &self,
183        peer: AuthorityIndex,
184        block_refs: Vec<BlockRef>,
185        fetch_after_rounds: Vec<Round>,
186        fetch_missing_ancestors: bool,
187    ) -> ConsensusResult<Vec<Bytes>>;
188
189    /// Handles the request to fetch commits by index range from the peer.
190    async fn handle_fetch_commits(
191        &self,
192        peer: AuthorityIndex,
193        commit_range: CommitRange,
194    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;
195
196    /// Handles the request to fetch the latest block for the provided `authorities`.
197    async fn handle_fetch_latest_blocks(
198        &self,
199        peer: AuthorityIndex,
200        authorities: Vec<AuthorityIndex>,
201    ) -> ConsensusResult<Vec<Bytes>>;
202
203    /// Handles the request to get the latest received & accepted rounds of all authorities.
204    async fn handle_get_latest_rounds(
205        &self,
206        peer: AuthorityIndex,
207    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
208}
209
210/// A stream item for observer block streaming that includes both the block and highest commit index.
211pub(crate) struct ObserverBlockStreamItem {
212    pub(crate) block: Bytes,
213    pub(crate) highest_commit_index: u64,
214}
215
216/// Observer block stream type.
217pub(crate) type ObserverBlockStream =
218    Pin<Box<dyn Stream<Item = ObserverBlockStreamItem> + Send + 'static>>;
219
220/// Observer block request stream type for bidirectional streaming.
221#[allow(dead_code)]
222pub(crate) type BlockRequestStream =
223    Pin<Box<dyn Stream<Item = crate::network::observer::BlockStreamRequest> + Send + 'static>>;
224
225/// Observer network service for handling requests from observer nodes.
226/// Unlike ValidatorNetworkService which uses AuthorityIndex, this uses NodeId (NetworkPublicKey)
227/// to identify peers since observers are not part of the committee.
228#[async_trait]
229#[allow(dead_code)]
230pub(crate) trait ObserverNetworkService: Send + Sync + 'static {
231    /// Handles the block streaming request from an observer peer.
232    /// Returns a stream of blocks with the highest commit index for each block.
233    /// Blocks with rounds higher than the highest_round_per_authority will be streamed.
234    async fn handle_stream_blocks(
235        &self,
236        peer: NodeId,
237        highest_round_per_authority: Vec<u64>,
238    ) -> ConsensusResult<ObserverBlockStream>;
239
240    /// Handles the request to fetch blocks by references from an observer peer.
241    #[allow(unused)]
242    async fn handle_fetch_blocks(
243        &self,
244        peer: NodeId,
245        block_refs: Vec<BlockRef>,
246    ) -> ConsensusResult<Vec<Bytes>>;
247
248    /// Handles the request to fetch commits by index range from an observer peer.
249    #[allow(unused)]
250    /// Returns serialized commits and certifier blocks.
251    async fn handle_fetch_commits(
252        &self,
253        peer: NodeId,
254        commit_range: CommitRange,
255    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;
256}
257
258/// Observer network client for communicating with validators' observer ports or other observers.
259/// Unlike ValidatorNetworkClient which uses AuthorityIndex, this uses PeerId to identify peers
260/// since the observer server can serve both validators and observer nodes.
261#[async_trait]
262#[allow(dead_code)]
263pub(crate) trait ObserverNetworkClient: Send + Sync + Sized + 'static {
264    /// Initiates block streaming with a peer (validator or observer).
265    /// Returns a stream of blocks with the highest commit index.
266    /// Blocks with rounds higher than the highest_round_per_authority will be streamed.
267    async fn stream_blocks(
268        &self,
269        peer: PeerId,
270        highest_round_per_authority: Vec<u64>,
271        timeout: Duration,
272    ) -> ConsensusResult<ObserverBlockStream>;
273
274    /// Fetches serialized blocks by references from a peer.
275    async fn fetch_blocks(
276        &self,
277        peer: PeerId,
278        block_refs: Vec<BlockRef>,
279        timeout: Duration,
280    ) -> ConsensusResult<Vec<Bytes>>;
281
282    /// Fetches serialized commits in the commit range from a peer.
283    /// Returns a tuple of both the serialized commits, and serialized blocks that contain
284    /// votes certifying the last commit.
285    async fn fetch_commits(
286        &self,
287        peer: PeerId,
288        commit_range: CommitRange,
289        timeout: Duration,
290    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;
291}
292
293/// An `AuthorityNode` holds a `NetworkManager` until shutdown.
294/// Dropping `NetworkManager` will shutdown the network service.
295pub(crate) trait NetworkManager: Send + Sync {
296    type ValidatorClient: ValidatorNetworkClient;
297    type ObserverClient: ObserverNetworkClient;
298
299    /// Creates a new network manager.
300    fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self;
301
302    /// Returns the validator network client.
303    fn validator_client(&self) -> Arc<Self::ValidatorClient>;
304
305    /// Returns the observer network client.
306    #[allow(dead_code)]
307    fn observer_client(&self) -> Arc<Self::ObserverClient>;
308
309    /// Starts the validator network server with the provided service.
310    async fn start_validator_server<V>(&mut self, service: Arc<V>)
311    where
312        V: ValidatorNetworkService;
313
314    /// Starts the observer network server with the provided service.
315    async fn start_observer_server<O>(&mut self, service: Arc<O>)
316    where
317        O: ObserverNetworkService;
318
319    /// Stops the network service.
320    async fn stop(&mut self);
321
322    /// Updates the network address for a peer identified by their authority index.
323    /// If address is None, the override is cleared and the committee address will be used.
324    fn update_peer_address(&self, peer: AuthorityIndex, address: Option<Multiaddr>);
325}
326
327// Re-export the concrete client implementations.
328pub(crate) use clients::{CommitSyncerClient, SynchronizerClient};
329
330/// Serialized block with extended information from the proposing authority.
331#[derive(Clone, PartialEq, Eq, Debug)]
332pub(crate) struct ExtendedSerializedBlock {
333    pub(crate) block: Bytes,
334    // Serialized BlockRefs that are excluded from the blocks ancestors.
335    pub(crate) excluded_ancestors: Vec<Vec<u8>>,
336}
337
338impl From<ExtendedBlock> for ExtendedSerializedBlock {
339    fn from(extended_block: ExtendedBlock) -> Self {
340        Self {
341            block: extended_block.block.serialized().clone(),
342            excluded_ancestors: extended_block
343                .excluded_ancestors
344                .iter()
345                .filter_map(|r| match bcs::to_bytes(r) {
346                    Ok(serialized) => Some(serialized),
347                    Err(e) => {
348                        tracing::debug!("Failed to serialize block ref {:?}: {e:?}", r);
349                        None
350                    }
351                })
352                .collect(),
353        }
354    }
355}
356
357/// Attempts to convert a multiaddr of the form `/[ip4,ip6,dns]/{}/udp/{port}` into
358/// a host:port string.
359pub(crate) fn to_host_port_str(addr: &Multiaddr) -> Result<String, String> {
360    let mut iter = addr.iter();
361
362    match (iter.next(), iter.next()) {
363        (Some(Protocol::Ip4(ipaddr)), Some(Protocol::Udp(port))) => {
364            Ok(format!("{}:{}", ipaddr, port))
365        }
366        (Some(Protocol::Ip6(ipaddr)), Some(Protocol::Udp(port))) => {
367            Ok(format!("{}", SocketAddrV6::new(ipaddr, port, 0, 0)))
368        }
369        (Some(Protocol::Dns(hostname)), Some(Protocol::Udp(port))) => {
370            Ok(format!("{}:{}", hostname, port))
371        }
372
373        _ => Err(format!("unsupported multiaddr: {addr}")),
374    }
375}