1use std::{
4 collections::{BTreeMap, BTreeSet, HashMap},
5 sync::Arc,
6 time::Duration,
7};
8
9use bytes::Bytes;
10use consensus_config::AuthorityIndex;
11use consensus_types::block::{BlockRef, Round};
12use futures::{StreamExt as _, stream::FuturesUnordered};
13use itertools::Itertools as _;
14use mysten_common::debug_fatal;
15use mysten_metrics::{
16 monitored_future,
17 monitored_mpsc::{Receiver, Sender, channel},
18 monitored_scope,
19};
20use parking_lot::{Mutex, RwLock};
21use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
22use sui_macros::fail_point_async;
23use tap::TapFallible;
24use tokio::{
25 runtime::Handle,
26 sync::{mpsc::error::TrySendError, oneshot},
27 task::{JoinError, JoinSet},
28 time::{Instant, sleep, sleep_until, timeout},
29};
30use tracing::{debug, error, info, trace, warn};
31
32use crate::{
33 BlockAPI,
34 block::{SignedBlock, VerifiedBlock},
35 block_verifier::BlockVerifier,
36 commit_vote_monitor::CommitVoteMonitor,
37 context::Context,
38 dag_state::DagState,
39 error::{ConsensusError, ConsensusResult},
40 network::NetworkClient,
41};
42use crate::{
43 authority_service::COMMIT_LAG_MULTIPLIER, core_thread::CoreThreadDispatcher,
44 transaction_certifier::TransactionCertifier,
45};
46
/// Maximum number of concurrent "live" fetch requests to a single peer; also used as the
/// capacity of each per-peer fetch request channel.
const FETCH_BLOCKS_CONCURRENCY: usize = 5;

/// Timeout applied to an individual fetch request sent to a peer.
const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);
/// Time budget for collecting the responses of one periodic round of fetching missing blocks.
const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);

/// Maximum number of peers that a single block may be locked for (i.e. fetched from) at once.
const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;

