1use std::{
4 collections::{BTreeMap, BTreeSet},
5 sync::Arc,
6 time::Duration,
7};
8
9use bytes::Bytes;
10use consensus_config::AuthorityIndex;
11use consensus_types::block::{BlockRef, Round};
12use futures::{StreamExt as _, stream::FuturesUnordered};
13use itertools::Itertools as _;
14use mysten_common::{ZipDebugEqIteratorExt, debug_fatal};
15use mysten_metrics::{
16 monitored_future,
17 monitored_mpsc::{Receiver, Sender, channel},
18 monitored_scope,
19};
20use parking_lot::{Mutex, RwLock};
21use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
22use sui_macros::fail_point_async;
23use tap::TapFallible;
24use tokio::{
25 runtime::Handle,
26 sync::{mpsc::error::TrySendError, oneshot},
27 task::{JoinError, JoinSet},
28 time::{Instant, sleep, sleep_until, timeout},
29};
30use tracing::{debug, info, trace, warn};
31
32use crate::{
33 BlockAPI,
34 block::{ExtendedBlock, SignedBlock, VerifiedBlock},
35 block_verifier::BlockVerifier,
36 commit::CommitIndex,
37 commit_vote_monitor::{CommitVoteMonitor, is_commit_lagging},
38 context::Context,
39 dag_state::DagState,
40 error::{ConsensusError, ConsensusResult},
41 network::{ObserverNetworkClient, PeerId, SynchronizerClient, ValidatorNetworkClient},
42 peers_pool::PeersPool,
43 round_tracker::RoundTracker,
44};
45use crate::{core_thread::CoreThreadDispatcher, transaction_vote_tracker::TransactionVoteTracker};
46
47const FETCH_BLOCKS_CONCURRENCY: usize = 5;
49
50const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);
52const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);
53
54const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;
55
56const MAX_PERIODIC_SYNC_PEERS: usize = 3;
58
59const COMMIT_PROGRESS_TIMEOUT: Duration = Duration::from_secs(10);
61
62struct BlocksGuard {
63 map: Arc<InflightBlocksMap>,
64 block_refs: BTreeSet<BlockRef>,
65 peer: PeerId,
66}
67
68impl Drop for BlocksGuard {
69 fn drop(&mut self) {
70 self.map.unlock_blocks(&self.block_refs, self.peer.clone());
71 }
72}
73
74struct InflightBlocksMap {
79 inner: Mutex<BTreeMap<BlockRef, BTreeSet<PeerId>>>,
80}
81
82impl InflightBlocksMap {
83 fn new() -> Arc<Self> {
84 Arc::new(Self {
85 inner: Mutex::new(BTreeMap::new()),
86 })
87 }
88
89 fn lock_blocks(
95 self: &Arc<Self>,
96 missing_block_refs: BTreeSet<BlockRef>,
97 peer: PeerId,
98 ) -> Option<BlocksGuard> {
99 let mut blocks = BTreeSet::new();
100 let mut inner = self.inner.lock();
101
102 for block_ref in missing_block_refs {
103 let peers = inner.entry(block_ref).or_default();
106 if peers.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK && peers.get(&peer).is_none() {
107 assert!(peers.insert(peer.clone()));
108 blocks.insert(block_ref);
109 }
110 }
111
112 if blocks.is_empty() {
113 None
114 } else {
115 Some(BlocksGuard {
116 map: self.clone(),
117 block_refs: blocks,
118 peer,
119 })
120 }
121 }
122
123 fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: PeerId) {
127 let mut blocks_to_fetch = self.inner.lock();
129 for block_ref in block_refs {
130 let peers = blocks_to_fetch
131 .get_mut(block_ref)
132 .expect("Should have found a non empty map");
133
134 assert!(peers.remove(&peer), "Peer should be present!");
135
136 if peers.is_empty() {
138 blocks_to_fetch.remove(block_ref);
139 }
140 }
141 }
142
143 fn swap_locks(
147 self: &Arc<Self>,
148 blocks_guard: BlocksGuard,
149 peer: PeerId,
150 ) -> Option<BlocksGuard> {
151 let block_refs = blocks_guard.block_refs.clone();
152
153 drop(blocks_guard);
155
156 self.lock_blocks(block_refs, peer)
158 }
159
160 #[cfg(test)]
161 fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
162 let inner = self.inner.lock();
163 inner.len()
164 }
165}
166
167enum Command {
168 FetchBlocks {
169 missing_block_refs: BTreeSet<BlockRef>,
170 peer: PeerId,
171 result: oneshot::Sender<Result<(), ConsensusError>>,
172 },
173 FetchOwnLastBlock,
174 KickOffScheduler,
175}
176
177pub(crate) struct SynchronizerHandle {
178 commands_sender: Sender<Command>,
179 tasks: tokio::sync::Mutex<JoinSet<()>>,
180}
181
182impl SynchronizerHandle {
183 pub(crate) async fn fetch_blocks(
186 &self,
187 missing_block_refs: BTreeSet<BlockRef>,
188 peer: PeerId,
189 ) -> ConsensusResult<()> {
190 let (sender, receiver) = oneshot::channel();
191 self.commands_sender
192 .send(Command::FetchBlocks {
193 missing_block_refs,
194 peer,
195 result: sender,
196 })
197 .await
198 .map_err(|_err| ConsensusError::Shutdown)?;
199 receiver.await.map_err(|_err| ConsensusError::Shutdown)?
200 }
201
202 pub(crate) async fn stop(&self) -> Result<(), JoinError> {
203 let mut tasks = self.tasks.lock().await;
204 tasks.abort_all();
205 while let Some(result) = tasks.join_next().await {
206 result?
207 }
208 Ok(())
209 }
210
211 #[cfg(test)]
212 pub(crate) fn new_for_test() -> Arc<Self> {
214 use tokio::task::JoinSet;
215 let (tx, _rx) = channel("test_synchronizer", 1);
216 Arc::new(Self {
217 commands_sender: tx,
218 tasks: tokio::sync::Mutex::new(JoinSet::new()),
219 })
220 }
221}
222
223pub(crate) struct Synchronizer<
245 V: BlockVerifier,
246 D: CoreThreadDispatcher,
247 VC: ValidatorNetworkClient,
248 OC: ObserverNetworkClient,
249> {
250 context: Arc<Context>,
251 commands_receiver: Receiver<Command>,
252 fetch_block_senders: BTreeMap<PeerId, Sender<BlocksGuard>>,
253 core_dispatcher: Arc<D>,
254 commit_vote_monitor: Arc<CommitVoteMonitor>,
255 dag_state: Arc<RwLock<DagState>>,
256 fetch_blocks_scheduler_task: JoinSet<()>,
257 fetch_own_last_block_task: JoinSet<()>,
258 network_client: Arc<SynchronizerClient<VC, OC>>,
259 block_verifier: Arc<V>,
260 transaction_vote_tracker: TransactionVoteTracker,
261 round_tracker: Arc<RwLock<RoundTracker>>,
262 inflight_blocks_map: Arc<InflightBlocksMap>,
263 commands_sender: Sender<Command>,
264 last_changed_commit_index: CommitIndex,
265 last_commit_change_time: Instant,
266 commit_sync_failover: bool,
268 peers_pool: Arc<PeersPool>,
269}
270
271impl<V, D, VC, OC> Synchronizer<V, D, VC, OC>
272where
273 V: BlockVerifier,
274 D: CoreThreadDispatcher,
275 VC: ValidatorNetworkClient,
276 OC: ObserverNetworkClient,
277{
278 pub(crate) fn start(
279 network_client: Arc<SynchronizerClient<VC, OC>>,
280 context: Arc<Context>,
281 core_dispatcher: Arc<D>,
282 commit_vote_monitor: Arc<CommitVoteMonitor>,
283 block_verifier: Arc<V>,
284 transaction_vote_tracker: TransactionVoteTracker,
285 round_tracker: Arc<RwLock<RoundTracker>>,
286 dag_state: Arc<RwLock<DagState>>,
287 peers_pool: Arc<PeersPool>,
288 sync_last_known_own_block: bool,
289 ) -> Arc<SynchronizerHandle> {
290 let (commands_sender, commands_receiver) =
291 channel("consensus_synchronizer_commands", 1_000);
292 let inflight_blocks_map = InflightBlocksMap::new();
293
294 let mut fetch_block_senders = BTreeMap::new();
296 let mut tasks = JoinSet::new();
297
298 let known_peers = peers_pool.get_known_peers();
301 for peer in known_peers {
302 let (sender, receiver) =
303 channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
304 let fetch_blocks_from_peer_async = Self::fetch_blocks_from_peer(
305 peer.clone(),
306 network_client.clone(),
307 block_verifier.clone(),
308 transaction_vote_tracker.clone(),
309 commit_vote_monitor.clone(),
310 context.clone(),
311 core_dispatcher.clone(),
312 dag_state.clone(),
313 receiver,
314 commands_sender.clone(),
315 round_tracker.clone(),
316 peers_pool.clone(),
317 );
318 tasks.spawn(monitored_future!(fetch_blocks_from_peer_async));
319 fetch_block_senders.insert(peer, sender);
320 }
321
322 let commands_sender_clone = commands_sender.clone();
323
324 if sync_last_known_own_block {
325 commands_sender
326 .try_send(Command::FetchOwnLastBlock)
327 .expect("Failed to sync our last block");
328 }
329
330 tasks.spawn(monitored_future!(async move {
332 let mut s = Self {
333 context,
334 commands_receiver,
335 fetch_block_senders,
336 core_dispatcher,
337 commit_vote_monitor,
338 fetch_blocks_scheduler_task: JoinSet::new(),
339 fetch_own_last_block_task: JoinSet::new(),
340 network_client,
341 block_verifier,
342 transaction_vote_tracker,
343 inflight_blocks_map,
344 commands_sender: commands_sender_clone,
345 dag_state,
346 round_tracker,
347 last_changed_commit_index: 0,
348 last_commit_change_time: Instant::now(),
349 commit_sync_failover: false,
350 peers_pool,
351 };
352 s.run().await;
353 }));
354
355 Arc::new(SynchronizerHandle {
356 commands_sender,
357 tasks: tokio::sync::Mutex::new(tasks),
358 })
359 }
360
361 async fn run(&mut self) {
363 const PERIODIC_FETCH_INTERVAL: Duration = Duration::from_millis(200);
365 let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_INTERVAL);
366
367 tokio::pin!(scheduler_timeout);
368
369 loop {
370 tokio::select! {
371 Some(command) = self.commands_receiver.recv() => {
372 match command {
373 Command::FetchBlocks{ missing_block_refs, peer, result } => {
374 if !self.peers_pool.is_peer_known(&peer) {
376 result.send(Err(ConsensusError::PeerUnavailable(format!("{:?}", peer)))).ok();
377 continue;
378 }
379
380 let missing_block_refs = missing_block_refs
384 .into_iter()
385 .take(self.context.parameters.max_blocks_per_sync)
386 .collect();
387
388 let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer.clone());
389 let Some(blocks_guard) = blocks_guard else {
390 result.send(Ok(())).ok();
391 continue;
392 };
393
394 let r = self
397 .fetch_block_senders
398 .get(&peer)
399 .ok_or(ConsensusError::PeerNotFound(format!("Peer {} not found in fetch_block_senders", peer)))
400 .and_then(|sender| {
401 sender
402 .try_send(blocks_guard)
403 .map_err(|err| {
404 match err {
405 TrySendError::Full(_) => {
406 let peer_name = peer.labelname(&self.context);
407 self.context
408 .metrics
409 .node_metrics
410 .synchronizer_skipped_fetch_requests
411 .with_label_values(&[peer_name])
412 .inc();
413 ConsensusError::SynchronizerSaturated(format!("{:?}", peer))
414 },
415 TrySendError::Closed(_) => ConsensusError::Shutdown
416 }
417 })
418 });
419
420 result.send(r).ok();
421 }
422 Command::FetchOwnLastBlock => {
423 if self.fetch_own_last_block_task.is_empty() {
424 self.start_fetch_own_last_block_task();
425 }
426 }
427 Command::KickOffScheduler => {
428 let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
431 Instant::now()
432 } else {
433 Instant::now() + PERIODIC_FETCH_INTERVAL.checked_div(2).unwrap()
434 };
435
436 if timeout < scheduler_timeout.deadline() {
438 scheduler_timeout.as_mut().reset(timeout);
439 }
440 }
441 }
442 },
443 Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
444 match result {
445 Ok(()) => {},
446 Err(e) => {
447 if e.is_cancelled() {
448 } else if e.is_panic() {
449 std::panic::resume_unwind(e.into_panic());
450 } else {
451 panic!("fetch our last block task failed: {e}");
452 }
453 },
454 };
455 },
456 Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
457 match result {
458 Ok(()) => {},
459 Err(e) => {
460 if e.is_cancelled() {
461 } else if e.is_panic() {
462 std::panic::resume_unwind(e.into_panic());
463 } else {
464 panic!("fetch blocks scheduler task failed: {e}");
465 }
466 },
467 };
468 },
469 () = &mut scheduler_timeout => {
470 if self.fetch_blocks_scheduler_task.is_empty()
473 && let Err(err) = self.start_fetch_missing_blocks_task().await {
474 debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
475 return;
476 };
477
478 scheduler_timeout
479 .as_mut()
480 .reset(Instant::now() + PERIODIC_FETCH_INTERVAL);
481 }
482 }
483 }
484 }
485
486 async fn fetch_blocks_from_peer(
487 peer: PeerId,
488 network_client: Arc<SynchronizerClient<VC, OC>>,
489 block_verifier: Arc<V>,
490 transaction_vote_tracker: TransactionVoteTracker,
491 commit_vote_monitor: Arc<CommitVoteMonitor>,
492 context: Arc<Context>,
493 core_dispatcher: Arc<D>,
494 dag_state: Arc<RwLock<DagState>>,
495 mut receiver: Receiver<BlocksGuard>,
496 commands_sender: Sender<Command>,
497 round_tracker: Arc<RwLock<RoundTracker>>,
498 _peers_pool: Arc<PeersPool>,
499 ) {
500 const MAX_RETRIES: u32 = 3;
501 let mut requests = FuturesUnordered::new();
502
503 loop {
504 tokio::select! {
505 Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
506 let fetch_after_rounds = Self::get_fetch_after_rounds(&context, dag_state.clone());
507
508 requests.push(Self::fetch_blocks_request(network_client.clone(), peer.clone(), blocks_guard, fetch_after_rounds, true, FETCH_REQUEST_TIMEOUT, 1))
509 },
510 Some((response, blocks_guard, retries, _peer, fetch_after_rounds)) = requests.next() => {
511 match response {
512 Ok(blocks) => {
513 if let Err(err) = Self::process_fetched_blocks(blocks,
514 peer.clone(),
515 blocks_guard,
516 core_dispatcher.clone(),
517 block_verifier.clone(),
518 transaction_vote_tracker.clone(),
519 commit_vote_monitor.clone(),
520 context.clone(),
521 commands_sender.clone(),
522 round_tracker.clone(),
523 "live"
524 ).await {
525 warn!("Error while processing fetched blocks from peer {}: {err}", peer.hostname(&context));
526 context.metrics.node_metrics.synchronizer_process_fetched_failures.with_label_values(&[peer.labelname(&context).as_str(), "live"]).inc();
527 }
528 },
529 Err(_) => {
530 context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer.labelname(&context).as_str(), "live"]).inc();
531 if retries <= MAX_RETRIES {
532 requests.push(Self::fetch_blocks_request(network_client.clone(), peer.clone(), blocks_guard, fetch_after_rounds, true, FETCH_REQUEST_TIMEOUT, retries))
533 } else {
534 warn!("Max retries {retries} reached while trying to fetch blocks from peer {}.", peer.hostname(&context));
535 drop(blocks_guard);
537 }
538 }
539 }
540 },
541 else => {
542 info!("Fetching blocks from peer {} task will now abort.", peer.hostname(&context));
543 break;
544 }
545 }
546 }
547 }
548
549 async fn process_fetched_blocks(
552 mut serialized_blocks: Vec<Bytes>,
553 peer: PeerId,
554 requested_blocks_guard: BlocksGuard,
555 core_dispatcher: Arc<D>,
556 block_verifier: Arc<V>,
557 transaction_vote_tracker: TransactionVoteTracker,
558 commit_vote_monitor: Arc<CommitVoteMonitor>,
559 context: Arc<Context>,
560 commands_sender: Sender<Command>,
561 round_tracker: Arc<RwLock<RoundTracker>>,
562 sync_method: &str,
563 ) -> ConsensusResult<()> {
564 if serialized_blocks.is_empty() {
565 return Ok(());
566 }
567
568 serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
570
571 let blocks = Handle::current()
573 .spawn_blocking({
574 let block_verifier = block_verifier.clone();
575 let context = context.clone();
576 let peer = peer.clone();
577 move || {
578 Self::verify_blocks(
579 serialized_blocks,
580 block_verifier,
581 transaction_vote_tracker,
582 &context,
583 peer,
584 )
585 }
586 })
587 .await
588 .expect("Spawn blocking should not fail")?;
589
590 for block in &blocks {
592 commit_vote_monitor.observe_block(block);
593 }
594
595 {
598 let mut tracker = round_tracker.write();
599 for block in &blocks {
600 tracker.update_from_verified_block(&ExtendedBlock {
601 block: block.clone(),
602 excluded_ancestors: vec![],
603 });
604 }
605 }
606
607 let metrics = &context.metrics.node_metrics;
608 metrics
609 .synchronizer_fetched_blocks_by_peer
610 .with_label_values(&[peer.labelname(&context).as_str(), sync_method])
611 .inc_by(blocks.len() as u64);
612 for block in &blocks {
613 let block_hostname = &context.committee.authority(block.author()).hostname;
614 metrics
615 .synchronizer_fetched_blocks_by_authority
616 .with_label_values(&[block_hostname.as_str(), sync_method])
617 .inc();
618 }
619
620 debug!(
621 "Synced {} missing blocks from peer {:?}: {}",
622 blocks.len(),
623 peer,
624 blocks.iter().map(|b| b.reference().to_string()).join(", "),
625 );
626
627 let missing_blocks = core_dispatcher
631 .add_blocks(blocks)
632 .await
633 .map_err(|_| ConsensusError::Shutdown)?;
634
635 drop(requested_blocks_guard);
637
638 if !missing_blocks.is_empty() {
640 if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
642 {
643 warn!("Commands channel is full")
644 }
645 }
646
647 context
648 .metrics
649 .node_metrics
650 .missing_blocks_after_fetch_total
651 .inc_by(missing_blocks.len() as u64);
652
653 Ok(())
654 }
655
656 fn get_fetch_after_rounds(
657 context: &Arc<Context>,
658 dag_state: Arc<RwLock<DagState>>,
659 ) -> Vec<Round> {
660 let (blocks, gc_round) = {
661 let dag_state = dag_state.read();
662 (
663 dag_state.get_last_cached_block_per_authority(Round::MAX),
664 dag_state.gc_round(),
665 )
666 };
667 assert_eq!(blocks.len(), context.committee.size());
668
669 blocks
670 .into_iter()
671 .map(|(block, _)| block.round().max(gc_round))
672 .collect::<Vec<_>>()
673 }
674
675 fn verify_blocks(
676 serialized_blocks: Vec<Bytes>,
677 block_verifier: Arc<V>,
678 transaction_vote_tracker: TransactionVoteTracker,
679 context: &Context,
680 peer: PeerId,
681 ) -> ConsensusResult<Vec<VerifiedBlock>> {
682 let mut verified_blocks = Vec::new();
683 let mut voted_blocks = Vec::new();
684 for serialized_block in serialized_blocks {
685 let signed_block: SignedBlock =
686 bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
687
688 let (verified_block, reject_txn_votes) = block_verifier
690 .verify_and_vote(signed_block, serialized_block)
691 .tap_err(|e| {
692 let peer_label = peer.labelname(context);
693 context
694 .metrics
695 .node_metrics
696 .invalid_blocks
697 .with_label_values(&[peer_label.as_str(), "synchronizer", e.clone().name()])
698 .inc();
699 info!("Invalid block received from {}: {}", peer, e);
700 })?;
701
702 let now = context.clock.timestamp_utc_ms();
704 let drift = verified_block.timestamp_ms().saturating_sub(now);
705 if drift > 0 {
706 let peer_hostname = &context
707 .committee
708 .authority(verified_block.author())
709 .hostname;
710 context
711 .metrics
712 .node_metrics
713 .block_timestamp_drift_ms
714 .with_label_values(&[peer_hostname.as_str(), "synchronizer"])
715 .inc_by(drift);
716
717 trace!(
718 "Synced block {} timestamp {} is in the future (now={}).",
719 verified_block.reference(),
720 verified_block.timestamp_ms(),
721 now
722 );
723 }
724
725 verified_blocks.push(verified_block.clone());
726 voted_blocks.push((verified_block, reject_txn_votes));
727 }
728
729 if context.protocol_config.transaction_voting_enabled() {
730 transaction_vote_tracker.add_voted_blocks(voted_blocks);
731 }
732
733 Ok(verified_blocks)
734 }
735
736 async fn fetch_blocks_request(
737 network_client: Arc<SynchronizerClient<VC, OC>>,
738 peer: PeerId,
739 blocks_guard: BlocksGuard,
740 fetch_after_rounds: Vec<Round>,
741 fetch_missing_ancestors: bool,
742 request_timeout: Duration,
743 mut retries: u32,
744 ) -> (
745 ConsensusResult<Vec<Bytes>>,
746 BlocksGuard,
747 u32,
748 PeerId,
749 Vec<Round>,
750 ) {
751 let start = Instant::now();
752 let resp = timeout(
753 request_timeout,
754 network_client.fetch_blocks(
755 peer.clone(),
756 blocks_guard
757 .block_refs
758 .clone()
759 .into_iter()
760 .collect::<Vec<_>>(),
761 fetch_after_rounds.clone().into_iter().collect::<Vec<_>>(),
762 fetch_missing_ancestors,
763 request_timeout,
764 ),
765 )
766 .await;
767
768 fail_point_async!("consensus-delay");
769
770 let resp = match resp {
771 Ok(Err(err)) => {
772 sleep_until(start + request_timeout).await;
775 retries += 1;
776 Err(err)
777 } Err(err) => {
779 sleep_until(start + request_timeout).await;
781 retries += 1;
782 Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
783 }
784 Ok(result) => result,
785 };
786 (resp, blocks_guard, retries, peer, fetch_after_rounds)
787 }
788
789 fn start_fetch_own_last_block_task(&mut self) {
790 const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
791 const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);
792
793 let context = self.context.clone();
794 let dag_state = self.dag_state.clone();
795 let network_client = self.network_client.clone();
796 let block_verifier = self.block_verifier.clone();
797 let core_dispatcher = self.core_dispatcher.clone();
798
799 self.fetch_own_last_block_task
800 .spawn(monitored_future!(async move {
801 let _scope = monitored_scope("FetchOwnLastBlockTask");
802
803 let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
804 let network_client_cloned = network_client.clone();
805 let own_index = context.own_index;
806 async move {
807 sleep(fetch_own_block_delay).await;
808 let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
809 (r, authority_index)
810 }
811 };
812
813
814 let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
815 let mut result = Vec::new();
816 for serialized_block in blocks {
817 let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
818 let (verified_block, _) = block_verifier.verify_and_vote(signed_block, serialized_block).tap_err(|err|{
819 let hostname = context.committee.authority(authority_index).hostname.clone();
820 context
821 .metrics
822 .node_metrics
823 .invalid_blocks
824 .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
825 .inc();
826 warn!("Invalid block received from {}: {}", authority_index, err);
827 })?;
828
829 if verified_block.author() != context.own_index {
830 return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
831 }
832 result.push(verified_block);
833 }
834 Ok(result)
835 };
836
837 let mut highest_round;
839 let mut retries = 0;
840 let mut retry_delay_step = Duration::from_millis(500);
841 'main:loop {
842 if context.committee.size() == 1 {
843 highest_round = dag_state.read().get_last_proposed_block().expect("Last proposed block should be returned on validators").round();
844 info!("Only one node in the network, will not try fetching own last block from peers.");
845 break 'main;
846 }
847
848 let mut total_stake = 0;
849 highest_round = 0;
850
851 let mut results = FuturesUnordered::new();
853
854 for (authority_index, _authority) in context.committee.authorities() {
855 if authority_index != context.own_index {
856 results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
857 }
858 }
859
860 let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
862 tokio::pin!(timer);
863
864 'inner: loop {
865 tokio::select! {
866 result = results.next() => {
867 let Some((result, authority_index)) = result else {
868 break 'inner;
869 };
870 match result {
871 Ok(result) => {
872 match process_blocks(result, authority_index) {
873 Ok(blocks) => {
874 let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
875 highest_round = highest_round.max(max_round);
876
877 total_stake += context.committee.stake(authority_index);
878 },
879 Err(err) => {
880 warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
881 }
882 }
883 },
884 Err(err) => {
885 warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
886 results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
887 }
888 }
889 },
890 () = &mut timer => {
891 info!("Timeout while trying to sync our own last block from peers");
892 break 'inner;
893 }
894 }
895 }
896
897 if context.committee.reached_validity(total_stake) {
899 info!("{} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
900 break 'main;
901 }
902
903 retries += 1;
904 context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
905 warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);
906
907 sleep(retry_delay_step).await;
908
909 retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
910 retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
911 }
912
913 context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);
915
916 if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
917 warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
918 }
919 }));
920 }
921
922 async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
923 if self.context.committee.size() == 1 {
924 trace!(
925 "Only one node in the network, will not try fetching missing blocks from peers."
926 );
927 return Ok(());
928 }
929
930 if !self.should_run_periodic_sync() {
934 return Ok(());
935 }
936
937 let context = self.context.clone();
938 let network_client = self.network_client.clone();
939 let block_verifier = self.block_verifier.clone();
940 let transaction_vote_tracker = self.transaction_vote_tracker.clone();
941 let commit_vote_monitor = self.commit_vote_monitor.clone();
942 let core_dispatcher = self.core_dispatcher.clone();
943 let blocks_to_fetch = self.inflight_blocks_map.clone();
944 let commands_sender = self.commands_sender.clone();
945 let dag_state = self.dag_state.clone();
946 let round_tracker = self.round_tracker.clone();
947 let peers_pool = self.peers_pool.clone();
948
949 let mut missing_blocks = self
950 .core_dispatcher
951 .get_missing_blocks()
952 .await
953 .map_err(|_err| ConsensusError::Shutdown)?;
954 if self.commit_sync_failover {
955 let fetch_after_rounds = Self::get_fetch_after_rounds(&context, dag_state.clone());
958 missing_blocks.retain(|block| block.round <= fetch_after_rounds[block.author.value()]);
959 } else if missing_blocks.is_empty() {
960 return Ok(());
961 }
962
963 self.fetch_blocks_scheduler_task
964 .spawn(monitored_future!(async move {
965 let _scope = monitored_scope("FetchMissingBlocksScheduler");
966 context
967 .metrics
968 .node_metrics
969 .fetch_blocks_scheduler_inflight
970 .inc();
971 let total_requested = missing_blocks.len();
972
973 let results = if missing_blocks.is_empty() {
974 let _scope = monitored_scope("BlockSync::Periodic::HighestAcceptedRounds");
975 Self::fetch_blocks_with_fetch_after_rounds(
977 context.clone(),
978 blocks_to_fetch.clone(),
979 network_client,
980 dag_state,
981 peers_pool,
982 )
983 .await
984 } else {
985 let _scope = monitored_scope("BlockSync::Periodic::MissingBlocks");
986 Self::fetch_blocks_from_peers(
988 context.clone(),
989 blocks_to_fetch.clone(),
990 network_client,
991 missing_blocks,
992 dag_state,
993 peers_pool,
994 )
995 .await
996 };
997 context
998 .metrics
999 .node_metrics
1000 .fetch_blocks_scheduler_inflight
1001 .dec();
1002 if results.is_empty() {
1003 return;
1004 }
1005
1006 fail_point_async!("consensus-delay");
1007
1008 let mut total_fetched = 0;
1010 for (blocks_guard, fetched_blocks, peer) in results {
1011 total_fetched += fetched_blocks.len();
1012
1013 if let Err(err) = Self::process_fetched_blocks(
1014 fetched_blocks,
1015 peer.clone(),
1016 blocks_guard,
1017 core_dispatcher.clone(),
1018 block_verifier.clone(),
1019 transaction_vote_tracker.clone(),
1020 commit_vote_monitor.clone(),
1021 context.clone(),
1022 commands_sender.clone(),
1023 round_tracker.clone(),
1024 "periodic",
1025 )
1026 .await
1027 {
1028 warn!(
1029 "Error occurred while processing fetched blocks from peer {:?}: {err}",
1030 peer
1031 );
1032 let peer_name = peer.labelname(&context);
1033 context
1034 .metrics
1035 .node_metrics
1036 .synchronizer_process_fetched_failures
1037 .with_label_values(&[peer_name.as_str(), "periodic"])
1038 .inc();
1039 }
1040 }
1041
1042 debug!(
1043 "Total blocks requested to fetch: {}, total fetched: {}",
1044 total_requested, total_fetched
1045 );
1046 }));
1047
1048 Ok(())
1049 }
1050
1051 fn should_run_periodic_sync(&mut self) -> bool {
1052 let current_commit_index = self.dag_state.read().last_commit_index();
1053 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
1054 let now = Instant::now();
1055 let metrics = &self.context.metrics.node_metrics;
1056
1057 if !is_commit_lagging(
1059 self.context.as_ref(),
1060 current_commit_index,
1061 quorum_commit_index,
1062 ) {
1063 metrics
1064 .synchronizer_periodic_sync_decision
1065 .with_label_values(&["true", "default"])
1066 .inc();
1067 self.last_changed_commit_index = current_commit_index;
1069 self.last_commit_change_time = now;
1070 self.commit_sync_failover = false;
1071 return true;
1073 }
1074 if self.commit_sync_failover {
1078 if current_commit_index
1079 < self.last_changed_commit_index + self.context.parameters.commit_sync_batch_size
1080 {
1081 metrics
1082 .synchronizer_periodic_sync_decision
1083 .with_label_values(&["true", "commit_catchup::run"])
1084 .inc();
1085 return true;
1089 } else {
1090 metrics
1091 .synchronizer_periodic_sync_decision
1092 .with_label_values(&["false", "commit_catchup::end"])
1093 .inc();
1094 self.last_changed_commit_index = current_commit_index;
1096 self.last_commit_change_time = now;
1097 self.commit_sync_failover = false;
1098 return false;
1100 }
1101 }
1102
1103 if current_commit_index == self.last_changed_commit_index {
1105 if now.duration_since(self.last_commit_change_time) >= COMMIT_PROGRESS_TIMEOUT {
1107 metrics
1108 .synchronizer_periodic_sync_decision
1109 .with_label_values(&["true", "commit_catchup::start"])
1110 .inc();
1111 self.commit_sync_failover = true;
1112 return true;
1114 }
1115 } else {
1116 self.last_changed_commit_index = current_commit_index;
1118 self.last_commit_change_time = now;
1119 }
1120
1121 metrics
1122 .synchronizer_periodic_sync_decision
1123 .with_label_values(&["false", "commit_lag"])
1124 .inc();
1125 false
1126 }
1127
1128 async fn fetch_blocks_with_fetch_after_rounds(
1131 context: Arc<Context>,
1132 inflight_blocks: Arc<InflightBlocksMap>,
1133 network_client: Arc<SynchronizerClient<VC, OC>>,
1134 dag_state: Arc<RwLock<DagState>>,
1135 peers_pool: Arc<PeersPool>,
1136 ) -> Vec<(BlocksGuard, Vec<Bytes>, PeerId)> {
1137 let fetch_after_rounds = Self::get_fetch_after_rounds(&context, dag_state.clone());
1138
1139 let mut peers = peers_pool.get_known_peers();
1142
1143 assert!(!peers.is_empty(), "No known peers to fetch blocks from");
1146
1147 if cfg!(not(test)) {
1148 peers.shuffle(&mut ThreadRng::default());
1149 }
1150
1151 let peer = peers.first().unwrap().clone();
1152
1153 let response = timeout(
1154 FETCH_REQUEST_TIMEOUT,
1155 network_client.fetch_blocks(
1156 peer.clone(),
1157 vec![],
1158 fetch_after_rounds,
1159 false,
1160 FETCH_REQUEST_TIMEOUT,
1161 ),
1162 )
1163 .await;
1164
1165 let serialized_blocks = match response {
1166 Ok(Ok(blocks)) => blocks,
1167 Ok(Err(err)) => {
1168 debug!("Failed to fetch blocks with fetch_after_rounds from peer {peer}: {err}");
1169 return vec![];
1170 }
1171 Err(_) => {
1172 debug!("Timed out fetching blocks with fetch_after_rounds from peer {peer}");
1173 return vec![];
1174 }
1175 };
1176
1177 let blocks_guard = BlocksGuard {
1178 map: inflight_blocks,
1179 block_refs: BTreeSet::new(),
1180 peer: peer.clone(),
1181 };
1182
1183 vec![(blocks_guard, serialized_blocks, peer)]
1184 }
1185
1186 async fn fetch_blocks_from_peers(
1192 context: Arc<Context>,
1193 inflight_blocks: Arc<InflightBlocksMap>,
1194 network_client: Arc<SynchronizerClient<VC, OC>>,
1195 missing_blocks: BTreeSet<BlockRef>,
1196 dag_state: Arc<RwLock<DagState>>,
1197 peers_pool: Arc<PeersPool>,
1198 ) -> Vec<(BlocksGuard, Vec<Bytes>, PeerId)> {
1199 let missing_blocks = missing_blocks
1203 .into_iter()
1204 .take(2 * MAX_PERIODIC_SYNC_PEERS * context.parameters.max_blocks_per_fetch)
1205 .collect::<Vec<_>>();
1206
1207 let mut authorities = BTreeMap::<AuthorityIndex, Vec<BlockRef>>::new();
1209 for block_ref in &missing_blocks {
1210 authorities
1211 .entry(block_ref.author)
1212 .or_default()
1213 .push(*block_ref);
1214 }
1215
1216 let mut peers = peers_pool.get_known_peers();
1218
1219 assert!(!peers.is_empty(), "No known peers to fetch blocks from");
1224
1225 let num_authorities_per_peer = authorities
1226 .len()
1227 .div_ceil(peers.len().min(MAX_PERIODIC_SYNC_PEERS));
1228
1229 let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1231 for (authority, blocks) in &authorities {
1232 missing_blocks_per_authority[*authority] += blocks.len();
1233 }
1234 for (missing, (_, authority)) in missing_blocks_per_authority
1235 .into_iter()
1236 .zip_debug_eq(context.committee.authorities())
1237 {
1238 context
1239 .metrics
1240 .node_metrics
1241 .synchronizer_missing_blocks_by_authority
1242 .with_label_values(&[&authority.hostname])
1243 .inc_by(missing as u64);
1244 context
1245 .metrics
1246 .node_metrics
1247 .synchronizer_current_missing_blocks_by_authority
1248 .with_label_values(&[&authority.hostname])
1249 .set(missing as i64);
1250 }
1251
1252 if cfg!(not(test)) {
1254 peers.shuffle(&mut ThreadRng::default());
1256 }
1257
1258 let mut peers = peers.into_iter();
1259 let mut request_futures = FuturesUnordered::new();
1260
1261 let mut authorities = authorities.into_values().collect::<Vec<_>>();
1263 if cfg!(not(test)) {
1264 authorities.shuffle(&mut ThreadRng::default());
1266 }
1267
1268 let fetch_after_rounds = Self::get_fetch_after_rounds(&context, dag_state.clone());
1269
1270 for batch in authorities.chunks(num_authorities_per_peer) {
1272 let Some(peer) = peers.next() else {
1273 debug_fatal!("No more peers left to fetch blocks!");
1274 break;
1275 };
1276 let peer_name = peer.hostname(&context);
1277 let block_refs = batch
1280 .iter()
1281 .flatten()
1282 .cloned()
1283 .collect::<BTreeSet<_>>()
1284 .into_iter()
1285 .take(context.parameters.max_blocks_per_fetch)
1286 .collect::<BTreeSet<_>>();
1287
1288 if let Some(blocks_guard) =
1290 inflight_blocks.lock_blocks(block_refs.clone(), peer.clone())
1291 {
1292 info!(
1293 "Periodic sync of {} missing blocks from peer {} {:?}: {}",
1294 peer_name.as_str(),
1295 block_refs.len(),
1296 peer,
1297 block_refs
1298 .iter()
1299 .map(|b| b.to_string())
1300 .collect::<Vec<_>>()
1301 .join(", ")
1302 );
1303 request_futures.push(Self::fetch_blocks_request(
1304 network_client.clone(),
1305 peer,
1306 blocks_guard,
1307 fetch_after_rounds.clone(),
1308 false,
1309 FETCH_REQUEST_TIMEOUT,
1310 1,
1311 ));
1312 }
1313 }
1314
1315 let mut results = Vec::new();
1316 let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1317
1318 tokio::pin!(fetcher_timeout);
1319
1320 loop {
1321 tokio::select! {
1322 Some((response, blocks_guard, _retries, peer, fetch_after_rounds)) = request_futures.next() => {
1323 match response {
1324 Ok(fetched_blocks) => {
1325 results.push((blocks_guard, fetched_blocks, peer));
1326
1327 if request_futures.is_empty() {
1329 break;
1330 }
1331 },
1332 Err(_) => {
1333 let peer_name = peer.labelname(&context);
1334 context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer_name.as_str(), "periodic"]).inc();
1335 if let Some(next_peer) = peers.next() {
1337 if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer.clone()) {
1339 info!(
1340 "Retrying syncing {} missing blocks from peer {:?}: {}",
1341 blocks_guard.block_refs.len(),
1342 next_peer,
1343 blocks_guard.block_refs
1344 .iter()
1345 .map(|b| b.to_string())
1346 .collect::<Vec<_>>()
1347 .join(", ")
1348 );
1349 request_futures.push(Self::fetch_blocks_request(
1350 network_client.clone(),
1351 next_peer,
1352 blocks_guard,
1353 fetch_after_rounds,
1354 false,
1355 FETCH_REQUEST_TIMEOUT,
1356 1,
1357 ));
1358 } else {
1359 debug!("Couldn't acquire locks to fetch blocks from peer {:?}.", next_peer)
1360 }
1361 } else {
1362 debug!("No more peers left to fetch blocks");
1363 }
1364 }
1365 }
1366 },
1367 _ = &mut fetcher_timeout => {
1368 debug!("Timed out while fetching missing blocks");
1369 break;
1370 }
1371 }
1372 }
1373
1374 results
1375 }
1376}
1377
1378#[cfg(test)]
1379mod tests {
1380 use std::{
1381 collections::{BTreeMap, BTreeSet},
1382 sync::Arc,
1383 time::Duration,
1384 };
1385
1386 use async_trait::async_trait;
1387 use bytes::Bytes;
1388 use consensus_config::{AuthorityIndex, Parameters};
1389 use consensus_types::block::{BlockDigest, BlockRef, Round};
1390 use mysten_metrics::monitored_mpsc;
1391 use parking_lot::RwLock;
1392 use tokio::{sync::Mutex, time::sleep};
1393
1394 use crate::commit::{CommitVote, TrustedCommit};
1395 use crate::{
1396 CommitDigest, CommitIndex,
1397 block::{TestBlock, VerifiedBlock},
1398 block_verifier::NoopBlockVerifier,
1399 commit_vote_monitor::CommitVoteMonitor,
1400 context::Context,
1401 core_thread::CoreThreadDispatcher,
1402 dag_state::DagState,
1403 error::{ConsensusError, ConsensusResult},
1404 network::{
1405 BlockStream, ObserverNetworkClient, PeerId, SynchronizerClient, ValidatorNetworkClient,
1406 },
1407 storage::mem_store::MemStore,
1408 synchronizer::{
1409 COMMIT_PROGRESS_TIMEOUT, FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT,
1410 InflightBlocksMap, Synchronizer,
1411 },
1412 };
1413 use crate::{
1414 commit_vote_monitor::COMMIT_LAG_MULTIPLIER, core_thread::MockCoreThreadDispatcher,
1415 peers_pool::PeersPool, round_tracker::RoundTracker,
1416 transaction_vote_tracker::TransactionVoteTracker,
1417 };
1418
1419 type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
1420 type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
1421 type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
1422 type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1423
1424 #[derive(Default)]
1425 struct MockNetworkClient {
1426 fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
1427 fetch_latest_blocks_requests:
1428 Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
1429 }
1430
1431 impl MockNetworkClient {
1432 async fn stub_fetch_blocks(
1433 &self,
1434 blocks: Vec<VerifiedBlock>,
1435 peer: AuthorityIndex,
1436 latency: Option<Duration>,
1437 ) {
1438 let mut lock = self.fetch_blocks_requests.lock().await;
1439 let block_refs = blocks
1440 .iter()
1441 .map(|block| block.reference())
1442 .collect::<Vec<_>>();
1443 lock.insert((block_refs, peer), (blocks, latency));
1444 }
1445
1446 async fn stub_fetch_blocks_for_key(
1447 &self,
1448 key_refs: Vec<BlockRef>,
1449 response_blocks: Vec<VerifiedBlock>,
1450 peer: AuthorityIndex,
1451 latency: Option<Duration>,
1452 ) {
1453 let mut lock = self.fetch_blocks_requests.lock().await;
1454 lock.insert((key_refs, peer), (response_blocks, latency));
1455 }
1456
1457 async fn stub_fetch_latest_blocks(
1458 &self,
1459 blocks: Vec<VerifiedBlock>,
1460 peer: AuthorityIndex,
1461 authorities: Vec<AuthorityIndex>,
1462 latency: Option<Duration>,
1463 ) {
1464 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1465 lock.entry((peer, authorities))
1466 .or_default()
1467 .push((blocks, latency));
1468 }
1469
1470 async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1471 let lock = self.fetch_latest_blocks_requests.lock().await;
1472 lock.len()
1473 }
1474 }
1475
1476 #[async_trait]
1477 impl ValidatorNetworkClient for MockNetworkClient {
1478 async fn subscribe_blocks(
1479 &self,
1480 _peer: AuthorityIndex,
1481 _last_received: Round,
1482 _timeout: Duration,
1483 ) -> ConsensusResult<BlockStream> {
1484 unimplemented!("subscribe_blocks not implemented in mock")
1485 }
1486
1487 async fn fetch_blocks(
1488 &self,
1489 peer: AuthorityIndex,
1490 block_refs: Vec<BlockRef>,
1491 _fetch_after_rounds: Vec<Round>,
1492 _fetch_missing_ancestors: bool,
1493 _timeout: Duration,
1494 ) -> ConsensusResult<Vec<Bytes>> {
1495 let mut lock = self.fetch_blocks_requests.lock().await;
1496 let response = lock.remove(&(block_refs.clone(), peer)).unwrap_or_else(|| {
1497 panic!(
1498 "Unexpected fetch blocks request made: {:?} {}. Current lock: {:?}",
1499 block_refs, peer, lock
1500 );
1501 });
1502
1503 let serialised = response
1504 .0
1505 .into_iter()
1506 .map(|block| block.serialized().clone())
1507 .collect::<Vec<_>>();
1508
1509 drop(lock);
1510
1511 if let Some(latency) = response.1 {
1512 sleep(latency).await;
1513 }
1514
1515 Ok(serialised)
1516 }
1517
1518 async fn fetch_commits(
1519 &self,
1520 _peer: AuthorityIndex,
1521 _commit_range: crate::commit::CommitRange,
1522 _timeout: Duration,
1523 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1524 unimplemented!("fetch_commits not implemented in mock")
1525 }
1526
1527 async fn fetch_latest_blocks(
1528 &self,
1529 peer: AuthorityIndex,
1530 authorities: Vec<AuthorityIndex>,
1531 _timeout: Duration,
1532 ) -> ConsensusResult<Vec<Bytes>> {
1533 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1534 let mut responses = lock
1535 .remove(&(peer, authorities.clone()))
1536 .expect("Unexpected fetch blocks request made");
1537
1538 let response = responses.remove(0);
1539 let serialised = response
1540 .0
1541 .into_iter()
1542 .map(|block| block.serialized().clone())
1543 .collect::<Vec<_>>();
1544
1545 if !responses.is_empty() {
1546 lock.insert((peer, authorities), responses);
1547 }
1548
1549 drop(lock);
1550
1551 if let Some(latency) = response.1 {
1552 sleep(latency).await;
1553 }
1554
1555 Ok(serialised)
1556 }
1557
1558 async fn get_latest_rounds(
1559 &self,
1560 _peer: AuthorityIndex,
1561 _timeout: Duration,
1562 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1563 unimplemented!("get_latest_rounds not implemented in mock")
1564 }
1565
1566 #[cfg(test)]
1567 async fn send_block(
1568 &self,
1569 _peer: AuthorityIndex,
1570 _block: &VerifiedBlock,
1571 _timeout: Duration,
1572 ) -> ConsensusResult<()> {
1573 unimplemented!("send_block not implemented in mock")
1574 }
1575 }
1576
1577 #[async_trait]
1578 impl ObserverNetworkClient for MockNetworkClient {
1579 async fn stream_blocks(
1580 &self,
1581 _peer: crate::network::PeerId,
1582 _highest_round_per_authority: Vec<u64>,
1583 _timeout: Duration,
1584 ) -> ConsensusResult<crate::network::ObserverBlockStream> {
1585 unimplemented!("stream_blocks not implemented in mock")
1586 }
1587
1588 async fn fetch_blocks(
1589 &self,
1590 _peer: crate::network::PeerId,
1591 _block_refs: Vec<BlockRef>,
1592 _fetch_after_rounds: Vec<Round>,
1593 _fetch_missing_ancestors: bool,
1594 _timeout: Duration,
1595 ) -> ConsensusResult<Vec<Bytes>> {
1596 unimplemented!("Observer fetch_blocks not implemented in mock")
1597 }
1598
1599 async fn fetch_commits(
1600 &self,
1601 _peer: crate::network::PeerId,
1602 _commit_range: crate::commit::CommitRange,
1603 _timeout: Duration,
1604 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1605 unimplemented!("Observer fetch_commits not implemented in mock")
1606 }
1607 }
1608
1609 #[test]
1610 fn test_inflight_blocks_map() {
1611 let map = InflightBlocksMap::new();
1613 let some_block_refs = [
1614 BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1615 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1616 BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
1617 BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
1618 ];
1619 let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1620
1621 {
1623 let mut all_guards = Vec::new();
1624
1625 for i in 1..=2 {
1627 let authority = AuthorityIndex::new_for_test(i);
1628 let peer = PeerId::Validator(authority);
1629
1630 let guard = map.lock_blocks(missing_block_refs.clone(), peer.clone());
1631 let guard = guard.expect("Guard should be created");
1632 assert_eq!(guard.block_refs.len(), 4);
1633
1634 all_guards.push(guard);
1635
1636 let guard = map.lock_blocks(missing_block_refs.clone(), peer);
1638 assert!(guard.is_none());
1639 }
1640
1641 let authority_3 = AuthorityIndex::new_for_test(3);
1643 let peer_3 = PeerId::Validator(authority_3);
1644
1645 let guard = map.lock_blocks(missing_block_refs.clone(), peer_3.clone());
1646 assert!(guard.is_none());
1647
1648 drop(all_guards.remove(0));
1650
1651 let guard = map.lock_blocks(missing_block_refs.clone(), peer_3);
1652 let guard = guard.expect("Guard should be successfully acquired");
1653
1654 assert_eq!(guard.block_refs, missing_block_refs);
1655
1656 drop(guard);
1658 drop(all_guards);
1659
1660 assert_eq!(map.num_of_locked_blocks(), 0);
1661 }
1662
1663 {
1665 let authority_1 = AuthorityIndex::new_for_test(1);
1667 let peer_1 = PeerId::Validator(authority_1);
1668 let guard = map.lock_blocks(missing_block_refs.clone(), peer_1).unwrap();
1669
1670 let authority_2 = AuthorityIndex::new_for_test(2);
1672 let peer_2 = PeerId::Validator(authority_2);
1673 let guard = map.swap_locks(guard, peer_2);
1674
1675 assert_eq!(guard.unwrap().block_refs, missing_block_refs);
1676 }
1677 }
1678
1679 #[tokio::test]
1680 async fn successful_fetch_blocks_from_peer() {
1681 let (context, _) = Context::new_for_test(4);
1683 let context = Arc::new(context);
1684 let block_verifier = Arc::new(NoopBlockVerifier {});
1685 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1686 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1687 let mock_client = Arc::new(MockNetworkClient::default());
1688 let store = Arc::new(MemStore::new());
1689 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1690 let transaction_vote_tracker =
1691 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1692 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1693
1694 let network_client = Arc::new(SynchronizerClient::new(
1695 context.clone(),
1696 Some(mock_client.clone()),
1697 Some(mock_client.clone()),
1698 ));
1699 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1700 let handle = Synchronizer::start(
1701 network_client,
1702 context.clone(),
1703 core_dispatcher.clone(),
1704 commit_vote_monitor,
1705 block_verifier,
1706 transaction_vote_tracker,
1707 round_tracker,
1708 dag_state,
1709 peers_pool.clone(),
1710 false,
1711 );
1712
1713 let expected_blocks = (0..10)
1715 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1716 .collect::<Vec<_>>();
1717 let missing_blocks = expected_blocks
1718 .iter()
1719 .map(|block| block.reference())
1720 .collect::<BTreeSet<_>>();
1721
1722 let peer = AuthorityIndex::new_for_test(1);
1724 mock_client
1725 .stub_fetch_blocks(expected_blocks.clone(), peer, None)
1726 .await;
1727
1728 assert!(
1730 handle
1731 .fetch_blocks(missing_blocks, PeerId::Validator(peer))
1732 .await
1733 .is_ok()
1734 );
1735
1736 sleep(Duration::from_millis(1_000)).await;
1738
1739 let added_blocks = core_dispatcher.get_add_blocks().await;
1741 assert_eq!(added_blocks, expected_blocks);
1742 }
1743
1744 #[tokio::test]
1745 async fn saturate_fetch_blocks_from_peer() {
1746 let (context, _) = Context::new_for_test(4);
1748 let context = Arc::new(context);
1749 let block_verifier = Arc::new(NoopBlockVerifier {});
1750 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1751 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1752 let mock_client = Arc::new(MockNetworkClient::default());
1753 let store = Arc::new(MemStore::new());
1754 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1755 let transaction_vote_tracker =
1756 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1757 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1758
1759 let network_client = Arc::new(SynchronizerClient::new(
1760 context.clone(),
1761 Some(mock_client.clone()),
1762 Some(mock_client.clone()),
1763 ));
1764 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1765 let handle = Synchronizer::start(
1766 network_client,
1767 context.clone(),
1768 core_dispatcher.clone(),
1769 commit_vote_monitor,
1770 block_verifier,
1771 transaction_vote_tracker,
1772 round_tracker,
1773 dag_state,
1774 peers_pool.clone(),
1775 false,
1776 );
1777
1778 let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
1780 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
1781 .collect::<Vec<_>>();
1782
1783 let peer = AuthorityIndex::new_for_test(1);
1785 let mut iter = expected_blocks.iter().peekable();
1786 while let Some(block) = iter.next() {
1787 mock_client
1790 .stub_fetch_blocks(
1791 vec![block.clone()],
1792 peer,
1793 Some(Duration::from_millis(5_000)),
1794 )
1795 .await;
1796
1797 let mut missing_blocks = BTreeSet::new();
1798 missing_blocks.insert(block.reference());
1799
1800 if iter.peek().is_none() {
1803 match handle
1804 .fetch_blocks(missing_blocks, PeerId::Validator(peer))
1805 .await
1806 {
1807 Err(ConsensusError::SynchronizerSaturated(peer_str)) => {
1808 assert_eq!(peer_str, format!("{:?}", PeerId::Validator(peer)));
1809 }
1810 _ => panic!("A saturated synchronizer error was expected"),
1811 }
1812 } else {
1813 assert!(
1814 handle
1815 .fetch_blocks(missing_blocks, PeerId::Validator(peer))
1816 .await
1817 .is_ok()
1818 );
1819 }
1820 }
1821 }
1822
1823 #[tokio::test(flavor = "current_thread", start_paused = true)]
1824 async fn synchronizer_periodic_task_fetch_blocks() {
1825 let (context, _) = Context::new_for_test(4);
1827 let context = Arc::new(context);
1828 let block_verifier = Arc::new(NoopBlockVerifier {});
1829 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1830 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1831 let mock_client = Arc::new(MockNetworkClient::default());
1832 let store = Arc::new(MemStore::new());
1833 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1834 let transaction_vote_tracker =
1835 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1836 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1837
1838 let expected_blocks = (0..10)
1840 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1841 .collect::<Vec<_>>();
1842 let missing_blocks = expected_blocks
1843 .iter()
1844 .map(|block| block.reference())
1845 .collect::<BTreeSet<_>>();
1846
1847 core_dispatcher
1849 .stub_missing_blocks(missing_blocks.clone())
1850 .await;
1851
1852 mock_client
1856 .stub_fetch_blocks(
1857 expected_blocks.clone(),
1858 AuthorityIndex::new_for_test(1),
1859 Some(FETCH_REQUEST_TIMEOUT),
1860 )
1861 .await;
1862 mock_client
1863 .stub_fetch_blocks(
1864 expected_blocks.clone(),
1865 AuthorityIndex::new_for_test(2),
1866 None,
1867 )
1868 .await;
1869
1870 let network_client = Arc::new(SynchronizerClient::new(
1872 context.clone(),
1873 Some(mock_client.clone()),
1874 Some(mock_client.clone()),
1875 ));
1876 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1877 let _handle = Synchronizer::start(
1878 network_client,
1879 context.clone(),
1880 core_dispatcher.clone(),
1881 commit_vote_monitor,
1882 block_verifier,
1883 transaction_vote_tracker,
1884 round_tracker,
1885 dag_state,
1886 peers_pool.clone(),
1887 false,
1888 );
1889
1890 sleep(2 * FETCH_REQUEST_TIMEOUT).await;
1891
1892 let added_blocks = core_dispatcher.get_add_blocks().await;
1894 assert_eq!(added_blocks, expected_blocks);
1895
1896 assert!(
1898 core_dispatcher
1899 .get_missing_blocks()
1900 .await
1901 .unwrap()
1902 .is_empty()
1903 );
1904 }
1905
1906 #[tokio::test(flavor = "current_thread", start_paused = true)]
1907 async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
1908 let (context, _) = Context::new_for_test(4);
1910 let context = Arc::new(context);
1911 let block_verifier = Arc::new(NoopBlockVerifier {});
1912 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1913 let mock_client = Arc::new(MockNetworkClient::default());
1914 let store = Arc::new(MemStore::new());
1915 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1916 let transaction_vote_tracker =
1917 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1918 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1919 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1920
1921 let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
1923 let stub_blocks = (sync_missing_block_round_threshold * 2
1924 ..sync_missing_block_round_threshold * 3)
1925 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1926 .collect::<Vec<_>>();
1927 let missing_blocks = stub_blocks
1928 .iter()
1929 .map(|block| block.reference())
1930 .collect::<BTreeSet<_>>();
1931 core_dispatcher
1932 .stub_missing_blocks(missing_blocks.clone())
1933 .await;
1934
1935 mock_client
1940 .stub_fetch_blocks(
1941 stub_blocks.clone(),
1942 AuthorityIndex::new_for_test(1),
1943 Some(FETCH_REQUEST_TIMEOUT),
1944 )
1945 .await;
1946 mock_client
1947 .stub_fetch_blocks(stub_blocks.clone(), AuthorityIndex::new_for_test(2), None)
1948 .await;
1949 let mut expected_blocks = stub_blocks
1950 .iter()
1951 .take(context.parameters.max_blocks_per_sync)
1952 .cloned()
1953 .collect::<Vec<_>>();
1954
1955 let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
1957 let commit_index: CommitIndex = round - 1;
1958 let blocks = (0..4)
1959 .map(|authority| {
1960 let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
1961 let block = TestBlock::new(round, authority)
1962 .set_commit_votes(commit_votes)
1963 .build();
1964
1965 VerifiedBlock::new_for_test(block)
1966 })
1967 .collect::<Vec<_>>();
1968
1969 for block in blocks {
1972 commit_vote_monitor.observe_block(&block);
1973 }
1974
1975 let network_client = Arc::new(SynchronizerClient::new(
1977 context.clone(),
1978 Some(mock_client.clone()),
1979 Some(mock_client.clone()),
1980 ));
1981 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1982 let _handle = Synchronizer::start(
1983 network_client,
1984 context.clone(),
1985 core_dispatcher.clone(),
1986 commit_vote_monitor.clone(),
1987 block_verifier,
1988 transaction_vote_tracker,
1989 round_tracker,
1990 dag_state.clone(),
1991 peers_pool.clone(),
1992 false,
1993 );
1994
1995 sleep(COMMIT_PROGRESS_TIMEOUT / 2).await;
1998
1999 let added_blocks = core_dispatcher.get_add_blocks().await;
2002 assert_eq!(added_blocks, vec![]);
2003
2004 {
2007 let mut d = dag_state.write();
2008 for index in 1..=commit_index {
2009 let commit =
2010 TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
2011
2012 d.add_commit(commit);
2013 }
2014
2015 assert_eq!(
2016 d.last_commit_index(),
2017 commit_vote_monitor.quorum_commit_index()
2018 );
2019 }
2020
2021 core_dispatcher
2023 .stub_missing_blocks(missing_blocks.clone())
2024 .await;
2025
2026 sleep(2 * FETCH_REQUEST_TIMEOUT).await;
2027
2028 let mut added_blocks = core_dispatcher.get_add_blocks().await;
2030
2031 added_blocks.sort_by_key(|block| block.reference());
2032 expected_blocks.sort_by_key(|block| block.reference());
2033
2034 assert_eq!(added_blocks, expected_blocks);
2035 }
2036
2037 #[tokio::test(flavor = "current_thread", start_paused = true)]
2038 async fn synchronizer_periodic_sync_resumes_when_commit_sync_stalled() {
2039 let (context, _) = Context::new_for_test(4);
2041 let context = Arc::new(context);
2042 let block_verifier = Arc::new(NoopBlockVerifier {});
2043 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2044 let mock_client = Arc::new(MockNetworkClient::default());
2045 let store = Arc::new(MemStore::new());
2046 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2047 let transaction_vote_tracker =
2048 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
2049 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2050 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2051
2052 let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
2054 let stub_blocks = (sync_missing_block_round_threshold * 2
2055 ..sync_missing_block_round_threshold * 3)
2056 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2057 .collect::<Vec<_>>();
2058 let missing_blocks = stub_blocks
2059 .iter()
2060 .map(|block| block.reference())
2061 .collect::<BTreeSet<_>>();
2062 core_dispatcher
2063 .stub_missing_blocks(missing_blocks.clone())
2064 .await;
2065
2066 mock_client
2069 .stub_fetch_blocks(
2070 stub_blocks.clone(),
2071 AuthorityIndex::new_for_test(1),
2072 Some(FETCH_REQUEST_TIMEOUT),
2073 )
2074 .await;
2075 mock_client
2076 .stub_fetch_blocks(stub_blocks.clone(), AuthorityIndex::new_for_test(2), None)
2077 .await;
2078 let mut expected_blocks = stub_blocks
2079 .iter()
2080 .take(context.parameters.max_blocks_per_sync)
2081 .cloned()
2082 .collect::<Vec<_>>();
2083
2084 let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
2086 let commit_index: CommitIndex = round - 1;
2087 let blocks = (0..4)
2088 .map(|authority| {
2089 let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
2090 let block = TestBlock::new(round, authority)
2091 .set_commit_votes(commit_votes)
2092 .build();
2093 VerifiedBlock::new_for_test(block)
2094 })
2095 .collect::<Vec<_>>();
2096 for block in blocks {
2097 commit_vote_monitor.observe_block(&block);
2098 }
2099
2100 let network_client = Arc::new(SynchronizerClient::new(
2102 context.clone(),
2103 Some(mock_client.clone()),
2104 Some(mock_client.clone()),
2105 ));
2106 let peers_pool = Arc::new(PeersPool::new(context.clone()));
2107 let _handle = Synchronizer::start(
2108 network_client,
2109 context.clone(),
2110 core_dispatcher.clone(),
2111 commit_vote_monitor.clone(),
2112 block_verifier,
2113 transaction_vote_tracker,
2114 round_tracker,
2115 dag_state.clone(),
2116 peers_pool.clone(),
2117 false,
2118 );
2119
2120 sleep(COMMIT_PROGRESS_TIMEOUT - Duration::from_millis(100)).await;
2122 let added_blocks = core_dispatcher.get_add_blocks().await;
2123 assert_eq!(added_blocks, vec![]);
2124
2125 core_dispatcher
2127 .stub_missing_blocks(missing_blocks.clone())
2128 .await;
2129 mock_client
2132 .stub_fetch_blocks_for_key(
2133 vec![],
2134 expected_blocks.clone(),
2135 AuthorityIndex::new_for_test(1),
2136 Some(Duration::from_millis(500)),
2137 )
2138 .await;
2139
2140 sleep(Duration::from_millis(200) + FETCH_REQUEST_TIMEOUT).await;
2142
2143 let mut added_blocks = core_dispatcher.get_add_blocks().await;
2144 assert!(
2145 !added_blocks.is_empty(),
2146 "Expected periodic sync to resume after commit sync stall"
2147 );
2148 added_blocks.sort_by_key(|block| block.reference());
2149 expected_blocks.sort_by_key(|block| block.reference());
2150 assert_eq!(added_blocks, expected_blocks);
2151
2152 core_dispatcher.get_add_blocks().await;
2155 {
2156 let current = dag_state.read().last_commit_index();
2157 let mut d = dag_state.write();
2158 for index in (current + 1)..=(current + context.parameters.commit_sync_batch_size) {
2160 let commit =
2161 TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
2162 d.add_commit(commit);
2163 }
2164 }
2165
2166 core_dispatcher
2168 .stub_missing_blocks(missing_blocks.clone())
2169 .await;
2170 mock_client
2171 .stub_fetch_blocks(
2172 stub_blocks
2173 .iter()
2174 .take(context.parameters.max_blocks_per_sync)
2175 .cloned()
2176 .collect::<Vec<_>>(),
2177 AuthorityIndex::new_for_test(2),
2178 None,
2179 )
2180 .await;
2181
2182 sleep(2 * FETCH_REQUEST_TIMEOUT).await;
2185 let added_blocks = core_dispatcher.get_add_blocks().await;
2186 assert_eq!(
2187 added_blocks,
2188 vec![],
2189 "Expected periodic sync to be skipped after stall resolved"
2190 );
2191 }
2192
2193 #[tokio::test(flavor = "current_thread", start_paused = true)]
2194 async fn synchronizer_fetch_own_last_block() {
2195 let (context, _) = Context::new_for_test(4);
2197 let context = Arc::new(context.with_parameters(Parameters {
2198 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2199 ..Default::default()
2200 }));
2201 let block_verifier = Arc::new(NoopBlockVerifier {});
2202 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2203 let mock_client = Arc::new(MockNetworkClient::default());
2204 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2205 let store = Arc::new(MemStore::new());
2206 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2207 let transaction_vote_tracker =
2208 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
2209 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2210 let our_index = AuthorityIndex::new_for_test(0);
2211
2212 let mut expected_blocks = (9..=10)
2214 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2215 .collect::<Vec<_>>();
2216
2217 let block_1 = expected_blocks.pop().unwrap();
2220 mock_client
2221 .stub_fetch_latest_blocks(
2222 vec![block_1.clone()],
2223 AuthorityIndex::new_for_test(1),
2224 vec![our_index],
2225 None,
2226 )
2227 .await;
2228 mock_client
2229 .stub_fetch_latest_blocks(
2230 vec![block_1],
2231 AuthorityIndex::new_for_test(1),
2232 vec![our_index],
2233 None,
2234 )
2235 .await;
2236
2237 let block_2 = expected_blocks.pop().unwrap();
2239 mock_client
2240 .stub_fetch_latest_blocks(
2241 vec![block_2.clone()],
2242 AuthorityIndex::new_for_test(2),
2243 vec![our_index],
2244 Some(Duration::from_secs(10)),
2245 )
2246 .await;
2247 mock_client
2248 .stub_fetch_latest_blocks(
2249 vec![block_2],
2250 AuthorityIndex::new_for_test(2),
2251 vec![our_index],
2252 None,
2253 )
2254 .await;
2255
2256 mock_client
2258 .stub_fetch_latest_blocks(
2259 vec![],
2260 AuthorityIndex::new_for_test(3),
2261 vec![our_index],
2262 Some(Duration::from_secs(10)),
2263 )
2264 .await;
2265 mock_client
2266 .stub_fetch_latest_blocks(
2267 vec![],
2268 AuthorityIndex::new_for_test(3),
2269 vec![our_index],
2270 None,
2271 )
2272 .await;
2273
2274 let network_client = Arc::new(SynchronizerClient::new(
2276 context.clone(),
2277 Some(mock_client.clone()),
2278 Some(mock_client.clone()),
2279 ));
2280 let peers_pool = Arc::new(PeersPool::new(context.clone()));
2281 let handle = Synchronizer::start(
2282 network_client,
2283 context.clone(),
2284 core_dispatcher.clone(),
2285 commit_vote_monitor,
2286 block_verifier,
2287 transaction_vote_tracker,
2288 round_tracker,
2289 dag_state,
2290 peers_pool.clone(),
2291 true,
2292 );
2293
2294 sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
2296
2297 assert_eq!(
2299 core_dispatcher.get_last_own_proposed_round().await,
2300 vec![10]
2301 );
2302
2303 assert_eq!(mock_client.fetch_latest_blocks_pending_calls().await, 0);
2305
2306 assert_eq!(
2308 context
2309 .metrics
2310 .node_metrics
2311 .sync_last_known_own_block_retries
2312 .get(),
2313 1
2314 );
2315
2316 if let Err(err) = handle.stop().await
2318 && err.is_panic()
2319 {
2320 std::panic::resume_unwind(err.into_panic());
2321 }
2322 }
2323
2324 #[tokio::test]
2325 async fn test_process_fetched_blocks() {
2326 let (context, _) = Context::new_for_test(4);
2328 let context = Arc::new(context);
2329 let block_verifier = Arc::new(NoopBlockVerifier {});
2330 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2331 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2332 let store = Arc::new(MemStore::new());
2333 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2334 let transaction_vote_tracker =
2335 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
2336 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2337 let (commands_sender, _commands_receiver) =
2338 monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
2339
2340 let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
2344 expected_blocks.extend(
2345 (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
2346 );
2347 assert_eq!(
2348 expected_blocks.len(),
2349 context.parameters.max_blocks_per_sync
2350 );
2351
2352 let expected_serialized_blocks = expected_blocks
2353 .iter()
2354 .map(|b| b.serialized().clone())
2355 .collect::<Vec<_>>();
2356
2357 let expected_block_refs = expected_blocks
2358 .iter()
2359 .map(|b| b.reference())
2360 .collect::<BTreeSet<_>>();
2361
2362 let peer_index = AuthorityIndex::new_for_test(2);
2364 let peer = PeerId::Validator(peer_index);
2365
2366 let inflight_blocks_map = InflightBlocksMap::new();
2368 let blocks_guard = inflight_blocks_map
2369 .lock_blocks(expected_block_refs.clone(), peer.clone())
2370 .expect("Failed to lock blocks");
2371
2372 assert_eq!(
2373 inflight_blocks_map.num_of_locked_blocks(),
2374 expected_block_refs.len()
2375 );
2376
2377 let result = Synchronizer::<
2379 NoopBlockVerifier,
2380 MockCoreThreadDispatcher,
2381 MockNetworkClient,
2382 MockNetworkClient,
2383 >::process_fetched_blocks(
2384 expected_serialized_blocks,
2385 peer,
2386 blocks_guard, core_dispatcher.clone(),
2388 block_verifier,
2389 transaction_vote_tracker,
2390 commit_vote_monitor,
2391 context.clone(),
2392 commands_sender,
2393 round_tracker,
2394 "test",
2395 )
2396 .await;
2397
2398 assert!(result.is_ok());
2400
2401 let added_blocks = core_dispatcher.get_add_blocks().await;
2403 assert_eq!(
2404 added_blocks
2405 .iter()
2406 .map(|b| b.reference())
2407 .collect::<BTreeSet<_>>(),
2408 expected_block_refs,
2409 );
2410
2411 assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
2413 }
2414}