use std::{
collections::{BTreeMap, BTreeSet, HashMap},
sync::Arc,
time::Duration,
};
use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{stream::FuturesUnordered, StreamExt as _};
use itertools::Itertools as _;
use mysten_metrics::{
monitored_future,
monitored_mpsc::{channel, Receiver, Sender},
monitored_scope,
};
use parking_lot::{Mutex, RwLock};
use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
use sui_macros::fail_point_async;
use tap::TapFallible;
use tokio::{
runtime::Handle,
sync::{mpsc::error::TrySendError, oneshot},
task::{JoinError, JoinSet},
time::{sleep, sleep_until, timeout, Instant},
};
use tracing::{debug, error, info, trace, warn};
use crate::{
authority_service::COMMIT_LAG_MULTIPLIER, core_thread::CoreThreadDispatcher,
transaction_certifier::TransactionCertifier,
};
use crate::{
block::{BlockRef, SignedBlock, VerifiedBlock},
block_verifier::BlockVerifier,
commit_vote_monitor::CommitVoteMonitor,
context::Context,
dag_state::DagState,
error::{ConsensusError, ConsensusResult},
network::NetworkClient,
BlockAPI, CommitIndex, Round,
};
/// Maximum number of concurrent fetch requests kept in flight per peer task.
const FETCH_BLOCKS_CONCURRENCY: usize = 5;
/// Timeout for a single fetch_blocks network request.
const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);
/// Overall timeout for one round of periodic fetching across peers.
const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);
/// A missing block is requested from at most this many peers at the same time.
const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;
/// Upper bound on the number of peers contacted by the periodic (batched) sync.
const MAX_PERIODIC_SYNC_PEERS: usize = 3;
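/// Guard over a set of block refs that have been locked for fetching from `peer`.
/// Dropping the guard releases the locks in the shared `InflightBlocksMap`, so the
/// blocks can be requested from other peers again.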
struct BlocksGuard {
map: Arc<InflightBlocksMap>,
block_refs: BTreeSet<BlockRef>,
peer: AuthorityIndex,
}
impl Drop for BlocksGuard {
fn drop(&mut self) {
self.map.unlock_blocks(&self.block_refs, self.peer);
}
}
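/// Tracks which peers are currently fetching each missing block, so that a block is
/// requested from at most `MAX_AUTHORITIES_TO_FETCH_PER_BLOCK` peers at a time and
/// never twice from the same peer.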
struct InflightBlocksMap {
inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
}
impl InflightBlocksMap {
fn new() -> Arc<Self> {
Arc::new(Self {
inner: Mutex::new(HashMap::new()),
})
}
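/// Attempts to lock the given missing blocks for fetching from `peer`. Blocks already
/// locked by this peer, or already locked by `MAX_AUTHORITIES_TO_FETCH_PER_BLOCK`
/// peers, are skipped. Returns `None` when nothing could be locked.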
fn lock_blocks(
self: &Arc<Self>,
missing_block_refs: BTreeSet<BlockRef>,
peer: AuthorityIndex,
) -> Option<BlocksGuard> {
let mut blocks = BTreeSet::new();
let mut inner = self.inner.lock();
for block_ref in missing_block_refs {
let authorities = inner.entry(block_ref).or_default();
if authorities.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK
&& authorities.get(&peer).is_none()
{
assert!(authorities.insert(peer));
blocks.insert(block_ref);
}
}
if blocks.is_empty() {
None
} else {
Some(BlocksGuard {
map: self.clone(),
block_refs: blocks,
peer,
})
}
}
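/// Releases the locks that `peer` holds on the given block refs. Invoked from the
/// `BlocksGuard` drop implementation.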
fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
let mut blocks_to_fetch = self.inner.lock();
for block_ref in block_refs {
let authorities = blocks_to_fetch
.get_mut(block_ref)
.expect("Should have found a non empty map");
assert!(authorities.remove(&peer), "Peer index should be present!");
if authorities.is_empty() {
blocks_to_fetch.remove(block_ref);
}
}
}
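/// Drops the provided guard (releasing its locks) and tries to re-lock the same block
/// refs for a different `peer`. Used when retrying a failed fetch against another peer.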
fn swap_locks(
self: &Arc<Self>,
blocks_guard: BlocksGuard,
peer: AuthorityIndex,
) -> Option<BlocksGuard> {
let block_refs = blocks_guard.block_refs.clone();
drop(blocks_guard);
self.lock_blocks(block_refs, peer)
}
#[cfg(test)]
fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
let inner = self.inner.lock();
inner.len()
}
}
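/// Commands processed by the synchronizer's main loop.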
enum Command {
FetchBlocks {
missing_block_refs: BTreeSet<BlockRef>,
peer_index: AuthorityIndex,
result: oneshot::Sender<Result<(), ConsensusError>>,
},
FetchOwnLastBlock,
KickOffScheduler,
}
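/// Handle through which other components request block fetches from the synchronizer
/// and shut it down.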
pub(crate) struct SynchronizerHandle {
commands_sender: Sender<Command>,
tasks: tokio::sync::Mutex<JoinSet<()>>,
}
impl SynchronizerHandle {
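/// Requests the given missing blocks to be fetched from `peer_index` via the live sync
/// path. Returns an error if the synchronizer is shutting down or the per-peer fetch
/// pipeline is saturated.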
pub(crate) async fn fetch_blocks(
&self,
missing_block_refs: BTreeSet<BlockRef>,
peer_index: AuthorityIndex,
) -> ConsensusResult<()> {
let (sender, receiver) = oneshot::channel();
self.commands_sender
.send(Command::FetchBlocks {
missing_block_refs,
peer_index,
result: sender,
})
.await
.map_err(|_err| ConsensusError::Shutdown)?;
receiver.await.map_err(|_err| ConsensusError::Shutdown)?
}
pub(crate) async fn stop(&self) -> Result<(), JoinError> {
let mut tasks = self.tasks.lock().await;
tasks.abort_all();
while let Some(result) = tasks.join_next().await {
result?
}
Ok(())
}
}
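/// The `Synchronizer` fetches blocks that the node has identified as missing. It does so
/// in two ways: on demand, when a `Command::FetchBlocks` is received for a specific peer
/// (live sync), and periodically, via a scheduler task that requests all currently
/// missing blocks from a set of peers.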
pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
context: Arc<Context>,
commands_receiver: Receiver<Command>,
fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
core_dispatcher: Arc<D>,
commit_vote_monitor: Arc<CommitVoteMonitor>,
dag_state: Arc<RwLock<DagState>>,
fetch_blocks_scheduler_task: JoinSet<()>,
fetch_own_last_block_task: JoinSet<()>,
network_client: Arc<C>,
block_verifier: Arc<V>,
transaction_certifier: TransactionCertifier,
inflight_blocks_map: Arc<InflightBlocksMap>,
commands_sender: Sender<Command>,
}
impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
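/// Starts the synchronizer: spawns one fetch task per peer plus the main command loop,
/// and returns a handle that can be used to request fetches or stop the tasks.
///
/// Illustrative usage sketch only (not compiled as a doctest); it assumes concrete
/// `NetworkClient`, `BlockVerifier` and `CoreThreadDispatcher` implementations and the
/// surrounding context/state have already been constructed:
///
/// ```ignore
/// let handle = Synchronizer::start(
///     network_client,
///     context,
///     core_dispatcher,
///     commit_vote_monitor,
///     block_verifier,
///     transaction_certifier,
///     dag_state,
///     false, // sync_last_known_own_block
/// );
/// // Live sync: ask a specific peer for blocks we found missing.
/// handle.fetch_blocks(missing_block_refs, peer_index).await?;
/// ```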
pub(crate) fn start(
network_client: Arc<C>,
context: Arc<Context>,
core_dispatcher: Arc<D>,
commit_vote_monitor: Arc<CommitVoteMonitor>,
block_verifier: Arc<V>,
transaction_certifier: TransactionCertifier,
dag_state: Arc<RwLock<DagState>>,
sync_last_known_own_block: bool,
) -> Arc<SynchronizerHandle> {
let (commands_sender, commands_receiver) =
channel("consensus_synchronizer_commands", 1_000);
let inflight_blocks_map = InflightBlocksMap::new();
let mut fetch_block_senders = BTreeMap::new();
let mut tasks = JoinSet::new();
for (index, _) in context.committee.authorities() {
if index == context.own_index {
continue;
}
let (sender, receiver) =
channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
index,
network_client.clone(),
block_verifier.clone(),
transaction_certifier.clone(),
commit_vote_monitor.clone(),
context.clone(),
core_dispatcher.clone(),
dag_state.clone(),
receiver,
commands_sender.clone(),
);
tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
fetch_block_senders.insert(index, sender);
}
let commands_sender_clone = commands_sender.clone();
if sync_last_known_own_block {
commands_sender
.try_send(Command::FetchOwnLastBlock)
.expect("Failed to sync our last block");
}
tasks.spawn(monitored_future!(async move {
let mut s = Self {
context,
commands_receiver,
fetch_block_senders,
core_dispatcher,
commit_vote_monitor,
fetch_blocks_scheduler_task: JoinSet::new(),
fetch_own_last_block_task: JoinSet::new(),
network_client,
block_verifier,
transaction_certifier,
inflight_blocks_map,
commands_sender: commands_sender_clone,
dag_state,
};
s.run().await;
}));
Arc::new(SynchronizerHandle {
commands_sender,
tasks: tokio::sync::Mutex::new(tasks),
})
}
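/// Main event loop: dispatches incoming commands, supervises the periodic scheduler and
/// the fetch-own-last-block tasks, and re-arms the scheduler timer.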
async fn run(&mut self) {
const SYNCHRONIZER_TIMEOUT: Duration = Duration::from_millis(500);
let scheduler_timeout = sleep_until(Instant::now() + SYNCHRONIZER_TIMEOUT);
tokio::pin!(scheduler_timeout);
loop {
tokio::select! {
Some(command) = self.commands_receiver.recv() => {
match command {
Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
if peer_index == self.context.own_index {
error!("We should never attempt to fetch blocks from our own node");
continue;
}
let missing_block_refs = missing_block_refs
.into_iter()
.take(self.context.parameters.max_blocks_per_sync)
.collect();
let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index);
let Some(blocks_guard) = blocks_guard else {
// Nothing could be locked for this peer (the blocks are already being fetched), so reply with success.
result.send(Ok(())).ok();
continue;
};
let r = self
.fetch_block_senders
.get(&peer_index)
.expect("Fatal error, sender should be present")
.try_send(blocks_guard)
.map_err(|err| {
match err {
TrySendError::Full(_) => ConsensusError::SynchronizerSaturated(peer_index),
TrySendError::Closed(_) => ConsensusError::Shutdown
}
});
result.send(r).ok();
}
Command::FetchOwnLastBlock => {
if self.fetch_own_last_block_task.is_empty() {
self.start_fetch_own_last_block_task();
}
}
Command::KickOffScheduler => {
let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
Instant::now()
} else {
Instant::now() + SYNCHRONIZER_TIMEOUT.checked_div(2).unwrap()
};
if timeout < scheduler_timeout.deadline() {
scheduler_timeout.as_mut().reset(timeout);
}
}
}
},
Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
match result {
Ok(()) => {},
Err(e) => {
if e.is_cancelled() {
// The task was aborted, e.g. during shutdown; nothing to do.
} else if e.is_panic() {
std::panic::resume_unwind(e.into_panic());
} else {
panic!("fetch our last block task failed: {e}");
}
},
};
},
Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
match result {
Ok(()) => {},
Err(e) => {
if e.is_cancelled() {
// The task was aborted, e.g. during shutdown; nothing to do.
} else if e.is_panic() {
std::panic::resume_unwind(e.into_panic());
} else {
panic!("fetch blocks scheduler task failed: {e}");
}
},
};
},
() = &mut scheduler_timeout => {
if self.fetch_blocks_scheduler_task.is_empty() {
if let Err(err) = self.start_fetch_missing_blocks_task().await {
debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
return;
};
}
scheduler_timeout
.as_mut()
.reset(Instant::now() + SYNCHRONIZER_TIMEOUT);
}
}
}
}
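/// Per-peer worker task. Receives locked block batches over `receiver` and fetches them
/// from `peer_index`, keeping at most `FETCH_BLOCKS_CONCURRENCY` requests in flight and
/// retrying failed requests up to `MAX_RETRIES` times before releasing the locks.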
async fn fetch_blocks_from_authority(
peer_index: AuthorityIndex,
network_client: Arc<C>,
block_verifier: Arc<V>,
transaction_certifier: TransactionCertifier,
commit_vote_monitor: Arc<CommitVoteMonitor>,
context: Arc<Context>,
core_dispatcher: Arc<D>,
dag_state: Arc<RwLock<DagState>>,
mut receiver: Receiver<BlocksGuard>,
commands_sender: Sender<Command>,
) {
const MAX_RETRIES: u32 = 5;
let peer_hostname = &context.committee.authority(peer_index).hostname;
let mut requests = FuturesUnordered::new();
loop {
tokio::select! {
Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);
requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, 1))
},
Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
match response {
Ok(blocks) => {
if let Err(err) = Self::process_fetched_blocks(blocks,
peer_index,
blocks_guard,
core_dispatcher.clone(),
block_verifier.clone(),
transaction_certifier.clone(),
commit_vote_monitor.clone(),
context.clone(),
commands_sender.clone(),
"live"
).await {
warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
}
},
Err(_) => {
context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer_hostname, "live"]).inc();
if retries <= MAX_RETRIES {
requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
} else {
warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
drop(blocks_guard);
}
}
}
},
else => {
info!("Fetching blocks from authority {peer_index} task will now abort.");
break;
}
}
}
}
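/// Verifies the fetched blocks, ensures the peer returned only requested blocks or their
/// ancestors, records commit votes and metrics, and forwards the blocks to the core
/// dispatcher. If more blocks are still missing afterwards, the scheduler is kicked off.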
async fn process_fetched_blocks(
serialized_blocks: Vec<Bytes>,
peer_index: AuthorityIndex,
requested_blocks_guard: BlocksGuard,
core_dispatcher: Arc<D>,
block_verifier: Arc<V>,
transaction_certifier: TransactionCertifier,
commit_vote_monitor: Arc<CommitVoteMonitor>,
context: Arc<Context>,
commands_sender: Sender<Command>,
sync_method: &str,
) -> ConsensusResult<()> {
if serialized_blocks.is_empty() {
return Ok(());
}
const MAX_ADDITIONAL_BLOCKS: usize = 10;
if serialized_blocks.len() > requested_blocks_guard.block_refs.len() + MAX_ADDITIONAL_BLOCKS
{
return Err(ConsensusError::TooManyFetchedBlocksReturned(peer_index));
}
let blocks = Handle::current()
.spawn_blocking({
let block_verifier = block_verifier.clone();
let context = context.clone();
move || {
Self::verify_blocks(
serialized_blocks,
block_verifier,
transaction_certifier,
&context,
peer_index,
)
}
})
.await
.expect("Spawn blocking should not fail")?;
let ancestors = blocks
.iter()
.filter(|b| requested_blocks_guard.block_refs.contains(&b.reference()))
.flat_map(|b| b.ancestors().to_vec())
.collect::<BTreeSet<BlockRef>>();
for block in &blocks {
if !requested_blocks_guard
.block_refs
.contains(&block.reference())
&& !ancestors.contains(&block.reference())
{
return Err(ConsensusError::UnexpectedFetchedBlock {
index: peer_index,
block_ref: block.reference(),
});
}
}
for block in &blocks {
commit_vote_monitor.observe_block(block);
}
let metrics = &context.metrics.node_metrics;
let peer_hostname = &context.committee.authority(peer_index).hostname;
metrics
.synchronizer_fetched_blocks_by_peer
.with_label_values(&[peer_hostname, sync_method])
.inc_by(blocks.len() as u64);
for block in &blocks {
let block_hostname = &context.committee.authority(block.author()).hostname;
metrics
.synchronizer_fetched_blocks_by_authority
.with_label_values(&[block_hostname, sync_method])
.inc();
}
debug!(
"Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
blocks.len(),
blocks.iter().map(|b| b.reference().to_string()).join(", "),
);
let missing_blocks = core_dispatcher
.add_blocks(blocks)
.await
.map_err(|_| ConsensusError::Shutdown)?;
drop(requested_blocks_guard);
if !missing_blocks.is_empty() {
if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
{
warn!("Commands channel is full")
}
}
context
.metrics
.node_metrics
.missing_blocks_after_fetch_total
.inc_by(missing_blocks.len() as u64);
Ok(())
}
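/// Returns the highest accepted round per authority, sent along with fetch requests so
/// the peer can skip blocks this node already has.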
fn get_highest_accepted_rounds(
dag_state: Arc<RwLock<DagState>>,
context: &Arc<Context>,
) -> Vec<Round> {
let blocks = dag_state
.read()
.get_last_cached_block_per_authority(Round::MAX);
assert_eq!(blocks.len(), context.committee.size());
blocks
.into_iter()
.map(|(block, _)| block.round())
.collect::<Vec<_>>()
}
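/// Deserializes and verifies each block. Blocks with timestamps in the future are dropped
/// unless median-based commit timestamps are enabled; when the fastpath is enabled, the
/// reject votes are recorded with the transaction certifier.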
fn verify_blocks(
serialized_blocks: Vec<Bytes>,
block_verifier: Arc<V>,
transaction_certifier: TransactionCertifier,
context: &Context,
peer_index: AuthorityIndex,
) -> ConsensusResult<Vec<VerifiedBlock>> {
let mut verified_blocks = Vec::new();
let mut voted_blocks = Vec::new();
for serialized_block in serialized_blocks {
let signed_block: SignedBlock =
bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
let reject_txn_votes = block_verifier.verify_and_vote(&signed_block).tap_err(|e| {
let hostname = context.committee.authority(peer_index).hostname.clone();
context
.metrics
.node_metrics
.invalid_blocks
.with_label_values(&[&hostname, "synchronizer", e.clone().name()])
.inc();
info!("Invalid block received from {}: {}", peer_index, e);
})?;
let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
let now = context.clock.timestamp_utc_ms();
let drift = verified_block.timestamp_ms().saturating_sub(now) as u64;
if drift > 0 {
let peer_hostname = &context
.committee
.authority(verified_block.author())
.hostname;
context
.metrics
.node_metrics
.block_timestamp_drift_ms
.with_label_values(&[peer_hostname, "synchronizer"])
.inc_by(drift);
if context
.protocol_config
.consensus_median_based_commit_timestamp()
{
trace!("Synced block {} timestamp {} is in the future (now={}). Will not ignore as median based timestamp is enabled.", verified_block.reference(), verified_block.timestamp_ms(), now);
} else {
warn!(
"Synced block {} timestamp {} is in the future (now={}). Ignoring.",
verified_block.reference(),
verified_block.timestamp_ms(),
now
);
continue;
}
}
verified_blocks.push(verified_block.clone());
voted_blocks.push((verified_block, reject_txn_votes));
}
if context.protocol_config.mysticeti_fastpath() {
transaction_certifier.add_voted_blocks(voted_blocks);
}
Ok(verified_blocks)
}
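/// Issues a single `fetch_blocks` RPC with a timeout. On failure the remaining timeout
/// period is awaited and the retry counter is incremented, so callers can decide whether
/// to retry the same request.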
async fn fetch_blocks_request(
network_client: Arc<C>,
peer: AuthorityIndex,
blocks_guard: BlocksGuard,
highest_rounds: Vec<Round>,
request_timeout: Duration,
mut retries: u32,
) -> (
ConsensusResult<Vec<Bytes>>,
BlocksGuard,
u32,
AuthorityIndex,
Vec<Round>,
) {
let start = Instant::now();
let resp = timeout(
request_timeout,
network_client.fetch_blocks(
peer,
blocks_guard
.block_refs
.clone()
.into_iter()
.collect::<Vec<_>>(),
highest_rounds.clone().into_iter().collect::<Vec<_>>(),
request_timeout,
),
)
.await;
fail_point_async!("consensus-delay");
let resp = match resp {
Ok(Err(err)) => {
sleep_until(start + request_timeout).await;
retries += 1;
Err(err)
}
Err(err) => {
sleep_until(start + request_timeout).await;
retries += 1;
Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
}
Ok(result) => result,
};
(resp, blocks_guard, retries, peer, highest_rounds)
}
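/// Fetches this node's own last proposed block from peers, used when the node starts with
/// `sync_last_known_own_block` set. Retries with backoff until enough stake
/// (`reached_validity`) has replied, then reports the highest observed round to the core
/// dispatcher.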
fn start_fetch_own_last_block_task(&mut self) {
const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);
let context = self.context.clone();
let dag_state = self.dag_state.clone();
let network_client = self.network_client.clone();
let block_verifier = self.block_verifier.clone();
let core_dispatcher = self.core_dispatcher.clone();
self.fetch_own_last_block_task
.spawn(monitored_future!(async move {
let _scope = monitored_scope("FetchOwnLastBlockTask");
let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
let network_client_cloned = network_client.clone();
let own_index = context.own_index;
async move {
sleep(fetch_own_block_delay).await;
let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
(r, authority_index)
}
};
let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
let mut result = Vec::new();
for serialized_block in blocks {
let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
block_verifier.verify_and_vote(&signed_block).tap_err(|err|{
let hostname = context.committee.authority(authority_index).hostname.clone();
context
.metrics
.node_metrics
.invalid_blocks
.with_label_values(&[&hostname, "synchronizer_own_block", err.clone().name()])
.inc();
warn!("Invalid block received from {}: {}", authority_index, err);
})?;
let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
if verified_block.author() != context.own_index {
return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
}
result.push(verified_block);
}
Ok(result)
};
let mut highest_round;
let mut retries = 0;
let mut retry_delay_step = Duration::from_millis(500);
'main: loop {
if context.committee.size() == 1 {
highest_round = dag_state.read().get_last_proposed_block().round();
info!("Only one node in the network, will not try fetching own last block from peers.");
break 'main;
}
let mut total_stake = 0;
highest_round = 0;
let mut results = FuturesUnordered::new();
for (authority_index, _authority) in context.committee.authorities() {
if authority_index != context.own_index {
results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
}
}
let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
tokio::pin!(timer);
'inner: loop {
tokio::select! {
result = results.next() => {
let Some((result, authority_index)) = result else {
break 'inner;
};
match result {
Ok(result) => {
match process_blocks(result, authority_index) {
Ok(blocks) => {
let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
highest_round = highest_round.max(max_round);
total_stake += context.committee.stake(authority_index);
},
Err(err) => {
warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
}
}
},
Err(err) => {
warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
}
}
},
() = &mut timer => {
info!("Timeout while trying to sync our own last block from peers");
break 'inner;
}
}
}
if context.committee.reached_validity(total_stake) {
info!("{} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
break 'main;
}
retries += 1;
context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);
sleep(retry_delay_step).await;
retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
}
context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);
if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
}
}));
}
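/// Periodic scheduler entry point: asks the core for missing blocks and spawns a task to
/// fetch them from peers. When the local commit index lags the quorum, the fetch is
/// skipped entirely if GC is enabled, otherwise it is limited to blocks close to the
/// highest accepted round and commit sync is left to catch up.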
async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
let mut missing_blocks = self
.core_dispatcher
.get_missing_blocks()
.await
.map_err(|_err| ConsensusError::Shutdown)?;
if missing_blocks.is_empty() {
return Ok(());
}
let context = self.context.clone();
let network_client = self.network_client.clone();
let block_verifier = self.block_verifier.clone();
let transaction_certifier = self.transaction_certifier.clone();
let commit_vote_monitor = self.commit_vote_monitor.clone();
let core_dispatcher = self.core_dispatcher.clone();
let blocks_to_fetch = self.inflight_blocks_map.clone();
let commands_sender = self.commands_sender.clone();
let dag_state = self.dag_state.clone();
let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
if commit_lagging {
if dag_state.read().gc_enabled() {
return Ok(());
}
let highest_accepted_round = dag_state.read().highest_accepted_round();
missing_blocks = missing_blocks
.into_iter()
.take_while(|b| {
b.round <= highest_accepted_round + self.missing_block_round_threshold()
})
.collect::<BTreeSet<_>>();
if missing_blocks.is_empty() {
trace!("Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index} and missing blocks are too far in the future.");
self.context
.metrics
.node_metrics
.fetch_blocks_scheduler_skipped
.with_label_values(&["commit_lagging"])
.inc();
return Ok(());
}
}
self.fetch_blocks_scheduler_task
.spawn(monitored_future!(async move {
let _scope = monitored_scope("FetchMissingBlocksScheduler");
context.metrics.node_metrics.fetch_blocks_scheduler_inflight.inc();
let total_requested = missing_blocks.len();
fail_point_async!("consensus-delay");
let results = if context.protocol_config.consensus_batched_block_sync() {
Self::fetch_blocks_from_authorities(context.clone(), blocks_to_fetch.clone(), network_client, missing_blocks, dag_state).await
} else {
Self::fetch_blocks_from_authorities_old(context.clone(), blocks_to_fetch.clone(), network_client, missing_blocks, dag_state).await
};
context.metrics.node_metrics.fetch_blocks_scheduler_inflight.dec();
if results.is_empty() {
return;
}
let mut total_fetched = 0;
for (blocks_guard, fetched_blocks, peer) in results {
total_fetched += fetched_blocks.len();
if let Err(err) = Self::process_fetched_blocks(fetched_blocks, peer, blocks_guard, core_dispatcher.clone(), block_verifier.clone(), transaction_certifier.clone(), commit_vote_monitor.clone(), context.clone(), commands_sender.clone(), "periodic").await {
warn!("Error occurred while processing fetched blocks from peer {peer}: {err}");
}
}
debug!("Total blocks requested to fetch: {}, total fetched: {}", total_requested, total_fetched);
}));
Ok(())
}
fn missing_block_round_threshold(&self) -> Round {
self.context.parameters.commit_sync_batch_size
}
fn is_commit_lagging(&self) -> (bool, CommitIndex, CommitIndex) {
let last_commit_index = self.dag_state.read().last_commit_index();
let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
let commit_threshold = last_commit_index
+ self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
(
commit_threshold < quorum_commit_index,
last_commit_index,
quorum_commit_index,
)
}
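/// Legacy (non-batched) periodic fetch: splits the missing blocks into chunks of
/// `MAX_BLOCKS_PER_FETCH`, assigns each chunk to a randomly ordered peer, and retries
/// failed chunks against the next peer until `FETCH_FROM_PEERS_TIMEOUT` expires.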
async fn fetch_blocks_from_authorities_old(
context: Arc<Context>,
inflight_blocks: Arc<InflightBlocksMap>,
network_client: Arc<C>,
missing_blocks: BTreeSet<BlockRef>,
dag_state: Arc<RwLock<DagState>>,
) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
const MAX_PEERS: usize = 3;
const MAX_BLOCKS_PER_FETCH: usize = 32;
let missing_blocks = missing_blocks
.into_iter()
.take(MAX_PEERS * MAX_BLOCKS_PER_FETCH)
.collect::<Vec<_>>();
let mut missing_blocks_per_authority = vec![0; context.committee.size()];
for block in &missing_blocks {
missing_blocks_per_authority[block.author] += 1;
}
for (missing, (_, authority)) in missing_blocks_per_authority
.into_iter()
.zip(context.committee.authorities())
{
context
.metrics
.node_metrics
.synchronizer_missing_blocks_by_authority
.with_label_values(&[&authority.hostname])
.inc_by(missing as u64);
context
.metrics
.node_metrics
.synchronizer_current_missing_blocks_by_authority
.with_label_values(&[&authority.hostname])
.set(missing as i64);
}
let mut peers = context
.committee
.authorities()
.filter_map(|(peer_index, _)| (peer_index != context.own_index).then_some(peer_index))
.collect::<Vec<_>>();
if cfg!(not(test)) {
peers.shuffle(&mut ThreadRng::default());
}
let mut peers = peers.into_iter();
let mut request_futures = FuturesUnordered::new();
let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);
for blocks in missing_blocks.chunks(MAX_BLOCKS_PER_FETCH) {
let peer = peers
.next()
.expect("Possible misconfiguration as a peer should be found");
let peer_hostname = &context.committee.authority(peer).hostname;
let block_refs = blocks.iter().cloned().collect::<BTreeSet<_>>();
if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
info!(
"Periodic sync of {} missing blocks from peer {} {}: {}",
block_refs.len(),
peer,
peer_hostname,
block_refs
.iter()
.map(|b| b.to_string())
.collect::<Vec<_>>()
.join(", ")
);
request_futures.push(Self::fetch_blocks_request(
network_client.clone(),
peer,
blocks_guard,
highest_rounds.clone(),
FETCH_REQUEST_TIMEOUT,
1,
));
}
}
let mut results = Vec::new();
let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
tokio::pin!(fetcher_timeout);
loop {
tokio::select! {
Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
let peer_hostname = &context.committee.authority(peer_index).hostname;
match response {
Ok(fetched_blocks) => {
results.push((blocks_guard, fetched_blocks, peer_index));
if request_futures.is_empty() {
break;
}
},
Err(_) => {
context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer_hostname, "periodic_old"]).inc();
if let Some(next_peer) = peers.next() {
if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
info!(
"Retrying syncing {} missing blocks from peer {}: {}",
blocks_guard.block_refs.len(),
peer_hostname,
blocks_guard.block_refs
.iter()
.map(|b| b.to_string())
.collect::<Vec<_>>()
.join(", ")
);
request_futures.push(Self::fetch_blocks_request(
network_client.clone(),
next_peer,
blocks_guard,
highest_rounds,
FETCH_REQUEST_TIMEOUT,
1,
));
} else {
debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
}
} else {
debug!("No more peers left to fetch blocks");
}
}
}
},
_ = &mut fetcher_timeout => {
debug!("Timed out while fetching missing blocks");
break;
}
}
}
results
}
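/// Batched periodic fetch: groups missing blocks by author, spreads the author groups
/// across a small number of peers (bounded by `MAX_PERIODIC_SYNC_PEERS`), and retries
/// failed requests against the next available peer until `FETCH_FROM_PEERS_TIMEOUT`
/// expires.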
async fn fetch_blocks_from_authorities(
context: Arc<Context>,
inflight_blocks: Arc<InflightBlocksMap>,
network_client: Arc<C>,
missing_blocks: BTreeSet<BlockRef>,
dag_state: Arc<RwLock<DagState>>,
) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
let missing_blocks = missing_blocks
.into_iter()
.take(2 * MAX_PERIODIC_SYNC_PEERS * context.parameters.max_blocks_per_sync)
.collect::<Vec<_>>();
let mut authorities = BTreeMap::<AuthorityIndex, Vec<BlockRef>>::new();
for block_ref in &missing_blocks {
authorities
.entry(block_ref.author)
.or_default()
.push(*block_ref);
}
let num_peers = authorities
.len()
.div_ceil((context.committee.size() - 1).div_ceil(MAX_PERIODIC_SYNC_PEERS));
let num_authorities_per_peer = authorities.len().div_ceil(num_peers);
let mut missing_blocks_per_authority = vec![0; context.committee.size()];
for (authority, blocks) in &authorities {
missing_blocks_per_authority[*authority] += blocks.len();
}
for (missing, (_, authority)) in missing_blocks_per_authority
.into_iter()
.zip(context.committee.authorities())
{
context
.metrics
.node_metrics
.synchronizer_missing_blocks_by_authority
.with_label_values(&[&authority.hostname])
.inc_by(missing as u64);
context
.metrics
.node_metrics
.synchronizer_current_missing_blocks_by_authority
.with_label_values(&[&authority.hostname])
.set(missing as i64);
}
let mut peers = context
.committee
.authorities()
.filter_map(|(peer_index, _)| (peer_index != context.own_index).then_some(peer_index))
.collect::<Vec<_>>();
if cfg!(not(test)) {
peers.shuffle(&mut ThreadRng::default());
}
let mut peers = peers.into_iter();
let mut request_futures = FuturesUnordered::new();
let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);
let mut authorities = authorities.into_values().collect::<Vec<_>>();
if cfg!(not(test)) {
authorities.shuffle(&mut ThreadRng::default());
}
for batch in authorities.chunks(num_authorities_per_peer) {
let peer = peers
.next()
.expect("Possible misconfiguration as a peer should be found");
let peer_hostname = &context.committee.authority(peer).hostname;
let block_refs = batch
.iter()
.flatten()
.cloned()
.collect::<BTreeSet<_>>()
.into_iter()
.take(context.parameters.max_blocks_per_sync)
.collect::<BTreeSet<_>>();
if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
info!(
"Periodic sync of {} missing blocks from peer {} {}: {}",
block_refs.len(),
peer,
peer_hostname,
block_refs
.iter()
.map(|b| b.to_string())
.collect::<Vec<_>>()
.join(", ")
);
request_futures.push(Self::fetch_blocks_request(
network_client.clone(),
peer,
blocks_guard,
highest_rounds.clone(),
FETCH_REQUEST_TIMEOUT,
1,
));
}
}
let mut results = Vec::new();
let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
tokio::pin!(fetcher_timeout);
loop {
tokio::select! {
Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
let peer_hostname = &context.committee.authority(peer_index).hostname;
match response {
Ok(fetched_blocks) => {
results.push((blocks_guard, fetched_blocks, peer_index));
if request_futures.is_empty() {
break;
}
},
Err(_) => {
context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer_hostname, "periodic"]).inc();
if let Some(next_peer) = peers.next() {
if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
info!(
"Retrying syncing {} missing blocks from peer {}: {}",
blocks_guard.block_refs.len(),
peer_hostname,
blocks_guard.block_refs
.iter()
.map(|b| b.to_string())
.collect::<Vec<_>>()
.join(", ")
);
request_futures.push(Self::fetch_blocks_request(
network_client.clone(),
next_peer,
blocks_guard,
highest_rounds,
FETCH_REQUEST_TIMEOUT,
1,
));
} else {
debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
}
} else {
debug!("No more peers left to fetch blocks");
}
}
}
},
_ = &mut fetcher_timeout => {
debug!("Timed out while fetching missing blocks");
break;
}
}
}
results
}
}
#[cfg(test)]
mod tests {
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
time::Duration,
};
use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::{AuthorityIndex, Parameters};
use mysten_metrics::monitored_mpsc;
use parking_lot::RwLock;
use tokio::{sync::Mutex, time::sleep};
use crate::{
authority_service::COMMIT_LAG_MULTIPLIER, core_thread::MockCoreThreadDispatcher,
transaction_certifier::TransactionCertifier,
};
use crate::{
block::{BlockDigest, BlockRef, Round, TestBlock, VerifiedBlock},
block_verifier::NoopBlockVerifier,
commit::CommitRange,
commit_vote_monitor::CommitVoteMonitor,
context::Context,
core_thread::CoreThreadDispatcher,
dag_state::DagState,
error::{ConsensusError, ConsensusResult},
network::{BlockStream, NetworkClient},
storage::mem_store::MemStore,
synchronizer::{
InflightBlocksMap, Synchronizer, FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT,
},
CommitDigest, CommitIndex,
};
use crate::{
commit::{CommitVote, TrustedCommit},
BlockAPI,
};
type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
#[derive(Default)]
struct MockNetworkClient {
fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
fetch_latest_blocks_requests:
Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
}
impl MockNetworkClient {
async fn stub_fetch_blocks(
&self,
blocks: Vec<VerifiedBlock>,
peer: AuthorityIndex,
latency: Option<Duration>,
) {
let mut lock = self.fetch_blocks_requests.lock().await;
let block_refs = blocks
.iter()
.map(|block| block.reference())
.collect::<Vec<_>>();
lock.insert((block_refs, peer), (blocks, latency));
}
async fn stub_fetch_latest_blocks(
&self,
blocks: Vec<VerifiedBlock>,
peer: AuthorityIndex,
authorities: Vec<AuthorityIndex>,
latency: Option<Duration>,
) {
let mut lock = self.fetch_latest_blocks_requests.lock().await;
lock.entry((peer, authorities))
.or_default()
.push((blocks, latency));
}
async fn fetch_latest_blocks_pending_calls(&self) -> usize {
let lock = self.fetch_latest_blocks_requests.lock().await;
lock.len()
}
}
#[async_trait]
impl NetworkClient for MockNetworkClient {
const SUPPORT_STREAMING: bool = false;
async fn send_block(
&self,
_peer: AuthorityIndex,
_serialized_block: &VerifiedBlock,
_timeout: Duration,
) -> ConsensusResult<()> {
unimplemented!("Unimplemented")
}
async fn subscribe_blocks(
&self,
_peer: AuthorityIndex,
_last_received: Round,
_timeout: Duration,
) -> ConsensusResult<BlockStream> {
unimplemented!("Unimplemented")
}
async fn fetch_blocks(
&self,
peer: AuthorityIndex,
block_refs: Vec<BlockRef>,
_highest_accepted_rounds: Vec<Round>,
_timeout: Duration,
) -> ConsensusResult<Vec<Bytes>> {
let mut lock = self.fetch_blocks_requests.lock().await;
let response = lock.remove(&(block_refs.clone(), peer)).unwrap_or_else(|| {
panic!(
"Unexpected fetch blocks request made: {:?} {}. Current lock: {:?}",
block_refs, peer, lock
);
});
let serialised = response
.0
.into_iter()
.map(|block| block.serialized().clone())
.collect::<Vec<_>>();
drop(lock);
if let Some(latency) = response.1 {
sleep(latency).await;
}
Ok(serialised)
}
async fn fetch_commits(
&self,
_peer: AuthorityIndex,
_commit_range: CommitRange,
_timeout: Duration,
) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
unimplemented!("Unimplemented")
}
async fn fetch_latest_blocks(
&self,
peer: AuthorityIndex,
authorities: Vec<AuthorityIndex>,
_timeout: Duration,
) -> ConsensusResult<Vec<Bytes>> {
let mut lock = self.fetch_latest_blocks_requests.lock().await;
let mut responses = lock
.remove(&(peer, authorities.clone()))
.expect("Unexpected fetch blocks request made");
let response = responses.remove(0);
let serialised = response
.0
.into_iter()
.map(|block| block.serialized().clone())
.collect::<Vec<_>>();
if !responses.is_empty() {
lock.insert((peer, authorities), responses);
}
drop(lock);
if let Some(latency) = response.1 {
sleep(latency).await;
}
Ok(serialised)
}
async fn get_latest_rounds(
&self,
_peer: AuthorityIndex,
_timeout: Duration,
) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
unimplemented!("Unimplemented")
}
}
#[test]
fn inflight_blocks_map() {
let map = InflightBlocksMap::new();
let some_block_refs = [
BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
];
let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
{
let mut all_guards = Vec::new();
for i in 1..=2 {
let authority = AuthorityIndex::new_for_test(i);
let guard = map.lock_blocks(missing_block_refs.clone(), authority);
let guard = guard.expect("Guard should be created");
assert_eq!(guard.block_refs.len(), 4);
all_guards.push(guard);
let guard = map.lock_blocks(missing_block_refs.clone(), authority);
assert!(guard.is_none());
}
let authority_3 = AuthorityIndex::new_for_test(3);
let guard = map.lock_blocks(missing_block_refs.clone(), authority_3);
assert!(guard.is_none());
drop(all_guards.remove(0));
let guard = map.lock_blocks(missing_block_refs.clone(), authority_3);
let guard = guard.expect("Guard should be successfully acquired");
assert_eq!(guard.block_refs, missing_block_refs);
drop(guard);
drop(all_guards);
assert_eq!(map.num_of_locked_blocks(), 0);
}
{
let authority_1 = AuthorityIndex::new_for_test(1);
let guard = map
.lock_blocks(missing_block_refs.clone(), authority_1)
.unwrap();
let authority_2 = AuthorityIndex::new_for_test(2);
let guard = map.swap_locks(guard, authority_2);
assert_eq!(guard.unwrap().block_refs, missing_block_refs);
}
}
#[tokio::test]
async fn successful_fetch_blocks_from_peer() {
let (context, _) = Context::new_for_test(4);
let context = Arc::new(context);
let block_verifier = Arc::new(NoopBlockVerifier {});
let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
let network_client = Arc::new(MockNetworkClient::default());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let handle = Synchronizer::start(
network_client.clone(),
context,
core_dispatcher.clone(),
commit_vote_monitor,
block_verifier,
transaction_certifier,
dag_state,
false,
);
let expected_blocks = (0..10)
.map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
.collect::<Vec<_>>();
let missing_blocks = expected_blocks
.iter()
.map(|block| block.reference())
.collect::<BTreeSet<_>>();
let peer = AuthorityIndex::new_for_test(1);
network_client
.stub_fetch_blocks(expected_blocks.clone(), peer, None)
.await;
assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
sleep(Duration::from_millis(1_000)).await;
let added_blocks = core_dispatcher.get_add_blocks().await;
assert_eq!(added_blocks, expected_blocks);
}
#[tokio::test]
async fn saturate_fetch_blocks_from_peer() {
let (context, _) = Context::new_for_test(4);
let context = Arc::new(context);
let block_verifier = Arc::new(NoopBlockVerifier {});
let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
let network_client = Arc::new(MockNetworkClient::default());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let handle = Synchronizer::start(
network_client.clone(),
context,
core_dispatcher.clone(),
commit_vote_monitor,
block_verifier,
transaction_certifier,
dag_state,
false,
);
let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
.map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
.collect::<Vec<_>>();
let peer = AuthorityIndex::new_for_test(1);
let mut iter = expected_blocks.iter().peekable();
while let Some(block) = iter.next() {
network_client
.stub_fetch_blocks(
vec![block.clone()],
peer,
Some(Duration::from_millis(5_000)),
)
.await;
let mut missing_blocks = BTreeSet::new();
missing_blocks.insert(block.reference());
if iter.peek().is_none() {
match handle.fetch_blocks(missing_blocks, peer).await {
Err(ConsensusError::SynchronizerSaturated(index)) => {
assert_eq!(index, peer);
}
_ => panic!("A saturated synchronizer error was expected"),
}
} else {
assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
}
}
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn synchronizer_periodic_task_fetch_blocks() {
let (context, _) = Context::new_for_test(4);
let context = Arc::new(context);
let block_verifier = Arc::new(NoopBlockVerifier {});
let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
let network_client = Arc::new(MockNetworkClient::default());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let expected_blocks = (0..10)
.map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
.collect::<Vec<_>>();
let missing_blocks = expected_blocks
.iter()
.map(|block| block.reference())
.collect::<BTreeSet<_>>();
core_dispatcher
.stub_missing_blocks(missing_blocks.clone())
.await;
network_client
.stub_fetch_blocks(
expected_blocks.clone(),
AuthorityIndex::new_for_test(1),
Some(FETCH_REQUEST_TIMEOUT),
)
.await;
network_client
.stub_fetch_blocks(
expected_blocks.clone(),
AuthorityIndex::new_for_test(2),
None,
)
.await;
let _handle = Synchronizer::start(
network_client.clone(),
context,
core_dispatcher.clone(),
commit_vote_monitor,
block_verifier,
transaction_certifier,
dag_state,
false,
);
sleep(2 * FETCH_REQUEST_TIMEOUT).await;
let added_blocks = core_dispatcher.get_add_blocks().await;
assert_eq!(added_blocks, expected_blocks);
assert!(core_dispatcher
.get_missing_blocks()
.await
.unwrap()
.is_empty());
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn synchronizer_periodic_task_when_commit_lagging_with_missing_blocks_in_acceptable_thresholds(
) {
let (mut context, _) = Context::new_for_test(4);
context
.protocol_config
.set_consensus_gc_depth_for_testing(0);
context
.protocol_config
.set_consensus_batched_block_sync_for_testing(true);
let context = Arc::new(context);
let block_verifier = Arc::new(NoopBlockVerifier {});
let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
let network_client = Arc::new(MockNetworkClient::default());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
let expected_blocks = (1..sync_missing_block_round_threshold * 2)
.flat_map(|round| {
vec![
VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()),
VerifiedBlock::new_for_test(TestBlock::new(round, 1).build()),
]
.into_iter()
})
.collect::<Vec<_>>();
let missing_blocks = expected_blocks
.iter()
.map(|block| block.reference())
.collect::<BTreeSet<_>>();
core_dispatcher.stub_missing_blocks(missing_blocks).await;
let stub_blocks = expected_blocks
.into_iter()
.filter(|block| block.round() <= sync_missing_block_round_threshold)
.collect::<Vec<_>>();
let mut expected_blocks = Vec::new();
for authority in [0, 1] {
let author = AuthorityIndex::new_for_test(authority);
let chunk = stub_blocks
.iter()
.filter(|block| block.author() == author)
.take(context.parameters.max_blocks_per_sync)
.cloned()
.collect::<Vec<_>>();
expected_blocks.extend(chunk.clone());
network_client
.stub_fetch_blocks(
chunk.clone(),
AuthorityIndex::new_for_test(1),
Some(FETCH_REQUEST_TIMEOUT),
)
.await;
network_client
.stub_fetch_blocks(chunk, AuthorityIndex::new_for_test(2), None)
.await;
}
let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
let commit_index: CommitIndex = round - 1;
let blocks = (0..4)
.map(|authority| {
let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
let block = TestBlock::new(round, authority)
.set_commit_votes(commit_votes)
.build();
VerifiedBlock::new_for_test(block)
})
.collect::<Vec<_>>();
for block in blocks {
commit_vote_monitor.observe_block(&block);
}
let _handle = Synchronizer::start(
network_client.clone(),
context.clone(),
core_dispatcher.clone(),
commit_vote_monitor.clone(),
block_verifier.clone(),
transaction_certifier,
dag_state.clone(),
false,
);
sleep(4 * FETCH_REQUEST_TIMEOUT).await;
let mut added_blocks = core_dispatcher.get_add_blocks().await;
added_blocks.sort_by_key(|block| block.reference());
expected_blocks.sort_by_key(|block| block.reference());
assert_eq!(added_blocks, expected_blocks);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
let (mut context, _) = Context::new_for_test(4);
context
.protocol_config
.set_consensus_batched_block_sync_for_testing(true);
let context = Arc::new(context);
let block_verifier = Arc::new(NoopBlockVerifier {});
let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
let network_client = Arc::new(MockNetworkClient::default());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
let stub_blocks = (sync_missing_block_round_threshold * 2
..sync_missing_block_round_threshold * 3)
.map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
.collect::<Vec<_>>();
let missing_blocks = stub_blocks
.iter()
.map(|block| block.reference())
.collect::<BTreeSet<_>>();
core_dispatcher
.stub_missing_blocks(missing_blocks.clone())
.await;
let mut expected_blocks = stub_blocks
.iter()
.take(context.parameters.max_blocks_per_sync)
.cloned()
.collect::<Vec<_>>();
network_client
.stub_fetch_blocks(
expected_blocks.clone(),
AuthorityIndex::new_for_test(1),
Some(FETCH_REQUEST_TIMEOUT),
)
.await;
network_client
.stub_fetch_blocks(
expected_blocks.clone(),
AuthorityIndex::new_for_test(2),
None,
)
.await;
let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
let commit_index: CommitIndex = round - 1;
let blocks = (0..4)
.map(|authority| {
let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
let block = TestBlock::new(round, authority)
.set_commit_votes(commit_votes)
.build();
VerifiedBlock::new_for_test(block)
})
.collect::<Vec<_>>();
for block in blocks {
commit_vote_monitor.observe_block(&block);
}
let _handle = Synchronizer::start(
network_client.clone(),
context.clone(),
core_dispatcher.clone(),
commit_vote_monitor.clone(),
block_verifier,
transaction_certifier,
dag_state.clone(),
false,
);
sleep(4 * FETCH_REQUEST_TIMEOUT).await;
let added_blocks = core_dispatcher.get_add_blocks().await;
assert_eq!(added_blocks, vec![]);
{
let mut d = dag_state.write();
for index in 1..=commit_index {
let commit =
TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
d.add_commit(commit);
}
assert_eq!(
d.last_commit_index(),
commit_vote_monitor.quorum_commit_index()
);
}
core_dispatcher
.stub_missing_blocks(missing_blocks.clone())
.await;
sleep(2 * FETCH_REQUEST_TIMEOUT).await;
let mut added_blocks = core_dispatcher.get_add_blocks().await;
added_blocks.sort_by_key(|block| block.reference());
expected_blocks.sort_by_key(|block| block.reference());
assert_eq!(added_blocks, expected_blocks);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn synchronizer_fetch_own_last_block() {
let (context, _) = Context::new_for_test(4);
let context = Arc::new(context.with_parameters(Parameters {
sync_last_known_own_block_timeout: Duration::from_millis(2_000),
..Default::default()
}));
let block_verifier = Arc::new(NoopBlockVerifier {});
let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
let network_client = Arc::new(MockNetworkClient::default());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let our_index = AuthorityIndex::new_for_test(0);
let mut expected_blocks = (9..=10)
.map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
.collect::<Vec<_>>();
let block_1 = expected_blocks.pop().unwrap();
network_client
.stub_fetch_latest_blocks(
vec![block_1.clone()],
AuthorityIndex::new_for_test(1),
vec![our_index],
None,
)
.await;
network_client
.stub_fetch_latest_blocks(
vec![block_1],
AuthorityIndex::new_for_test(1),
vec![our_index],
None,
)
.await;
let block_2 = expected_blocks.pop().unwrap();
network_client
.stub_fetch_latest_blocks(
vec![block_2.clone()],
AuthorityIndex::new_for_test(2),
vec![our_index],
Some(Duration::from_secs(10)),
)
.await;
network_client
.stub_fetch_latest_blocks(
vec![block_2],
AuthorityIndex::new_for_test(2),
vec![our_index],
None,
)
.await;
network_client
.stub_fetch_latest_blocks(
vec![],
AuthorityIndex::new_for_test(3),
vec![our_index],
Some(Duration::from_secs(10)),
)
.await;
network_client
.stub_fetch_latest_blocks(
vec![],
AuthorityIndex::new_for_test(3),
vec![our_index],
None,
)
.await;
let handle = Synchronizer::start(
network_client.clone(),
context.clone(),
core_dispatcher.clone(),
commit_vote_monitor,
block_verifier,
transaction_certifier,
dag_state,
true,
);
sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
assert_eq!(
core_dispatcher.get_last_own_proposed_round().await,
vec![10]
);
assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);
assert_eq!(
context
.metrics
.node_metrics
.sync_last_known_own_block_retries
.get(),
1
);
if let Err(err) = handle.stop().await {
if err.is_panic() {
std::panic::resume_unwind(err.into_panic());
}
}
}
}