/// Maximum number of peers that the blocks of one periodic fetch round are initially split
/// across; failed requests may still be retried against further peers.
const MAX_PERIODIC_SYNC_PEERS: usize = 3;
58
/// Guard over a set of block refs locked for fetching from `peer` in an [`InflightBlocksMap`].
///
/// Dropping the guard releases the locks (see its `Drop` impl).
struct BlocksGuard {
    /// The map in which the locks were taken.
    map: Arc<InflightBlocksMap>,
    /// The block refs locked for `peer`.
    block_refs: BTreeSet<BlockRef>,
    /// The peer the locked blocks are being fetched from.
    peer: AuthorityIndex,
}
64
65impl Drop for BlocksGuard {
66 fn drop(&mut self) {
67 self.map.unlock_blocks(&self.block_refs, self.peer);
68 }
69}
70
/// Tracks which peers are currently fetching which blocks, so that the same block is not
/// requested from more than `MAX_AUTHORITIES_TO_FETCH_PER_BLOCK` peers concurrently.
struct InflightBlocksMap {
    /// For each block ref being fetched, the set of peers it is currently locked for.
    inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
}
78
79impl InflightBlocksMap {
80 fn new() -> Arc<Self> {
81 Arc::new(Self {
82 inner: Mutex::new(HashMap::new()),
83 })
84 }
85
86 fn lock_blocks(
92 self: &Arc<Self>,
93 missing_block_refs: BTreeSet<BlockRef>,
94 peer: AuthorityIndex,
95 ) -> Option<BlocksGuard> {
96 let mut blocks = BTreeSet::new();
97 let mut inner = self.inner.lock();
98
99 for block_ref in missing_block_refs {
100 let authorities = inner.entry(block_ref).or_default();
103 if authorities.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK
104 && authorities.get(&peer).is_none()
105 {
106 assert!(authorities.insert(peer));
107 blocks.insert(block_ref);
108 }
109 }
110
111 if blocks.is_empty() {
112 None
113 } else {
114 Some(BlocksGuard {
115 map: self.clone(),
116 block_refs: blocks,
117 peer,
118 })
119 }
120 }
121
122 fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
126 let mut blocks_to_fetch = self.inner.lock();
128 for block_ref in block_refs {
129 let authorities = blocks_to_fetch
130 .get_mut(block_ref)
131 .expect("Should have found a non empty map");
132
133 assert!(authorities.remove(&peer), "Peer index should be present!");
134
135 if authorities.is_empty() {
137 blocks_to_fetch.remove(block_ref);
138 }
139 }
140 }
141
142 fn swap_locks(
146 self: &Arc<Self>,
147 blocks_guard: BlocksGuard,
148 peer: AuthorityIndex,
149 ) -> Option<BlocksGuard> {
150 let block_refs = blocks_guard.block_refs.clone();
151
152 drop(blocks_guard);
154
155 self.lock_blocks(block_refs, peer)
157 }
158
159 #[cfg(test)]
160 fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
161 let inner = self.inner.lock();
162 inner.len()
163 }
164}
165
/// Commands processed by the synchronizer's main loop.
enum Command {
    /// Fetch `missing_block_refs` from `peer_index`. `result` reports whether the request was
    /// accepted by the loop (not whether the blocks were eventually fetched).
    FetchBlocks {
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
        result: oneshot::Sender<Result<(), ConsensusError>>,
    },
    /// Start the task that fetches this node's own last block from its peers.
    FetchOwnLastBlock,
    /// Bring forward the next run of the periodic missing-blocks scheduler.
    KickOffScheduler,
}
175
/// Handle used to send requests to a running synchronizer and to stop it.
pub(crate) struct SynchronizerHandle {
    /// Sender side of the synchronizer's command channel.
    commands_sender: Sender<Command>,
    /// The synchronizer's main loop task and its per-peer fetch tasks.
    tasks: tokio::sync::Mutex<JoinSet<()>>,
}
180
181impl SynchronizerHandle {
182 pub(crate) async fn fetch_blocks(
185 &self,
186 missing_block_refs: BTreeSet<BlockRef>,
187 peer_index: AuthorityIndex,
188 ) -> ConsensusResult<()> {
189 let (sender, receiver) = oneshot::channel();
190 self.commands_sender
191 .send(Command::FetchBlocks {
192 missing_block_refs,
193 peer_index,
194 result: sender,
195 })
196 .await
197 .map_err(|_err| ConsensusError::Shutdown)?;
198 receiver.await.map_err(|_err| ConsensusError::Shutdown)?
199 }
200
201 pub(crate) async fn stop(&self) -> Result<(), JoinError> {
202 let mut tasks = self.tasks.lock().await;
203 tasks.abort_all();
204 while let Some(result) = tasks.join_next().await {
205 result?
206 }
207 Ok(())
208 }
209}
210
/// Fetches blocks that this node is missing from its peers.
///
/// It serves explicit fetch requests (see [`SynchronizerHandle::fetch_blocks`]) through one
/// fetch task per peer. It also periodically asks the core for missing blocks and fetches them
/// from a subset of peers, and it can fetch this node's own last block from its peers.
pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
    context: Arc<Context>,
    /// Commands from the handle and from the synchronizer's own tasks.
    commands_receiver: Receiver<Command>,
    /// Per-peer channels feeding the "live" fetch task of each peer.
    fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
    core_dispatcher: Arc<D>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    dag_state: Arc<RwLock<DagState>>,
    /// The periodic missing-blocks fetch task; the main loop only starts one when it is empty.
    fetch_blocks_scheduler_task: JoinSet<()>,
    /// The task fetching our own last block; the main loop only starts one when it is empty.
    fetch_own_last_block_task: JoinSet<()>,
    network_client: Arc<C>,
    block_verifier: Arc<V>,
    transaction_certifier: TransactionCertifier,
    /// Shared record of which blocks are being fetched from which peers.
    inflight_blocks_map: Arc<InflightBlocksMap>,
    /// Sender for this synchronizer's own command channel, cloned into spawned tasks so they
    /// can send `Command::KickOffScheduler`.
    commands_sender: Sender<Command>,
}
247
248impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
249 pub(crate) fn start(
250 network_client: Arc<C>,
251 context: Arc<Context>,
252 core_dispatcher: Arc<D>,
253 commit_vote_monitor: Arc<CommitVoteMonitor>,
254 block_verifier: Arc<V>,
255 transaction_certifier: TransactionCertifier,
256 dag_state: Arc<RwLock<DagState>>,
257 sync_last_known_own_block: bool,
258 ) -> Arc<SynchronizerHandle> {
259 let (commands_sender, commands_receiver) =
260 channel("consensus_synchronizer_commands", 1_000);
261 let inflight_blocks_map = InflightBlocksMap::new();
262
263 let mut fetch_block_senders = BTreeMap::new();
265 let mut tasks = JoinSet::new();
266 for (index, _) in context.committee.authorities() {
267 if index == context.own_index {
268 continue;
269 }
270 let (sender, receiver) =
271 channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
272 let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
273 index,
274 network_client.clone(),
275 block_verifier.clone(),
276 transaction_certifier.clone(),
277 commit_vote_monitor.clone(),
278 context.clone(),
279 core_dispatcher.clone(),
280 dag_state.clone(),
281 receiver,
282 commands_sender.clone(),
283 );
284 tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
285 fetch_block_senders.insert(index, sender);
286 }
287
288 let commands_sender_clone = commands_sender.clone();
289
290 if sync_last_known_own_block {
291 commands_sender
292 .try_send(Command::FetchOwnLastBlock)
293 .expect("Failed to sync our last block");
294 }
295
296 tasks.spawn(monitored_future!(async move {
298 let mut s = Self {
299 context,
300 commands_receiver,
301 fetch_block_senders,
302 core_dispatcher,
303 commit_vote_monitor,
304 fetch_blocks_scheduler_task: JoinSet::new(),
305 fetch_own_last_block_task: JoinSet::new(),
306 network_client,
307 block_verifier,
308 transaction_certifier,
309 inflight_blocks_map,
310 commands_sender: commands_sender_clone,
311 dag_state,
312 };
313 s.run().await;
314 }));
315
316 Arc::new(SynchronizerHandle {
317 commands_sender,
318 tasks: tokio::sync::Mutex::new(tasks),
319 })
320 }
321
    /// Main loop of the synchronizer.
    ///
    /// Multiplexes over incoming [`Command`]s, completion of the own-last-block task,
    /// completion of the periodic fetch scheduler task, and the periodic scheduler timer.
    /// Returns when a periodic fetch task cannot be started because the core is shutting down.
    ///
    /// # Panics
    /// Re-raises panics of the own-last-block and scheduler tasks, and panics on any other
    /// non-cancellation join error of those tasks.
    async fn run(&mut self) {
        // Interval between periodic attempts to fetch missing blocks.
        const PERIODIC_FETCH_INTERVAL: Duration = Duration::from_millis(200);
        let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_INTERVAL);

        tokio::pin!(scheduler_timeout);

        loop {
            tokio::select! {
                Some(command) = self.commands_receiver.recv() => {
                    match command {
                        Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
                            if peer_index == self.context.own_index {
                                error!("We should never attempt to fetch blocks from our own node");
                                // NOTE: `result` is dropped here, so the caller observes a
                                // closed oneshot channel.
                                continue;
                            }

                            // Cap a single request at `max_blocks_per_sync` block refs.
                            let missing_block_refs = missing_block_refs
                                .into_iter()
                                .take(self.context.parameters.max_blocks_per_sync)
                                .collect();

                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index);
                            // Nothing could be locked (each ref is already being fetched from this
                            // peer or from enough peers): report success without fetching.
                            let Some(blocks_guard) = blocks_guard else {
                                result.send(Ok(())).ok();
                                continue;
                            };

                            // Hand the locked refs to the peer's fetch task without waiting. If its
                            // queue is full or closed, the rejected guard is dropped together with
                            // the error, which releases the locks.
                            let r = self
                                .fetch_block_senders
                                .get(&peer_index)
                                .expect("Fatal error, sender should be present")
                                .try_send(blocks_guard)
                                .map_err(|err| {
                                    match err {
                                        TrySendError::Full(_) => {
                                            let peer_hostname = &self.context.committee.authority(peer_index).hostname;
                                            self.context
                                                .metrics
                                                .node_metrics
                                                .synchronizer_skipped_fetch_requests
                                                .with_label_values(&[peer_hostname])
                                                .inc();
                                            ConsensusError::SynchronizerSaturated(peer_index)
                                        },
                                        TrySendError::Closed(_) => ConsensusError::Shutdown
                                    }
                                });

                            result.send(r).ok();
                        }
                        Command::FetchOwnLastBlock => {
                            // Run at most one own-last-block task at a time.
                            if self.fetch_own_last_block_task.is_empty() {
                                self.start_fetch_own_last_block_task();
                            }
                        }
                        Command::KickOffScheduler => {
                            // Fire immediately when no scheduler task is in flight, otherwise after
                            // half an interval.
                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
                                Instant::now()
                            } else {
                                Instant::now() + PERIODIC_FETCH_INTERVAL.checked_div(2).unwrap()
                            };

                            // Only ever move the deadline earlier, never later.
                            if timeout < scheduler_timeout.deadline() {
                                scheduler_timeout.as_mut().reset(timeout);
                            }
                        }
                    }
                },
                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // Cancellation is ignored.
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch our last block task failed: {e}");
                            }
                        },
                    };
                },
                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // Cancellation is ignored.
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch blocks scheduler task failed: {e}");
                            }
                        },
                    };
                },
                () = &mut scheduler_timeout => {
                    // Start a periodic fetch only if none is in flight. An error here means the
                    // core dispatcher is gone, so the synchronizer exits.
                    if self.fetch_blocks_scheduler_task.is_empty()
                        && let Err(err) = self.start_fetch_missing_blocks_task().await {
                        debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
                        return;
                    };

                    scheduler_timeout
                        .as_mut()
                        .reset(Instant::now() + PERIODIC_FETCH_INTERVAL);
                }
            }
        }
    }
