consensus_core/network/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! This module defines the network interface, and provides network implementations for the
//! consensus protocol.
//!
//! Having an abstract network interface allows
//! - simplying the semantics of sending data and serving requests over the network
//! - hiding implementation specific types and semantics from the consensus protocol
//! - allowing easy swapping of network implementations, for better performance or testing
//!
//! When modifying the client and server interfaces, the principle is to keep the interfaces
//! low level, close to underlying implementations in semantics. For example, the client interface
//! exposes sending messages to a specific peer, instead of broadcasting to all peers. Subscribing
//! to a stream of blocks gets back the stream via response, instead of delivering the stream
//! directly to the server. This keeps the logic agnostics to the underlying network outside of
//! this module, so they can be reused easily across network implementations.

use std::{pin::Pin, sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::{AuthorityIndex, NetworkKeyPair};
use futures::Stream;

use crate::{
    block::{BlockRef, ExtendedBlock, VerifiedBlock},
    commit::{CommitRange, TrustedCommit},
    context::Context,
    error::ConsensusResult,
    Round,
};

// Anemo generated RPC stubs.
mod anemo_gen {
    include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusRpc.rs"));
}

// Tonic generated RPC stubs.
mod tonic_gen {
    include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusService.rs"));
}

pub mod connection_monitor;

pub(crate) mod anemo_network;
pub(crate) mod epoch_filter;
pub(crate) mod metrics;
mod metrics_layer;
#[cfg(all(test, not(msim)))]
mod network_tests;
#[cfg(test)]
pub(crate) mod test_network;
#[cfg(not(msim))]
pub(crate) mod tonic_network;
#[cfg(msim)]
pub mod tonic_network;
mod tonic_tls;

/// A stream of serialized filtered blocks returned over the network.
pub(crate) type BlockStream = Pin<Box<dyn Stream<Item = ExtendedSerializedBlock> + Send>>;

/// Network client for communicating with peers.
///
/// NOTE: the timeout parameters help saving resources at client and potentially server.
/// But it is up to the server implementation if the timeout is honored.
/// - To bound server resources, server should implement own timeout for incoming requests.
#[async_trait]
pub(crate) trait NetworkClient: Send + Sync + Sized + 'static {
    // Whether the network client streams blocks to subscribed peers.
    const SUPPORT_STREAMING: bool;

    /// Sends a serialized SignedBlock to a peer.
    async fn send_block(
        &self,
        peer: AuthorityIndex,
        block: &VerifiedBlock,
        timeout: Duration,
    ) -> ConsensusResult<()>;

    /// Subscribes to blocks from a peer after last_received round.
    async fn subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
        timeout: Duration,
    ) -> ConsensusResult<BlockStream>;

    // TODO: add a parameter for maximum total size of blocks returned.
    /// Fetches serialized `SignedBlock`s from a peer. It also might return additional ancestor blocks
    /// of the requested blocks according to the provided `highest_accepted_rounds`. The `highest_accepted_rounds`
    /// length should be equal to the committee size. If `highest_accepted_rounds` is empty then it will
    /// be simply ignored.
    async fn fetch_blocks(
        &self,
        peer: AuthorityIndex,
        block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
        timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Fetches serialized commits in the commit range from a peer.
    /// Returns a tuple of both the serialized commits, and serialized blocks that contain
    /// votes certifying the last commit.
    async fn fetch_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
        timeout: Duration,
    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;

    /// Fetches the latest block from `peer` for the requested `authorities`. The latest blocks
    /// are returned in the serialised format of `SignedBlocks`. The method can return multiple
    /// blocks per peer as its possible to have equivocations.
    async fn fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
        timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Gets the latest received & accepted rounds of all authorities from the peer.
    async fn get_latest_rounds(
        &self,
        peer: AuthorityIndex,
        timeout: Duration,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
}

/// Network service for handling requests from peers.
/// NOTE: using `async_trait` macro because `NetworkService` methods are called in the trait impl
/// of `anemo_gen::ConsensusRpc`, which itself is annotated with `async_trait`.
#[async_trait]
pub(crate) trait NetworkService: Send + Sync + 'static {
    /// Handles the block sent from the peer via either unicast RPC or subscription stream.
    /// Peer value can be trusted to be a valid authority index.
    /// But serialized_block must be verified before its contents are trusted.
    /// Excluded ancestors are also included as part of an effort to further propagate
    /// blocks to peers despite the current exclusion.
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()>;

    /// Handles the subscription request from the peer.
    /// A stream of newly proposed blocks is returned to the peer.
    /// The stream continues until the end of epoch, peer unsubscribes, or a network error / crash
    /// occurs.
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream>;

    /// Handles the request to fetch blocks by references from the peer.
    async fn handle_fetch_blocks(
        &self,
        peer: AuthorityIndex,
        block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Handles the request to fetch commits by index range from the peer.
    async fn handle_fetch_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;

    /// Handles the request to fetch the latest block for the provided `authorities`.
    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>>;

    /// Handles the request to get the latest received & accepted rounds of all authorities.
    async fn handle_get_latest_rounds(
        &self,
        peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
}

/// An `AuthorityNode` holds a `NetworkManager` until shutdown.
/// Dropping `NetworkManager` will shutdown the network service.
pub(crate) trait NetworkManager<S>: Send + Sync
where
    S: NetworkService,
{
    type Client: NetworkClient;

    /// Creates a new network manager.
    fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self;

    /// Returns the network client.
    fn client(&self) -> Arc<Self::Client>;

    /// Installs network service.
    async fn install_service(&mut self, service: Arc<S>);

    /// Stops the network service.
    async fn stop(&mut self);
}

/// Serialized block with extended information from the proposing authority.
#[derive(Clone, PartialEq, Eq, Debug)]
pub(crate) struct ExtendedSerializedBlock {
    pub(crate) block: Bytes,
    // Serialized BlockRefs that are excluded from the blocks ancestors.
    pub(crate) excluded_ancestors: Vec<Vec<u8>>,
}

impl From<ExtendedBlock> for ExtendedSerializedBlock {
    fn from(extended_block: ExtendedBlock) -> Self {
        Self {
            block: extended_block.block.serialized().clone(),
            excluded_ancestors: extended_block
                .excluded_ancestors
                .iter()
                .filter_map(|r| match bcs::to_bytes(r) {
                    Ok(serialized) => Some(serialized),
                    Err(e) => {
                        tracing::debug!("Failed to serialize block ref {:?}: {e:?}", r);
                        None
                    }
                })
                .collect(),
        }
    }
}