442
    /// Per-peer task that serves "live" fetch requests for `peer_index`.
    ///
    /// Receives locked block sets from `receiver` and keeps at most `FETCH_BLOCKS_CONCURRENCY`
    /// requests in flight. Successful responses go through `process_fetched_blocks`. A failed
    /// request is re-sent while the retry count returned by `fetch_blocks_request` is at most
    /// `MAX_RETRIES`. The first attempt passes 1 and each failure increments it, so a set is
    /// attempted at most three times. Exits once `receiver` is closed and nothing is in flight.
    async fn fetch_blocks_from_authority(
        peer_index: AuthorityIndex,
        network_client: Arc<C>,
        block_verifier: Arc<V>,
        transaction_certifier: TransactionCertifier,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        dag_state: Arc<RwLock<DagState>>,
        mut receiver: Receiver<BlocksGuard>,
        commands_sender: Sender<Command>,
    ) {
        const MAX_RETRIES: u32 = 3;
        let peer_hostname = &context.committee.authority(peer_index).hostname;
        let mut requests = FuturesUnordered::new();

        loop {
            tokio::select! {
                // Accept new work only while below the concurrency limit.
                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
                    let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);

                    requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, true, FETCH_REQUEST_TIMEOUT, 1))
                },
                Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
                    match response {
                        Ok(blocks) => {
                            if let Err(err) = Self::process_fetched_blocks(blocks,
                                peer_index,
                                blocks_guard,
                                core_dispatcher.clone(),
                                block_verifier.clone(),
                                transaction_certifier.clone(),
                                commit_vote_monitor.clone(),
                                context.clone(),
                                commands_sender.clone(),
                                "live"
                            ).await {
                                warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
                                context.metrics.node_metrics.synchronizer_process_fetched_failures.with_label_values(&[peer_hostname, "live"]).inc();
                            }
                        },
                        Err(_) => {
                            context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer_hostname, "live"]).inc();
                            // Retry with the same locks and the same highest rounds.
                            if retries <= MAX_RETRIES {
                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, true, FETCH_REQUEST_TIMEOUT, retries))
                            } else {
                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
                                // Give up: release the locks so the blocks can be fetched elsewhere.
                                drop(blocks_guard);
                            }
                        }
                    }
                },
                // Reached only when both branches are disabled: `receiver` is closed and no
                // request is in flight.
                else => {
                    info!("Fetching blocks from authority {peer_index} task will now abort.");
                    break;
                }
            }
        }
    }
504
505 async fn process_fetched_blocks(
508 mut serialized_blocks: Vec<Bytes>,
509 peer_index: AuthorityIndex,
510 requested_blocks_guard: BlocksGuard,
511 core_dispatcher: Arc<D>,
512 block_verifier: Arc<V>,
513 transaction_certifier: TransactionCertifier,
514 commit_vote_monitor: Arc<CommitVoteMonitor>,
515 context: Arc<Context>,
516 commands_sender: Sender<Command>,
517 sync_method: &str,
518 ) -> ConsensusResult<()> {
519 if serialized_blocks.is_empty() {
520 return Ok(());
521 }
522
523 serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
525
526 let blocks = Handle::current()
528 .spawn_blocking({
529 let block_verifier = block_verifier.clone();
530 let context = context.clone();
531 move || {
532 Self::verify_blocks(
533 serialized_blocks,
534 block_verifier,
535 transaction_certifier,
536 &context,
537 peer_index,
538 )
539 }
540 })
541 .await
542 .expect("Spawn blocking should not fail")?;
543
544 for block in &blocks {
546 commit_vote_monitor.observe_block(block);
547 }
548
549 let metrics = &context.metrics.node_metrics;
550 let peer_hostname = &context.committee.authority(peer_index).hostname;
551 metrics
552 .synchronizer_fetched_blocks_by_peer
553 .with_label_values(&[peer_hostname, sync_method])
554 .inc_by(blocks.len() as u64);
555 for block in &blocks {
556 let block_hostname = &context.committee.authority(block.author()).hostname;
557 metrics
558 .synchronizer_fetched_blocks_by_authority
559 .with_label_values(&[block_hostname, sync_method])
560 .inc();
561 }
562
563 debug!(
564 "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
565 blocks.len(),
566 blocks.iter().map(|b| b.reference().to_string()).join(", "),
567 );
568
569 let missing_blocks = core_dispatcher
573 .add_blocks(blocks)
574 .await
575 .map_err(|_| ConsensusError::Shutdown)?;
576
577 drop(requested_blocks_guard);
579
580 if !missing_blocks.is_empty() {
582 if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
584 {
585 warn!("Commands channel is full")
586 }
587 }
588
589 context
590 .metrics
591 .node_metrics
592 .missing_blocks_after_fetch_total
593 .inc_by(missing_blocks.len() as u64);
594
595 Ok(())
596 }
597
598 fn get_highest_accepted_rounds(
599 dag_state: Arc<RwLock<DagState>>,
600 context: &Arc<Context>,
601 ) -> Vec<Round> {
602 let blocks = dag_state
603 .read()
604 .get_last_cached_block_per_authority(Round::MAX);
605 assert_eq!(blocks.len(), context.committee.size());
606
607 blocks
608 .into_iter()
609 .map(|(block, _)| block.round())
610 .collect::<Vec<_>>()
611 }
612
613 fn verify_blocks(
614 serialized_blocks: Vec<Bytes>,
615 block_verifier: Arc<V>,
616 transaction_certifier: TransactionCertifier,
617 context: &Context,
618 peer_index: AuthorityIndex,
619 ) -> ConsensusResult<Vec<VerifiedBlock>> {
620 let mut verified_blocks = Vec::new();
621 let mut voted_blocks = Vec::new();
622 for serialized_block in serialized_blocks {
623 let signed_block: SignedBlock =
624 bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
625
626 let (verified_block, reject_txn_votes) = block_verifier
628 .verify_and_vote(signed_block, serialized_block)
629 .tap_err(|e| {
630 let hostname = context.committee.authority(peer_index).hostname.clone();
631 context
632 .metrics
633 .node_metrics
634 .invalid_blocks
635 .with_label_values(&[&hostname, "synchronizer", e.clone().name()])
636 .inc();
637 info!("Invalid block received from {}: {}", peer_index, e);
638 })?;
639
640 let now = context.clock.timestamp_utc_ms();
642 let drift = verified_block.timestamp_ms().saturating_sub(now);
643 if drift > 0 {
644 let peer_hostname = &context
645 .committee
646 .authority(verified_block.author())
647 .hostname;
648 context
649 .metrics
650 .node_metrics
651 .block_timestamp_drift_ms
652 .with_label_values(&[peer_hostname, "synchronizer"])
653 .inc_by(drift);
654
655 trace!(
656 "Synced block {} timestamp {} is in the future (now={}).",
657 verified_block.reference(),
658 verified_block.timestamp_ms(),
659 now
660 );
661 }
662
663 verified_blocks.push(verified_block.clone());
664 voted_blocks.push((verified_block, reject_txn_votes));
665 }
666
667 if context.protocol_config.mysticeti_fastpath() {
668 transaction_certifier.add_voted_blocks(voted_blocks);
669 }
670
671 Ok(verified_blocks)
672 }
673
674 async fn fetch_blocks_request(
675 network_client: Arc<C>,
676 peer: AuthorityIndex,
677 blocks_guard: BlocksGuard,
678 highest_rounds: Vec<Round>,
679 breadth_first: bool,
680 request_timeout: Duration,
681 mut retries: u32,
682 ) -> (
683 ConsensusResult<Vec<Bytes>>,
684 BlocksGuard,
685 u32,
686 AuthorityIndex,
687 Vec<Round>,
688 ) {
689 let start = Instant::now();
690 let resp = timeout(
691 request_timeout,
692 network_client.fetch_blocks(
693 peer,
694 blocks_guard
695 .block_refs
696 .clone()
697 .into_iter()
698 .collect::<Vec<_>>(),
699 highest_rounds.clone().into_iter().collect::<Vec<_>>(),
700 breadth_first,
701 request_timeout,
702 ),
703 )
704 .await;
705
706 fail_point_async!("consensus-delay");
707
708 let resp = match resp {
709 Ok(Err(err)) => {
710 sleep_until(start + request_timeout).await;
713 retries += 1;
714 Err(err)
715 } Err(err) => {
717 sleep_until(start + request_timeout).await;
719 retries += 1;
720 Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
721 }
722 Ok(result) => result,
723 };
724 (resp, blocks_guard, retries, peer, highest_rounds)
725 }
726
    /// Spawns a task that fetches this node's own most recent block from its peers. The task
    /// reports the highest round found via `set_last_known_proposed_round` and the
    /// `last_known_own_block_round` metric.
    ///
    /// Every peer is asked for our latest block. Responses are collected until all requests
    /// have completed or `sync_last_known_own_block_timeout` elapses. A peer that returns an
    /// error is re-queried after `FETCH_OWN_BLOCK_RETRY_DELAY`. If the stake of peers that
    /// returned valid results does not reach validity, the whole round is repeated after a
    /// delay that starts at 500ms, grows 1.5x per round, and is capped at
    /// `MAX_RETRY_DELAY_STEP`. In a single-node committee, the round of the last proposed block
    /// in `dag_state` is used instead.
    fn start_fetch_own_last_block_task(&mut self) {
        // Delay before re-querying a peer whose request failed.
        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
        // Upper bound on the delay between two full retry rounds.
        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);

        let context = self.context.clone();
        let dag_state = self.dag_state.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let core_dispatcher = self.core_dispatcher.clone();

        self.fetch_own_last_block_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchOwnLastBlockTask");

                // Asks `authority_index` for our latest block after `fetch_own_block_delay`.
                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
                    let network_client_cloned = network_client.clone();
                    let own_index = context.own_index;
                    async move {
                        sleep(fetch_own_block_delay).await;
                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
                        (r, authority_index)
                    }
                };

                // Deserializes and verifies the returned blocks. Each must be authored by us.
                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
                    let mut result = Vec::new();
                    for serialized_block in blocks {
                        let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
                        let (verified_block, _) = block_verifier.verify_and_vote(signed_block, serialized_block).tap_err(|err|{
                            let hostname = context.committee.authority(authority_index).hostname.clone();
                            context
                                .metrics
                                .node_metrics
                                .invalid_blocks
                                .with_label_values(&[&hostname, "synchronizer_own_block", err.clone().name()])
                                .inc();
                            warn!("Invalid block received from {}: {}", authority_index, err);
                        })?;

                        if verified_block.author() != context.own_index {
                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
                        }
                        result.push(verified_block);
                    }
                    Ok(result)
                };

                let mut highest_round;
                let mut retries = 0;
                let mut retry_delay_step = Duration::from_millis(500);
                'main: loop {
                    if context.committee.size() == 1 {
                        highest_round = dag_state.read().get_last_proposed_block().round();
                        info!("Only one node in the network, will not try fetching own last block from peers.");
                        break 'main;
                    }

                    // Stake of the peers that returned acceptable results in this round.
                    let mut total_stake = 0;
                    highest_round = 0;

                    let mut results = FuturesUnordered::new();

                    for (authority_index, _authority) in context.committee.authorities() {
                        if authority_index != context.own_index {
                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
                        }
                    }

                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
                    tokio::pin!(timer);

                    'inner: loop {
                        tokio::select! {
                            result = results.next() => {
                                // All requests have completed.
                                let Some((result, authority_index)) = result else {
                                    break 'inner;
                                };
                                match result {
                                    Ok(result) => {
                                        match process_blocks(result, authority_index) {
                                            Ok(blocks) => {
                                                let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
                                                highest_round = highest_round.max(max_round);

                                                total_stake += context.committee.stake(authority_index);
                                            },
                                            Err(err) => {
                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
                                            }
                                        }
                                    },
                                    Err(err) => {
                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
                                    }
                                }
                            },
                            () = &mut timer => {
                                info!("Timeout while trying to sync our own last block from peers");
                                break 'inner;
                            }
                        }
                    }

                    if context.committee.reached_validity(total_stake) {
                        info!("{} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                        break 'main;
                    }

                    retries += 1;
                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);

                    sleep(retry_delay_step).await;

                    // Grow the delay between rounds by 1.5x, capped at MAX_RETRY_DELAY_STEP.
                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
                }

                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);

                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
                }
            }));
    }
858
859 async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
860 if self.context.committee.size() == 1 {
861 trace!(
862 "Only one node in the network, will not try fetching missing blocks from peers."
863 );
864 return Ok(());
865 }
866
867 let missing_blocks = self
868 .core_dispatcher
869 .get_missing_blocks()
870 .await
871 .map_err(|_err| ConsensusError::Shutdown)?;
872
873 if missing_blocks.is_empty() {
875 return Ok(());
876 }
877
878 let context = self.context.clone();
879 let network_client = self.network_client.clone();
880 let block_verifier = self.block_verifier.clone();
881 let transaction_certifier = self.transaction_certifier.clone();
882 let commit_vote_monitor = self.commit_vote_monitor.clone();
883 let core_dispatcher = self.core_dispatcher.clone();
884 let blocks_to_fetch = self.inflight_blocks_map.clone();
885 let commands_sender = self.commands_sender.clone();
886 let dag_state = self.dag_state.clone();
887
888 if self.is_commit_lagging() {
891 return Ok(());
892 }
893
894 self.fetch_blocks_scheduler_task
895 .spawn(monitored_future!(async move {
896 let _scope = monitored_scope("FetchMissingBlocksScheduler");
897 context
898 .metrics
899 .node_metrics
900 .fetch_blocks_scheduler_inflight
901 .inc();
902 let total_requested = missing_blocks.len();
903
904 fail_point_async!("consensus-delay");
905 let results = Self::fetch_blocks_from_authorities(
907 context.clone(),
908 blocks_to_fetch.clone(),
909 network_client,
910 missing_blocks,
911 dag_state,
912 )
913 .await;
914 context
915 .metrics
916 .node_metrics
917 .fetch_blocks_scheduler_inflight
918 .dec();
919 if results.is_empty() {
920 return;
921 }
922
923 let mut total_fetched = 0;
925 for (blocks_guard, fetched_blocks, peer) in results {
926 total_fetched += fetched_blocks.len();
927
928 if let Err(err) = Self::process_fetched_blocks(
929 fetched_blocks,
930 peer,
931 blocks_guard,
932 core_dispatcher.clone(),
933 block_verifier.clone(),
934 transaction_certifier.clone(),
935 commit_vote_monitor.clone(),
936 context.clone(),
937 commands_sender.clone(),
938 "periodic",
939 )
940 .await
941 {
942 warn!(
943 "Error occurred while processing fetched blocks from peer {peer}: {err}"
944 );
945 context
946 .metrics
947 .node_metrics
948 .synchronizer_process_fetched_failures
949 .with_label_values(&[
950 &context.committee.authority(peer).hostname,
951 "periodic",
952 ])
953 .inc();
954 }
955 }
956
957 debug!(
958 "Total blocks requested to fetch: {}, total fetched: {}",
959 total_requested, total_fetched
960 );
961 }));
962 Ok(())
963 }
964
965 fn is_commit_lagging(&self) -> bool {
966 let last_commit_index = self.dag_state.read().last_commit_index();
967 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
968 let commit_threshold = last_commit_index
969 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
970 commit_threshold < quorum_commit_index
971 }
972
973 async fn fetch_blocks_from_authorities(
979 context: Arc<Context>,
980 inflight_blocks: Arc<InflightBlocksMap>,
981 network_client: Arc<C>,
982 missing_blocks: BTreeSet<BlockRef>,
983 dag_state: Arc<RwLock<DagState>>,
984 ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
985 let missing_blocks = missing_blocks
989 .into_iter()
990 .take(2 * MAX_PERIODIC_SYNC_PEERS * context.parameters.max_blocks_per_sync)
991 .collect::<Vec<_>>();
992
993 let mut authorities = BTreeMap::<AuthorityIndex, Vec<BlockRef>>::new();
995 for block_ref in &missing_blocks {
996 authorities
997 .entry(block_ref.author)
998 .or_default()
999 .push(*block_ref);
1000 }
1001 let num_authorities_per_peer = authorities
1004 .len()
1005 .div_ceil((context.committee.size() - 1).min(MAX_PERIODIC_SYNC_PEERS));
1006
1007 let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1009 for (authority, blocks) in &authorities {
1010 missing_blocks_per_authority[*authority] += blocks.len();
1011 }
1012 for (missing, (_, authority)) in missing_blocks_per_authority
1013 .into_iter()
1014 .zip(context.committee.authorities())
1015 {
1016 context
1017 .metrics
1018 .node_metrics
1019 .synchronizer_missing_blocks_by_authority
1020 .with_label_values(&[&authority.hostname])
1021 .inc_by(missing as u64);
1022 context
1023 .metrics
1024 .node_metrics
1025 .synchronizer_current_missing_blocks_by_authority
1026 .with_label_values(&[&authority.hostname])
1027 .set(missing as i64);
1028 }
1029
1030 let mut peers = context
1031 .committee
1032 .authorities()
1033 .filter_map(|(peer_index, _)| (peer_index != context.own_index).then_some(peer_index))
1034 .collect::<Vec<_>>();
1035
1036 if cfg!(not(test)) {
1038 peers.shuffle(&mut ThreadRng::default());
1040 }
1041
1042 let mut peers = peers.into_iter();
1043 let mut request_futures = FuturesUnordered::new();
1044
1045 let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);
1046
1047 let mut authorities = authorities.into_values().collect::<Vec<_>>();
1049 if cfg!(not(test)) {
1050 authorities.shuffle(&mut ThreadRng::default());
1052 }
1053
1054 for batch in authorities.chunks(num_authorities_per_peer) {
1056 let Some(peer) = peers.next() else {
1057 debug_fatal!("No more peers left to fetch blocks!");
1058 break;
1059 };
1060 let peer_hostname = &context.committee.authority(peer).hostname;
1061 let block_refs = batch
1064 .iter()
1065 .flatten()
1066 .cloned()
1067 .collect::<BTreeSet<_>>()
1068 .into_iter()
1069 .take(context.parameters.max_blocks_per_sync)
1070 .collect::<BTreeSet<_>>();
1071
1072 if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
1074 info!(
1075 "Periodic sync of {} missing blocks from peer {} {}: {}",
1076 block_refs.len(),
1077 peer,
1078 peer_hostname,
1079 block_refs
1080 .iter()
1081 .map(|b| b.to_string())
1082 .collect::<Vec<_>>()
1083 .join(", ")
1084 );
1085 request_futures.push(Self::fetch_blocks_request(
1086 network_client.clone(),
1087 peer,
1088 blocks_guard,
1089 highest_rounds.clone(),
1090 false,
1091 FETCH_REQUEST_TIMEOUT,
1092 1,
1093 ));
1094 }
1095 }
1096
1097 let mut results = Vec::new();
1098 let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1099
1100 tokio::pin!(fetcher_timeout);
1101
1102 loop {
1103 tokio::select! {
1104 Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
1105 let peer_hostname = &context.committee.authority(peer_index).hostname;
1106 match response {
1107 Ok(fetched_blocks) => {
1108 results.push((blocks_guard, fetched_blocks, peer_index));
1109
1110 if request_futures.is_empty() {
1112 break;
1113 }
1114 },
1115 Err(_) => {
1116 context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer_hostname, "periodic"]).inc();
1117 if let Some(next_peer) = peers.next() {
1119 if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
1121 info!(
1122 "Retrying syncing {} missing blocks from peer {}: {}",
1123 blocks_guard.block_refs.len(),
1124 peer_hostname,
1125 blocks_guard.block_refs
1126 .iter()
1127 .map(|b| b.to_string())
1128 .collect::<Vec<_>>()
1129 .join(", ")
1130 );
1131 request_futures.push(Self::fetch_blocks_request(
1132 network_client.clone(),
1133 next_peer,
1134 blocks_guard,
1135 highest_rounds,
1136 false,
1137 FETCH_REQUEST_TIMEOUT,
1138 1,
1139 ));
1140 } else {
1141 debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
1142 }
1143 } else {
1144 debug!("No more peers left to fetch blocks");
1145 }
1146 }
1147 }
1148 },
1149 _ = &mut fetcher_timeout => {
1150 debug!("Timed out while fetching missing blocks");
1151 break;
1152 }
1153 }
1154 }
1155
1156 results
1157 }
1158}
1159
1160#[cfg(test)]
1161mod tests {
1162 use std::{
1163 collections::{BTreeMap, BTreeSet},
1164 sync::Arc,
1165 time::Duration,
1166 };
1167
1168 use async_trait::async_trait;
1169 use bytes::Bytes;
1170 use consensus_config::{AuthorityIndex, Parameters};
1171 use consensus_types::block::{BlockDigest, BlockRef, Round};
1172 use mysten_metrics::monitored_mpsc;
1173 use parking_lot::RwLock;
1174 use tokio::{sync::Mutex, time::sleep};
1175
1176 use crate::commit::{CommitVote, TrustedCommit};
1177 use crate::{
1178 CommitDigest, CommitIndex,
1179 block::{TestBlock, VerifiedBlock},
1180 block_verifier::NoopBlockVerifier,
1181 commit::CommitRange,
1182 commit_vote_monitor::CommitVoteMonitor,
1183 context::Context,
1184 core_thread::CoreThreadDispatcher,
1185 dag_state::DagState,
1186 error::{ConsensusError, ConsensusResult},
1187 network::{BlockStream, NetworkClient},
1188 storage::mem_store::MemStore,
1189 synchronizer::{
1190 FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap, Synchronizer,
1191 },
1192 };
1193 use crate::{
1194 authority_service::COMMIT_LAG_MULTIPLIER, core_thread::MockCoreThreadDispatcher,
1195 transaction_certifier::TransactionCertifier,
1196 };
1197
    /// Key of a stubbed `fetch_blocks` call: the exact requested block refs (in request order)
    /// and the peer being asked.
    type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
    /// Stubbed `fetch_blocks` reply: the blocks to return and an optional artificial latency
    /// the mock sleeps for before returning them.
    type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
    /// Key of a stubbed `fetch_latest_blocks` call: the peer being asked and the authorities
    /// whose latest blocks are requested.
    type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
    /// Stubbed `fetch_latest_blocks` reply: the blocks to return and an optional artificial
    /// latency the mock sleeps for before returning them.
    type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);

    /// Test double for [`NetworkClient`] that only serves pre-registered (stubbed) requests.
    ///
    /// `fetch_blocks` stubs are one-shot: each key maps to a single response that is removed
    /// when served. `fetch_latest_blocks` stubs are queued: each key maps to a list of
    /// responses served in insertion order, one per call.
    #[derive(Default)]
    struct MockNetworkClient {
        fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
        fetch_latest_blocks_requests:
            Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
    }
1209
1210 impl MockNetworkClient {
1211 async fn stub_fetch_blocks(
1212 &self,
1213 blocks: Vec<VerifiedBlock>,
1214 peer: AuthorityIndex,
1215 latency: Option<Duration>,
1216 ) {
1217 let mut lock = self.fetch_blocks_requests.lock().await;
1218 let block_refs = blocks
1219 .iter()
1220 .map(|block| block.reference())
1221 .collect::<Vec<_>>();
1222 lock.insert((block_refs, peer), (blocks, latency));
1223 }
1224
1225 async fn stub_fetch_latest_blocks(
1226 &self,
1227 blocks: Vec<VerifiedBlock>,
1228 peer: AuthorityIndex,
1229 authorities: Vec<AuthorityIndex>,
1230 latency: Option<Duration>,
1231 ) {
1232 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1233 lock.entry((peer, authorities))
1234 .or_default()
1235 .push((blocks, latency));
1236 }
1237
1238 async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1239 let lock = self.fetch_latest_blocks_requests.lock().await;
1240 lock.len()
1241 }
1242 }
1243
1244 #[async_trait]
1245 impl NetworkClient for MockNetworkClient {
1246 async fn send_block(
1247 &self,
1248 _peer: AuthorityIndex,
1249 _serialized_block: &VerifiedBlock,
1250 _timeout: Duration,
1251 ) -> ConsensusResult<()> {
1252 unimplemented!("Unimplemented")
1253 }
1254
1255 async fn subscribe_blocks(
1256 &self,
1257 _peer: AuthorityIndex,
1258 _last_received: Round,
1259 _timeout: Duration,
1260 ) -> ConsensusResult<BlockStream> {
1261 unimplemented!("Unimplemented")
1262 }
1263
1264 async fn fetch_blocks(
1265 &self,
1266 peer: AuthorityIndex,
1267 block_refs: Vec<BlockRef>,
1268 _highest_accepted_rounds: Vec<Round>,
1269 _breadth_first: bool,
1270 _timeout: Duration,
1271 ) -> ConsensusResult<Vec<Bytes>> {
1272 let mut lock = self.fetch_blocks_requests.lock().await;
1273 let response = lock.remove(&(block_refs.clone(), peer)).unwrap_or_else(|| {
1274 panic!(
1275 "Unexpected fetch blocks request made: {:?} {}. Current lock: {:?}",
1276 block_refs, peer, lock
1277 );
1278 });
1279
1280 let serialised = response
1281 .0
1282 .into_iter()
1283 .map(|block| block.serialized().clone())
1284 .collect::<Vec<_>>();
1285
1286 drop(lock);
1287
1288 if let Some(latency) = response.1 {
1289 sleep(latency).await;
1290 }
1291
1292 Ok(serialised)
1293 }
1294
1295 async fn fetch_commits(
1296 &self,
1297 _peer: AuthorityIndex,
1298 _commit_range: CommitRange,
1299 _timeout: Duration,
1300 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1301 unimplemented!("Unimplemented")
1302 }
1303
1304 async fn fetch_latest_blocks(
1305 &self,
1306 peer: AuthorityIndex,
1307 authorities: Vec<AuthorityIndex>,
1308 _timeout: Duration,
1309 ) -> ConsensusResult<Vec<Bytes>> {
1310 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1311 let mut responses = lock
1312 .remove(&(peer, authorities.clone()))
1313 .expect("Unexpected fetch blocks request made");
1314
1315 let response = responses.remove(0);
1316 let serialised = response
1317 .0
1318 .into_iter()
1319 .map(|block| block.serialized().clone())
1320 .collect::<Vec<_>>();
1321
1322 if !responses.is_empty() {
1323 lock.insert((peer, authorities), responses);
1324 }
1325
1326 drop(lock);
1327
1328 if let Some(latency) = response.1 {
1329 sleep(latency).await;
1330 }
1331
1332 Ok(serialised)
1333 }
1334
1335 async fn get_latest_rounds(
1336 &self,
1337 _peer: AuthorityIndex,
1338 _timeout: Duration,
1339 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1340 unimplemented!("Unimplemented")
1341 }
1342 }
1343
1344 #[test]
1345 fn test_inflight_blocks_map() {
1346 let map = InflightBlocksMap::new();
1348 let some_block_refs = [
1349 BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1350 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1351 BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
1352 BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
1353 ];
1354 let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1355
1356 {
1358 let mut all_guards = Vec::new();
1359
1360 for i in 1..=2 {
1362 let authority = AuthorityIndex::new_for_test(i);
1363
1364 let guard = map.lock_blocks(missing_block_refs.clone(), authority);
1365 let guard = guard.expect("Guard should be created");
1366 assert_eq!(guard.block_refs.len(), 4);
1367
1368 all_guards.push(guard);
1369
1370 let guard = map.lock_blocks(missing_block_refs.clone(), authority);
1372 assert!(guard.is_none());
1373 }
1374
1375 let authority_3 = AuthorityIndex::new_for_test(3);
1377
1378 let guard = map.lock_blocks(missing_block_refs.clone(), authority_3);
1379 assert!(guard.is_none());
1380
1381 drop(all_guards.remove(0));
1383
1384 let guard = map.lock_blocks(missing_block_refs.clone(), authority_3);
1385 let guard = guard.expect("Guard should be successfully acquired");
1386
1387 assert_eq!(guard.block_refs, missing_block_refs);
1388
1389 drop(guard);
1391 drop(all_guards);
1392
1393 assert_eq!(map.num_of_locked_blocks(), 0);
1394 }
1395
1396 {
1398 let authority_1 = AuthorityIndex::new_for_test(1);
1400 let guard = map
1401 .lock_blocks(missing_block_refs.clone(), authority_1)
1402 .unwrap();
1403
1404 let authority_2 = AuthorityIndex::new_for_test(2);
1406 let guard = map.swap_locks(guard, authority_2);
1407
1408 assert_eq!(guard.unwrap().block_refs, missing_block_refs);
1409 }
1410 }
1411
1412 #[tokio::test]
1413 async fn successful_fetch_blocks_from_peer() {
1414 let (context, _) = Context::new_for_test(4);
1416 let context = Arc::new(context);
1417 let block_verifier = Arc::new(NoopBlockVerifier {});
1418 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1419 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1420 let network_client = Arc::new(MockNetworkClient::default());
1421 let (blocks_sender, _blocks_receiver) =
1422 monitored_mpsc::unbounded_channel("consensus_block_output");
1423 let store = Arc::new(MemStore::new());
1424 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1425 let transaction_certifier = TransactionCertifier::new(
1426 context.clone(),
1427 block_verifier.clone(),
1428 dag_state.clone(),
1429 blocks_sender,
1430 );
1431
1432 let handle = Synchronizer::start(
1433 network_client.clone(),
1434 context,
1435 core_dispatcher.clone(),
1436 commit_vote_monitor,
1437 block_verifier,
1438 transaction_certifier,
1439 dag_state,
1440 false,
1441 );
1442
1443 let expected_blocks = (0..10)
1445 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1446 .collect::<Vec<_>>();
1447 let missing_blocks = expected_blocks
1448 .iter()
1449 .map(|block| block.reference())
1450 .collect::<BTreeSet<_>>();
1451
1452 let peer = AuthorityIndex::new_for_test(1);
1454 network_client
1455 .stub_fetch_blocks(expected_blocks.clone(), peer, None)
1456 .await;
1457
1458 assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1460
1461 sleep(Duration::from_millis(1_000)).await;
1463
1464 let added_blocks = core_dispatcher.get_add_blocks().await;
1466 assert_eq!(added_blocks, expected_blocks);
1467 }
1468
1469 #[tokio::test]
1470 async fn saturate_fetch_blocks_from_peer() {
1471 let (context, _) = Context::new_for_test(4);
1473 let context = Arc::new(context);
1474 let block_verifier = Arc::new(NoopBlockVerifier {});
1475 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1476 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1477 let network_client = Arc::new(MockNetworkClient::default());
1478 let (blocks_sender, _blocks_receiver) =
1479 monitored_mpsc::unbounded_channel("consensus_block_output");
1480 let store = Arc::new(MemStore::new());
1481 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1482 let transaction_certifier = TransactionCertifier::new(
1483 context.clone(),
1484 block_verifier.clone(),
1485 dag_state.clone(),
1486 blocks_sender,
1487 );
1488
1489 let handle = Synchronizer::start(
1490 network_client.clone(),
1491 context,
1492 core_dispatcher.clone(),
1493 commit_vote_monitor,
1494 block_verifier,
1495 transaction_certifier,
1496 dag_state,
1497 false,
1498 );
1499
1500 let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
1502 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
1503 .collect::<Vec<_>>();
1504
1505 let peer = AuthorityIndex::new_for_test(1);
1507 let mut iter = expected_blocks.iter().peekable();
1508 while let Some(block) = iter.next() {
1509 network_client
1512 .stub_fetch_blocks(
1513 vec![block.clone()],
1514 peer,
1515 Some(Duration::from_millis(5_000)),
1516 )
1517 .await;
1518
1519 let mut missing_blocks = BTreeSet::new();
1520 missing_blocks.insert(block.reference());
1521
1522 if iter.peek().is_none() {
1525 match handle.fetch_blocks(missing_blocks, peer).await {
1526 Err(ConsensusError::SynchronizerSaturated(index)) => {
1527 assert_eq!(index, peer);
1528 }
1529 _ => panic!("A saturated synchronizer error was expected"),
1530 }
1531 } else {
1532 assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1533 }
1534 }
1535 }
1536
    /// Verifies that the periodic synchronizer task fetches the blocks the core reports as
    /// missing and hands them to the core, even though one of the stubbed peers is slow.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_periodic_task_fetch_blocks() {
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(MockNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );

        // Ten blocks from authority 0, rounds 0..10.
        let expected_blocks = (0..10)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();
        let missing_blocks = expected_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();

        // Make the mocked core report all ten blocks as missing, giving the periodic task work.
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        // Both peers can serve the full set: peer 1 only after a delay of FETCH_REQUEST_TIMEOUT,
        // peer 2 immediately.
        network_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(1),
                Some(FETCH_REQUEST_TIMEOUT),
            )
            .await;
        network_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(2),
                None,
            )
            .await;

        // Start the synchronizer only after every stub is registered.
        let _handle = Synchronizer::start(
            network_client.clone(),
            context,
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            transaction_certifier,
            dag_state,
            false,
        );

        // Time is paused (`start_paused`), so this advances the mocked clock.
        sleep(2 * FETCH_REQUEST_TIMEOUT).await;

        // Every block reached the core, in round order.
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(added_blocks, expected_blocks);

        // NOTE(review): presumably the mocked core drops blocks from its missing set once they
        // are added, so nothing should remain missing — confirm in MockCoreThreadDispatcher.
        assert!(
            core_dispatcher
                .get_missing_blocks()
                .await
                .unwrap()
                .is_empty()
        );
    }
1616
    /// Verifies that periodic missing-block sync fetches nothing while the local commit index
    /// lags far behind the quorum commit index, and fetches again once the local DAG catches up.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(MockNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));

        // Missing blocks from authority 0 for rounds [2 * batch, 3 * batch), where batch is
        // `commit_sync_batch_size`.
        let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
        let stub_blocks = (sync_missing_block_round_threshold * 2
            ..sync_missing_block_round_threshold * 3)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();
        let missing_blocks = stub_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        // Only the first `max_blocks_per_sync` blocks are stubbed on the network.
        // NOTE(review): this assumes a single sync request asks for at most that many refs,
        // lowest first — confirm against the periodic fetch logic.
        let mut expected_blocks = stub_blocks
            .iter()
            .take(context.parameters.max_blocks_per_sync)
            .cloned()
            .collect::<Vec<_>>();
        network_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(1),
                Some(FETCH_REQUEST_TIMEOUT),
            )
            .await;
        network_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(2),
                None,
            )
            .await;

        // All four authorities vote for a commit index far beyond the local one, which is 0
        // because no commits have been added to the DAG state yet.
        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
        let commit_index: CommitIndex = round - 1;
        let blocks = (0..4)
            .map(|authority| {
                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
                let block = TestBlock::new(round, authority)
                    .set_commit_votes(commit_votes)
                    .build();

                VerifiedBlock::new_for_test(block)
            })
            .collect::<Vec<_>>();

        // Feed the votes to the monitor so its quorum commit index reflects them.
        for block in blocks {
            commit_vote_monitor.observe_block(&block);
        }

        let _handle = Synchronizer::start(
            network_client.clone(),
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier,
            transaction_certifier,
            dag_state.clone(),
            false,
        );

        // Time is paused, so this advances the mocked clock.
        sleep(4 * FETCH_REQUEST_TIMEOUT).await;

        // While lagging, nothing was fetched and handed to the core.
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(added_blocks, vec![]);

        // Catch up locally by adding commits 1..=commit_index. The write guard is scoped so the
        // blocking parking_lot lock is released before the next `.await`.
        {
            let mut d = dag_state.write();
            for index in 1..=commit_index {
                let commit =
                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);

                d.add_commit(commit);
            }

            assert_eq!(
                d.last_commit_index(),
                commit_vote_monitor.quorum_commit_index()
            );
        }

        // NOTE(review): the missing set is stubbed again, presumably because the mocked core's
        // earlier stub was consumed by the periodic runs above — confirm.
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        sleep(2 * FETCH_REQUEST_TIMEOUT).await;

        let mut added_blocks = core_dispatcher.get_add_blocks().await;

        // Compare as sorted lists so the check does not depend on delivery order.
        added_blocks.sort_by_key(|block| block.reference());
        expected_blocks.sort_by_key(|block| block.reference());

        assert_eq!(added_blocks, expected_blocks);
    }
1745
    /// Verifies fetching our own last proposed block from peers at startup. The highest of our
    /// rounds returned by any peer (10) is reported to the core, and exactly one retry is
    /// recorded.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_fetch_own_last_block() {
        let (context, _) = Context::new_for_test(4);
        // Use a 2s own-block sync timeout so the 10s peer latencies below exceed it.
        let context = Arc::new(context.with_parameters(Parameters {
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        }));
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(MockNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let our_index = AuthorityIndex::new_for_test(0);

        // Two of our own blocks, at rounds 9 and 10.
        let mut expected_blocks = (9..=10)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();

        // Peer 1: returns our round-10 block immediately, for both of its queued responses.
        let block_1 = expected_blocks.pop().unwrap();
        network_client
            .stub_fetch_latest_blocks(
                vec![block_1.clone()],
                AuthorityIndex::new_for_test(1),
                vec![our_index],
                None,
            )
            .await;
        network_client
            .stub_fetch_latest_blocks(
                vec![block_1],
                AuthorityIndex::new_for_test(1),
                vec![our_index],
                None,
            )
            .await;

        // Peer 2: returns our round-9 block. The first reply takes 10s (past the timeout); the
        // second is immediate.
        let block_2 = expected_blocks.pop().unwrap();
        network_client
            .stub_fetch_latest_blocks(
                vec![block_2.clone()],
                AuthorityIndex::new_for_test(2),
                vec![our_index],
                Some(Duration::from_secs(10)),
            )
            .await;
        network_client
            .stub_fetch_latest_blocks(
                vec![block_2],
                AuthorityIndex::new_for_test(2),
                vec![our_index],
                None,
            )
            .await;

        // Peer 3: knows none of our blocks. The first reply takes 10s; the second is immediate.
        network_client
            .stub_fetch_latest_blocks(
                vec![],
                AuthorityIndex::new_for_test(3),
                vec![our_index],
                Some(Duration::from_secs(10)),
            )
            .await;
        network_client
            .stub_fetch_latest_blocks(
                vec![],
                AuthorityIndex::new_for_test(3),
                vec![our_index],
                None,
            )
            .await;

        // NOTE(review): the trailing `true` appears to enable fetching our own last block at
        // startup (the other tests pass `false`) — confirm against `Synchronizer::start`.
        let handle = Synchronizer::start(
            network_client.clone(),
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            transaction_certifier,
            dag_state,
            true,
        );

        // Time is paused, so this advances the mocked clock by 4s.
        sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;

        // The core was told round 10 is our last proposed round.
        assert_eq!(
            core_dispatcher.get_last_own_proposed_round().await,
            vec![10]
        );

        // The mock dequeues a response before sleeping on its latency. An empty queue therefore
        // means each peer was asked twice.
        assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);

        // ...and the synchronizer recorded exactly one retry.
        assert_eq!(
            context
                .metrics
                .node_metrics
                .sync_last_known_own_block_retries
                .get(),
            1
        );

        // Stop the synchronizer and re-raise any panic from its task (for example, the mock
        // panicking on an unexpected request) so that it fails this test.
        if let Err(err) = handle.stop().await
            && err.is_panic()
        {
            std::panic::resume_unwind(err.into_panic());
        }
    }
1873
1874 #[tokio::test]
1875 async fn test_process_fetched_blocks() {
1876 let (context, _) = Context::new_for_test(4);
1878 let context = Arc::new(context);
1879 let block_verifier = Arc::new(NoopBlockVerifier {});
1880 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1881 let (blocks_sender, _blocks_receiver) =
1882 monitored_mpsc::unbounded_channel("consensus_block_output");
1883 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1884 let store = Arc::new(MemStore::new());
1885 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1886 let transaction_certifier = TransactionCertifier::new(
1887 context.clone(),
1888 block_verifier.clone(),
1889 dag_state.clone(),
1890 blocks_sender,
1891 );
1892 let (commands_sender, _commands_receiver) =
1893 monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
1894
1895 let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
1899 expected_blocks.extend(
1900 (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
1901 );
1902 assert_eq!(
1903 expected_blocks.len(),
1904 context.parameters.max_blocks_per_sync
1905 );
1906
1907 let expected_serialized_blocks = expected_blocks
1908 .iter()
1909 .map(|b| b.serialized().clone())
1910 .collect::<Vec<_>>();
1911
1912 let expected_block_refs = expected_blocks
1913 .iter()
1914 .map(|b| b.reference())
1915 .collect::<BTreeSet<_>>();
1916
1917 let peer_index = AuthorityIndex::new_for_test(2);
1919
1920 let inflight_blocks_map = InflightBlocksMap::new();
1922 let blocks_guard = inflight_blocks_map
1923 .lock_blocks(expected_block_refs.clone(), peer_index)
1924 .expect("Failed to lock blocks");
1925
1926 assert_eq!(
1927 inflight_blocks_map.num_of_locked_blocks(),
1928 expected_block_refs.len()
1929 );
1930
1931 let result = Synchronizer::<
1933 MockNetworkClient,
1934 NoopBlockVerifier,
1935 MockCoreThreadDispatcher,
1936 >::process_fetched_blocks(
1937 expected_serialized_blocks,
1938 peer_index,
1939 blocks_guard, core_dispatcher.clone(),
1941 block_verifier,
1942 transaction_certifier,
1943 commit_vote_monitor,
1944 context.clone(),
1945 commands_sender,
1946 "test",
1947 )
1948 .await;
1949
1950 assert!(result.is_ok());
1952
1953 let added_blocks = core_dispatcher.get_add_blocks().await;
1955 assert_eq!(
1956 added_blocks
1957 .iter()
1958 .map(|b| b.reference())
1959 .collect::<BTreeSet<_>>(),
1960 expected_block_refs,
1961 );
1962
1963 assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
1965 }
1966}