1use std::{
4 collections::{BTreeMap, BTreeSet},
5 sync::Arc,
6 time::Duration,
7};
8
9use bytes::Bytes;
10use consensus_config::AuthorityIndex;
11use consensus_types::block::{BlockRef, Round};
12use futures::{StreamExt as _, stream::FuturesUnordered};
13use itertools::Itertools as _;
14use mysten_common::{ZipDebugEqIteratorExt, debug_fatal};
15use mysten_metrics::{
16 monitored_future,
17 monitored_mpsc::{Receiver, Sender, channel},
18 monitored_scope,
19};
20use parking_lot::{Mutex, RwLock};
21use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
22use sui_macros::fail_point_async;
23use tap::TapFallible;
24use tokio::{
25 runtime::Handle,
26 sync::{mpsc::error::TrySendError, oneshot},
27 task::{JoinError, JoinSet},
28 time::{Instant, sleep, sleep_until, timeout},
29};
30use tracing::{debug, info, trace, warn};
31
32use crate::{
33 BlockAPI,
34 block::{ExtendedBlock, SignedBlock, VerifiedBlock},
35 block_verifier::BlockVerifier,
36 commit::CommitIndex,
37 commit_vote_monitor::CommitVoteMonitor,
38 context::Context,
39 dag_state::DagState,
40 error::{ConsensusError, ConsensusResult},
41 network::{ObserverNetworkClient, PeerId, SynchronizerClient, ValidatorNetworkClient},
42 peers_pool::PeersPool,
43 round_tracker::RoundTracker,
44};
45use crate::{
46 authority_service::COMMIT_LAG_MULTIPLIER, core_thread::CoreThreadDispatcher,
47 transaction_vote_tracker::TransactionVoteTracker,
48};
49
/// Capacity of each per-peer fetch channel, and the maximum number of fetch requests a single
/// per-peer fetch task keeps in flight at once.
const FETCH_BLOCKS_CONCURRENCY: usize = 5;

/// Timeout applied to a single network fetch request.
const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);
// NOTE(review): not referenced in the part of the file visible here; presumably the overall
// budget for the periodic multi-peer fetch — confirm at the use site.
const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);

/// Maximum number of peers that may hold an inflight lock on the same block at the same time.
const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;

/// Upper bound on the number of peers the periodic sync spreads its requests over.
const MAX_PERIODIC_SYNC_PEERS: usize = 3;

/// How long the local commit index may stay unchanged, while lagging the quorum commit index,
/// before the periodic sync switches to its failover mode.
const COMMIT_PROGRESS_TIMEOUT: Duration = Duration::from_secs(10);
64
/// Handle over a set of block refs that have been locked in an [`InflightBlocksMap`] on behalf of
/// one peer. Dropping the guard releases those locks.
struct BlocksGuard {
    // Map in which the locks were taken; used on drop to release them.
    map: Arc<InflightBlocksMap>,
    // Block refs locked by this guard. May be empty, in which case dropping is a no-op.
    block_refs: BTreeSet<BlockRef>,
    // Peer the blocks are being fetched from.
    peer: PeerId,
}

impl Drop for BlocksGuard {
    /// Releases every lock held by this guard for `peer`.
    fn drop(&mut self) {
        self.map.unlock_blocks(&self.block_refs, self.peer.clone());
    }
}
76
77struct InflightBlocksMap {
82 inner: Mutex<BTreeMap<BlockRef, BTreeSet<PeerId>>>,
83}
84
85impl InflightBlocksMap {
86 fn new() -> Arc<Self> {
87 Arc::new(Self {
88 inner: Mutex::new(BTreeMap::new()),
89 })
90 }
91
92 fn lock_blocks(
98 self: &Arc<Self>,
99 missing_block_refs: BTreeSet<BlockRef>,
100 peer: PeerId,
101 ) -> Option<BlocksGuard> {
102 let mut blocks = BTreeSet::new();
103 let mut inner = self.inner.lock();
104
105 for block_ref in missing_block_refs {
106 let peers = inner.entry(block_ref).or_default();
109 if peers.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK && peers.get(&peer).is_none() {
110 assert!(peers.insert(peer.clone()));
111 blocks.insert(block_ref);
112 }
113 }
114
115 if blocks.is_empty() {
116 None
117 } else {
118 Some(BlocksGuard {
119 map: self.clone(),
120 block_refs: blocks,
121 peer,
122 })
123 }
124 }
125
126 fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: PeerId) {
130 let mut blocks_to_fetch = self.inner.lock();
132 for block_ref in block_refs {
133 let peers = blocks_to_fetch
134 .get_mut(block_ref)
135 .expect("Should have found a non empty map");
136
137 assert!(peers.remove(&peer), "Peer should be present!");
138
139 if peers.is_empty() {
141 blocks_to_fetch.remove(block_ref);
142 }
143 }
144 }
145
146 fn swap_locks(
150 self: &Arc<Self>,
151 blocks_guard: BlocksGuard,
152 peer: PeerId,
153 ) -> Option<BlocksGuard> {
154 let block_refs = blocks_guard.block_refs.clone();
155
156 drop(blocks_guard);
158
159 self.lock_blocks(block_refs, peer)
161 }
162
163 #[cfg(test)]
164 fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
165 let inner = self.inner.lock();
166 inner.len()
167 }
168}
169
/// Commands processed by the synchronizer's main loop.
enum Command {
    /// Request to fetch `missing_block_refs` from `peer`. The outcome of scheduling the request
    /// (not of the fetch itself) is reported through `result`.
    FetchBlocks {
        missing_block_refs: BTreeSet<BlockRef>,
        peer: PeerId,
        result: oneshot::Sender<Result<(), ConsensusError>>,
    },
    /// Start the task that fetches this node's own last block, unless one is already running.
    FetchOwnLastBlock,
    /// Bring the next run of the periodic fetch scheduler forward.
    KickOffScheduler,
}
179
180pub(crate) struct SynchronizerHandle {
181 commands_sender: Sender<Command>,
182 tasks: tokio::sync::Mutex<JoinSet<()>>,
183}
184
185impl SynchronizerHandle {
186 pub(crate) async fn fetch_blocks(
189 &self,
190 missing_block_refs: BTreeSet<BlockRef>,
191 peer: PeerId,
192 ) -> ConsensusResult<()> {
193 let (sender, receiver) = oneshot::channel();
194 self.commands_sender
195 .send(Command::FetchBlocks {
196 missing_block_refs,
197 peer,
198 result: sender,
199 })
200 .await
201 .map_err(|_err| ConsensusError::Shutdown)?;
202 receiver.await.map_err(|_err| ConsensusError::Shutdown)?
203 }
204
205 pub(crate) async fn stop(&self) -> Result<(), JoinError> {
206 let mut tasks = self.tasks.lock().await;
207 tasks.abort_all();
208 while let Some(result) = tasks.join_next().await {
209 result?
210 }
211 Ok(())
212 }
213
214 #[cfg(test)]
215 pub(crate) fn new_for_test() -> Arc<Self> {
217 use tokio::task::JoinSet;
218 let (tx, _rx) = channel("test_synchronizer", 1);
219 Arc::new(Self {
220 commands_sender: tx,
221 tasks: tokio::sync::Mutex::new(JoinSet::new()),
222 })
223 }
224}
225
/// State owned by the synchronizer's main loop task.
///
/// The synchronizer fetches blocks from peers in three ways visible in this file: on demand
/// (`Command::FetchBlocks`, served by one task per known peer), periodically (a scheduler task
/// that asks the core for its missing blocks), and once at start-up for this node's own last
/// block when requested.
pub(crate) struct Synchronizer<
    V: BlockVerifier,
    D: CoreThreadDispatcher,
    VC: ValidatorNetworkClient,
    OC: ObserverNetworkClient,
> {
    context: Arc<Context>,
    // Commands sent through `SynchronizerHandle` or by the synchronizer's own tasks.
    commands_receiver: Receiver<Command>,
    // One channel per known peer, feeding that peer's fetch task with locked block sets.
    fetch_block_senders: BTreeMap<PeerId, Sender<BlocksGuard>>,
    core_dispatcher: Arc<D>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    dag_state: Arc<RwLock<DagState>>,
    // Holds the periodic fetch task while it runs; empty otherwise.
    fetch_blocks_scheduler_task: JoinSet<()>,
    // Holds the "fetch own last block" task while it runs; empty otherwise.
    fetch_own_last_block_task: JoinSet<()>,
    network_client: Arc<SynchronizerClient<VC, OC>>,
    block_verifier: Arc<V>,
    transaction_vote_tracker: TransactionVoteTracker,
    round_tracker: Arc<RwLock<RoundTracker>>,
    // Locks on blocks that are currently being fetched, shared with every fetch task.
    inflight_blocks_map: Arc<InflightBlocksMap>,
    // Sender side of `commands_receiver`, cloned into tasks that need to kick the scheduler.
    commands_sender: Sender<Command>,
    // Commit index, and the time it was recorded, used by `should_run_periodic_sync` to detect
    // that local commits have stopped progressing.
    last_changed_commit_index: CommitIndex,
    last_commit_change_time: Instant,
    // True while the periodic sync runs in failover mode despite the commit lag.
    commit_sync_failover: bool,
    peers_pool: Arc<PeersPool>,
}
273
274impl<V, D, VC, OC> Synchronizer<V, D, VC, OC>
275where
276 V: BlockVerifier,
277 D: CoreThreadDispatcher,
278 VC: ValidatorNetworkClient,
279 OC: ObserverNetworkClient,
280{
281 pub(crate) fn start(
282 network_client: Arc<SynchronizerClient<VC, OC>>,
283 context: Arc<Context>,
284 core_dispatcher: Arc<D>,
285 commit_vote_monitor: Arc<CommitVoteMonitor>,
286 block_verifier: Arc<V>,
287 transaction_vote_tracker: TransactionVoteTracker,
288 round_tracker: Arc<RwLock<RoundTracker>>,
289 dag_state: Arc<RwLock<DagState>>,
290 peers_pool: Arc<PeersPool>,
291 sync_last_known_own_block: bool,
292 ) -> Arc<SynchronizerHandle> {
293 let (commands_sender, commands_receiver) =
294 channel("consensus_synchronizer_commands", 1_000);
295 let inflight_blocks_map = InflightBlocksMap::new();
296
297 let mut fetch_block_senders = BTreeMap::new();
299 let mut tasks = JoinSet::new();
300
301 let known_peers = peers_pool.get_known_peers();
304 for peer in known_peers {
305 let (sender, receiver) =
306 channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
307 let fetch_blocks_from_peer_async = Self::fetch_blocks_from_peer(
308 peer.clone(),
309 network_client.clone(),
310 block_verifier.clone(),
311 transaction_vote_tracker.clone(),
312 commit_vote_monitor.clone(),
313 context.clone(),
314 core_dispatcher.clone(),
315 dag_state.clone(),
316 receiver,
317 commands_sender.clone(),
318 round_tracker.clone(),
319 peers_pool.clone(),
320 );
321 tasks.spawn(monitored_future!(fetch_blocks_from_peer_async));
322 fetch_block_senders.insert(peer, sender);
323 }
324
325 let commands_sender_clone = commands_sender.clone();
326
327 if sync_last_known_own_block {
328 commands_sender
329 .try_send(Command::FetchOwnLastBlock)
330 .expect("Failed to sync our last block");
331 }
332
333 tasks.spawn(monitored_future!(async move {
335 let mut s = Self {
336 context,
337 commands_receiver,
338 fetch_block_senders,
339 core_dispatcher,
340 commit_vote_monitor,
341 fetch_blocks_scheduler_task: JoinSet::new(),
342 fetch_own_last_block_task: JoinSet::new(),
343 network_client,
344 block_verifier,
345 transaction_vote_tracker,
346 inflight_blocks_map,
347 commands_sender: commands_sender_clone,
348 dag_state,
349 round_tracker,
350 last_changed_commit_index: 0,
351 last_commit_change_time: Instant::now(),
352 commit_sync_failover: false,
353 peers_pool,
354 };
355 s.run().await;
356 }));
357
358 Arc::new(SynchronizerHandle {
359 commands_sender,
360 tasks: tokio::sync::Mutex::new(tasks),
361 })
362 }
363
364 async fn run(&mut self) {
366 const PERIODIC_FETCH_INTERVAL: Duration = Duration::from_millis(200);
368 let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_INTERVAL);
369
370 tokio::pin!(scheduler_timeout);
371
372 loop {
373 tokio::select! {
374 Some(command) = self.commands_receiver.recv() => {
375 match command {
376 Command::FetchBlocks{ missing_block_refs, peer, result } => {
377 if !self.peers_pool.is_peer_known(&peer) {
379 result.send(Err(ConsensusError::PeerUnavailable(format!("{:?}", peer)))).ok();
380 continue;
381 }
382
383 let missing_block_refs = missing_block_refs
387 .into_iter()
388 .take(self.context.parameters.max_blocks_per_sync)
389 .collect();
390
391 let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer.clone());
392 let Some(blocks_guard) = blocks_guard else {
393 result.send(Ok(())).ok();
394 continue;
395 };
396
397 let r = self
400 .fetch_block_senders
401 .get(&peer)
402 .ok_or(ConsensusError::PeerNotFound(format!("Peer {} not found in fetch_block_senders", peer)))
403 .and_then(|sender| {
404 sender
405 .try_send(blocks_guard)
406 .map_err(|err| {
407 match err {
408 TrySendError::Full(_) => {
409 let peer_name = peer.labelname(&self.context);
410 self.context
411 .metrics
412 .node_metrics
413 .synchronizer_skipped_fetch_requests
414 .with_label_values(&[peer_name])
415 .inc();
416 ConsensusError::SynchronizerSaturated(format!("{:?}", peer))
417 },
418 TrySendError::Closed(_) => ConsensusError::Shutdown
419 }
420 })
421 });
422
423 result.send(r).ok();
424 }
425 Command::FetchOwnLastBlock => {
426 if self.fetch_own_last_block_task.is_empty() {
427 self.start_fetch_own_last_block_task();
428 }
429 }
430 Command::KickOffScheduler => {
431 let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
434 Instant::now()
435 } else {
436 Instant::now() + PERIODIC_FETCH_INTERVAL.checked_div(2).unwrap()
437 };
438
439 if timeout < scheduler_timeout.deadline() {
441 scheduler_timeout.as_mut().reset(timeout);
442 }
443 }
444 }
445 },
446 Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
447 match result {
448 Ok(()) => {},
449 Err(e) => {
450 if e.is_cancelled() {
451 } else if e.is_panic() {
452 std::panic::resume_unwind(e.into_panic());
453 } else {
454 panic!("fetch our last block task failed: {e}");
455 }
456 },
457 };
458 },
459 Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
460 match result {
461 Ok(()) => {},
462 Err(e) => {
463 if e.is_cancelled() {
464 } else if e.is_panic() {
465 std::panic::resume_unwind(e.into_panic());
466 } else {
467 panic!("fetch blocks scheduler task failed: {e}");
468 }
469 },
470 };
471 },
472 () = &mut scheduler_timeout => {
473 if self.fetch_blocks_scheduler_task.is_empty()
476 && let Err(err) = self.start_fetch_missing_blocks_task().await {
477 debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
478 return;
479 };
480
481 scheduler_timeout
482 .as_mut()
483 .reset(Instant::now() + PERIODIC_FETCH_INTERVAL);
484 }
485 }
486 }
487 }
488
    /// Long-running task that serves on-demand fetch requests for a single `peer`.
    ///
    /// Locked block sets arrive through `receiver`; at most `FETCH_BLOCKS_CONCURRENCY` requests
    /// are kept in flight, and no new guard is taken from the channel while that limit is
    /// reached. Successful responses are passed to `process_fetched_blocks`; failed requests are
    /// retried. The task ends when the channel is closed and no request is in flight.
    async fn fetch_blocks_from_peer(
        peer: PeerId,
        network_client: Arc<SynchronizerClient<VC, OC>>,
        block_verifier: Arc<V>,
        transaction_vote_tracker: TransactionVoteTracker,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        dag_state: Arc<RwLock<DagState>>,
        mut receiver: Receiver<BlocksGuard>,
        commands_sender: Sender<Command>,
        round_tracker: Arc<RwLock<RoundTracker>>,
        _peers_pool: Arc<PeersPool>,
    ) {
        // The attempt counter starts at 1 and is incremented by `fetch_blocks_request` on every
        // failure, so with the `retries <= MAX_RETRIES` check below a guard is attempted at most
        // three times in total.
        const MAX_RETRIES: u32 = 3;
        let mut requests = FuturesUnordered::new();

        loop {
            tokio::select! {
                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
                    let fetch_after_rounds = Self::get_fetch_after_rounds(&context, dag_state.clone());

                    requests.push(Self::fetch_blocks_request(network_client.clone(), peer.clone(), blocks_guard, fetch_after_rounds, true, FETCH_REQUEST_TIMEOUT, 1))
                },
                Some((response, blocks_guard, retries, _peer, fetch_after_rounds)) = requests.next() => {
                    match response {
                        Ok(blocks) => {
                            // The guard moves into `process_fetched_blocks`, which releases it.
                            if let Err(err) = Self::process_fetched_blocks(blocks,
                                peer.clone(),
                                blocks_guard,
                                core_dispatcher.clone(),
                                block_verifier.clone(),
                                transaction_vote_tracker.clone(),
                                commit_vote_monitor.clone(),
                                context.clone(),
                                commands_sender.clone(),
                                round_tracker.clone(),
                                "live"
                            ).await {
                                warn!("Error while processing fetched blocks from peer {}: {err}", peer.hostname(&context));
                                context.metrics.node_metrics.synchronizer_process_fetched_failures.with_label_values(&[peer.labelname(&context).as_str(), "live"]).inc();
                            }
                        },
                        Err(_) => {
                            context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer.labelname(&context).as_str(), "live"]).inc();
                            if retries <= MAX_RETRIES {
                                // Retry with the same guard and the same `fetch_after_rounds`
                                // that were computed for the first attempt.
                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer.clone(), blocks_guard, fetch_after_rounds, true, FETCH_REQUEST_TIMEOUT, retries))
                            } else {
                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {}.", peer.hostname(&context));
                                // Give up: dropping the guard releases the locks on its blocks.
                                drop(blocks_guard);
                            }
                        }
                    }
                },
                else => {
                    info!("Fetching blocks from peer {} task will now abort.", peer.hostname(&context));
                    break;
                }
            }
        }
    }
551
552 async fn process_fetched_blocks(
555 mut serialized_blocks: Vec<Bytes>,
556 peer: PeerId,
557 requested_blocks_guard: BlocksGuard,
558 core_dispatcher: Arc<D>,
559 block_verifier: Arc<V>,
560 transaction_vote_tracker: TransactionVoteTracker,
561 commit_vote_monitor: Arc<CommitVoteMonitor>,
562 context: Arc<Context>,
563 commands_sender: Sender<Command>,
564 round_tracker: Arc<RwLock<RoundTracker>>,
565 sync_method: &str,
566 ) -> ConsensusResult<()> {
567 if serialized_blocks.is_empty() {
568 return Ok(());
569 }
570
571 serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
573
574 let blocks = Handle::current()
576 .spawn_blocking({
577 let block_verifier = block_verifier.clone();
578 let context = context.clone();
579 let peer = peer.clone();
580 move || {
581 Self::verify_blocks(
582 serialized_blocks,
583 block_verifier,
584 transaction_vote_tracker,
585 &context,
586 peer,
587 )
588 }
589 })
590 .await
591 .expect("Spawn blocking should not fail")?;
592
593 for block in &blocks {
595 commit_vote_monitor.observe_block(block);
596 }
597
598 {
601 let mut tracker = round_tracker.write();
602 for block in &blocks {
603 tracker.update_from_verified_block(&ExtendedBlock {
604 block: block.clone(),
605 excluded_ancestors: vec![],
606 });
607 }
608 }
609
610 let metrics = &context.metrics.node_metrics;
611 metrics
612 .synchronizer_fetched_blocks_by_peer
613 .with_label_values(&[peer.labelname(&context).as_str(), sync_method])
614 .inc_by(blocks.len() as u64);
615 for block in &blocks {
616 let block_hostname = &context.committee.authority(block.author()).hostname;
617 metrics
618 .synchronizer_fetched_blocks_by_authority
619 .with_label_values(&[block_hostname.as_str(), sync_method])
620 .inc();
621 }
622
623 debug!(
624 "Synced {} missing blocks from peer {:?}: {}",
625 blocks.len(),
626 peer,
627 blocks.iter().map(|b| b.reference().to_string()).join(", "),
628 );
629
630 let missing_blocks = core_dispatcher
634 .add_blocks(blocks)
635 .await
636 .map_err(|_| ConsensusError::Shutdown)?;
637
638 drop(requested_blocks_guard);
640
641 if !missing_blocks.is_empty() {
643 if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
645 {
646 warn!("Commands channel is full")
647 }
648 }
649
650 context
651 .metrics
652 .node_metrics
653 .missing_blocks_after_fetch_total
654 .inc_by(missing_blocks.len() as u64);
655
656 Ok(())
657 }
658
659 fn get_fetch_after_rounds(
660 context: &Arc<Context>,
661 dag_state: Arc<RwLock<DagState>>,
662 ) -> Vec<Round> {
663 let (blocks, gc_round) = {
664 let dag_state = dag_state.read();
665 (
666 dag_state.get_last_cached_block_per_authority(Round::MAX),
667 dag_state.gc_round(),
668 )
669 };
670 assert_eq!(blocks.len(), context.committee.size());
671
672 blocks
673 .into_iter()
674 .map(|(block, _)| block.round().max(gc_round))
675 .collect::<Vec<_>>()
676 }
677
678 fn verify_blocks(
679 serialized_blocks: Vec<Bytes>,
680 block_verifier: Arc<V>,
681 transaction_vote_tracker: TransactionVoteTracker,
682 context: &Context,
683 peer: PeerId,
684 ) -> ConsensusResult<Vec<VerifiedBlock>> {
685 let mut verified_blocks = Vec::new();
686 let mut voted_blocks = Vec::new();
687 for serialized_block in serialized_blocks {
688 let signed_block: SignedBlock =
689 bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
690
691 let (verified_block, reject_txn_votes) = block_verifier
693 .verify_and_vote(signed_block, serialized_block)
694 .tap_err(|e| {
695 let peer_label = peer.labelname(context);
696 context
697 .metrics
698 .node_metrics
699 .invalid_blocks
700 .with_label_values(&[peer_label.as_str(), "synchronizer", e.clone().name()])
701 .inc();
702 info!("Invalid block received from {}: {}", peer, e);
703 })?;
704
705 let now = context.clock.timestamp_utc_ms();
707 let drift = verified_block.timestamp_ms().saturating_sub(now);
708 if drift > 0 {
709 let peer_hostname = &context
710 .committee
711 .authority(verified_block.author())
712 .hostname;
713 context
714 .metrics
715 .node_metrics
716 .block_timestamp_drift_ms
717 .with_label_values(&[peer_hostname.as_str(), "synchronizer"])
718 .inc_by(drift);
719
720 trace!(
721 "Synced block {} timestamp {} is in the future (now={}).",
722 verified_block.reference(),
723 verified_block.timestamp_ms(),
724 now
725 );
726 }
727
728 verified_blocks.push(verified_block.clone());
729 voted_blocks.push((verified_block, reject_txn_votes));
730 }
731
732 if context.protocol_config.transaction_voting_enabled() {
733 transaction_vote_tracker.add_voted_blocks(voted_blocks);
734 }
735
736 Ok(verified_blocks)
737 }
738
739 async fn fetch_blocks_request(
740 network_client: Arc<SynchronizerClient<VC, OC>>,
741 peer: PeerId,
742 blocks_guard: BlocksGuard,
743 fetch_after_rounds: Vec<Round>,
744 fetch_missing_ancestors: bool,
745 request_timeout: Duration,
746 mut retries: u32,
747 ) -> (
748 ConsensusResult<Vec<Bytes>>,
749 BlocksGuard,
750 u32,
751 PeerId,
752 Vec<Round>,
753 ) {
754 let start = Instant::now();
755 let resp = timeout(
756 request_timeout,
757 network_client.fetch_blocks(
758 peer.clone(),
759 blocks_guard
760 .block_refs
761 .clone()
762 .into_iter()
763 .collect::<Vec<_>>(),
764 fetch_after_rounds.clone().into_iter().collect::<Vec<_>>(),
765 fetch_missing_ancestors,
766 request_timeout,
767 ),
768 )
769 .await;
770
771 fail_point_async!("consensus-delay");
772
773 let resp = match resp {
774 Ok(Err(err)) => {
775 sleep_until(start + request_timeout).await;
778 retries += 1;
779 Err(err)
780 } Err(err) => {
782 sleep_until(start + request_timeout).await;
784 retries += 1;
785 Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
786 }
787 Ok(result) => result,
788 };
789 (resp, blocks_guard, retries, peer, fetch_after_rounds)
790 }
791
    /// Spawns the task that asks the other authorities for this node's own latest block.
    ///
    /// Every other authority is queried; responses are verified and must be authored by this
    /// node. The highest round among accepted responses is kept, and the stake of the
    /// responding authorities is accumulated. A round of queries ends when all requests have
    /// completed or `sync_last_known_own_block_timeout` fires. If the accumulated stake does not
    /// reach the committee's validity threshold, the whole round is retried after a growing
    /// delay. The resulting round is published to metrics and passed to the core via
    /// `set_last_known_proposed_round`.
    fn start_fetch_own_last_block_task(&mut self) {
        // Delay before re-querying a single authority whose request failed.
        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
        // Cap for the delay between whole query rounds.
        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);

        let context = self.context.clone();
        let dag_state = self.dag_state.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let core_dispatcher = self.core_dispatcher.clone();

        self.fetch_own_last_block_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchOwnLastBlockTask");

                // Builds a future that, after `fetch_own_block_delay`, asks `authority_index`
                // for the latest blocks authored by this node.
                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
                    let network_client_cloned = network_client.clone();
                    let own_index = context.own_index;
                    async move {
                        sleep(fetch_own_block_delay).await;
                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
                        (r, authority_index)
                    }
                };


                // Deserializes and verifies the returned blocks; rejects the whole response if
                // any block is invalid or not authored by this node.
                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
                    let mut result = Vec::new();
                    for serialized_block in blocks {
                        let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
                        let (verified_block, _) = block_verifier.verify_and_vote(signed_block, serialized_block).tap_err(|err|{
                            let hostname = context.committee.authority(authority_index).hostname.clone();
                            context
                                .metrics
                                .node_metrics
                                .invalid_blocks
                                .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
                                .inc();
                            warn!("Invalid block received from {}: {}", authority_index, err);
                        })?;

                        if verified_block.author() != context.own_index {
                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
                        }
                        result.push(verified_block);
                    }
                    Ok(result)
                };

                let mut highest_round;
                let mut retries = 0;
                let mut retry_delay_step = Duration::from_millis(500);
                'main:loop {
                    // With a single-node committee there is nobody to ask: use the local state.
                    if context.committee.size() == 1 {
                        highest_round = dag_state.read().get_last_proposed_block().expect("Last proposed block should be returned on validators").round();
                        info!("Only one node in the network, will not try fetching own last block from peers.");
                        break 'main;
                    }

                    // Each query round starts from scratch.
                    let mut total_stake = 0;
                    highest_round = 0;

                    let mut results = FuturesUnordered::new();

                    for (authority_index, _authority) in context.committee.authorities() {
                        if authority_index != context.own_index {
                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
                        }
                    }

                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
                    tokio::pin!(timer);

                    'inner: loop {
                        tokio::select! {
                            result = results.next() => {
                                // `None` means every request has completed.
                                let Some((result, authority_index)) = result else {
                                    break 'inner;
                                };
                                match result {
                                    Ok(result) => {
                                        match process_blocks(result, authority_index) {
                                            Ok(blocks) => {
                                                // An empty but valid response counts as round 0
                                                // and still contributes stake.
                                                let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
                                                highest_round = highest_round.max(max_round);

                                                total_stake += context.committee.stake(authority_index);
                                            },
                                            Err(err) => {
                                                // Invalid responses are not retried.
                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
                                            }
                                        }
                                    },
                                    Err(err) => {
                                        // Request failures are retried after a fixed delay,
                                        // within the same query round.
                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
                                    }
                                }
                            },
                            () = &mut timer => {
                                info!("Timeout while trying to sync our own last block from peers");
                                break 'inner;
                            }
                        }
                    }

                    if context.committee.reached_validity(total_stake) {
                        info!("{} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                        break 'main;
                    }

                    retries += 1;
                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);

                    sleep(retry_delay_step).await;

                    // Grow the delay by 1.5x per round, capped at MAX_RETRY_DELAY_STEP.
                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
                }

                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);

                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
                }
            }));
    }
924
    /// Starts one run of the periodic fetch, if there is anything to do.
    ///
    /// Nothing is started for a single-node committee, when `should_run_periodic_sync` says no,
    /// or (outside failover mode) when the core reports no missing blocks. Otherwise a task is
    /// spawned that fetches blocks from peers and feeds the responses through
    /// `process_fetched_blocks`.
    ///
    /// Returns `ConsensusError::Shutdown` when the core cannot be queried for missing blocks.
    async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
        if self.context.committee.size() == 1 {
            trace!(
                "Only one node in the network, will not try fetching missing blocks from peers."
            );
            return Ok(());
        }

        if !self.should_run_periodic_sync() {
            return Ok(());
        }

        let context = self.context.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let transaction_vote_tracker = self.transaction_vote_tracker.clone();
        let commit_vote_monitor = self.commit_vote_monitor.clone();
        let core_dispatcher = self.core_dispatcher.clone();
        let blocks_to_fetch = self.inflight_blocks_map.clone();
        let commands_sender = self.commands_sender.clone();
        let dag_state = self.dag_state.clone();
        let round_tracker = self.round_tracker.clone();
        let peers_pool = self.peers_pool.clone();

        let mut missing_blocks = self
            .core_dispatcher
            .get_missing_blocks()
            .await
            .map_err(|_err| ConsensusError::Shutdown)?;
        if self.commit_sync_failover {
            // In failover mode only missing blocks at or below the per-author
            // `fetch_after_rounds` are kept. If none remain, the task below falls back to a
            // `fetch_after_rounds`-based fetch instead of returning.
            let fetch_after_rounds = Self::get_fetch_after_rounds(&context, dag_state.clone());
            missing_blocks.retain(|block| block.round <= fetch_after_rounds[block.author.value()]);
        } else if missing_blocks.is_empty() {
            return Ok(());
        }

        self.fetch_blocks_scheduler_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchMissingBlocksScheduler");
                // The inflight gauge covers only the network fetch, not the processing below.
                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .inc();
                let total_requested = missing_blocks.len();

                let results = if missing_blocks.is_empty() {
                    let _scope = monitored_scope("BlockSync::Periodic::HighestAcceptedRounds");
                    Self::fetch_blocks_with_fetch_after_rounds(
                        context.clone(),
                        blocks_to_fetch.clone(),
                        network_client,
                        dag_state,
                        peers_pool,
                    )
                    .await
                } else {
                    let _scope = monitored_scope("BlockSync::Periodic::MissingBlocks");
                    Self::fetch_blocks_from_peers(
                        context.clone(),
                        blocks_to_fetch.clone(),
                        network_client,
                        missing_blocks,
                        dag_state,
                        peers_pool,
                    )
                    .await
                };
                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .dec();
                if results.is_empty() {
                    return;
                }

                fail_point_async!("consensus-delay");

                let mut total_fetched = 0;
                // Responses are processed one peer at a time; a failure for one peer does not
                // stop the processing of the others.
                for (blocks_guard, fetched_blocks, peer) in results {
                    total_fetched += fetched_blocks.len();

                    if let Err(err) = Self::process_fetched_blocks(
                        fetched_blocks,
                        peer.clone(),
                        blocks_guard,
                        core_dispatcher.clone(),
                        block_verifier.clone(),
                        transaction_vote_tracker.clone(),
                        commit_vote_monitor.clone(),
                        context.clone(),
                        commands_sender.clone(),
                        round_tracker.clone(),
                        "periodic",
                    )
                    .await
                    {
                        warn!(
                            "Error occurred while processing fetched blocks from peer {:?}: {err}",
                            peer
                        );
                        let peer_name = peer.labelname(&context);
                        context
                            .metrics
                            .node_metrics
                            .synchronizer_process_fetched_failures
                            .with_label_values(&[peer_name.as_str(), "periodic"])
                            .inc();
                    }
                }

                debug!(
                    "Total blocks requested to fetch: {}, total fetched: {}",
                    total_requested, total_fetched
                );
            }));

        Ok(())
    }
1053
1054 fn should_run_periodic_sync(&mut self) -> bool {
1055 let current_commit_index = self.dag_state.read().last_commit_index();
1056 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
1057 let commit_threshold = current_commit_index
1058 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
1059 let now = Instant::now();
1060 let metrics = &self.context.metrics.node_metrics;
1061
1062 if quorum_commit_index <= commit_threshold {
1064 metrics
1065 .synchronizer_periodic_sync_decision
1066 .with_label_values(&["true", "default"])
1067 .inc();
1068 self.last_changed_commit_index = current_commit_index;
1070 self.last_commit_change_time = now;
1071 self.commit_sync_failover = false;
1072 return true;
1074 }
1075 if self.commit_sync_failover {
1079 if current_commit_index
1080 < self.last_changed_commit_index + self.context.parameters.commit_sync_batch_size
1081 {
1082 metrics
1083 .synchronizer_periodic_sync_decision
1084 .with_label_values(&["true", "commit_catchup::run"])
1085 .inc();
1086 return true;
1090 } else {
1091 metrics
1092 .synchronizer_periodic_sync_decision
1093 .with_label_values(&["false", "commit_catchup::end"])
1094 .inc();
1095 self.last_changed_commit_index = current_commit_index;
1097 self.last_commit_change_time = now;
1098 self.commit_sync_failover = false;
1099 return false;
1101 }
1102 }
1103
1104 if current_commit_index == self.last_changed_commit_index {
1106 if now.duration_since(self.last_commit_change_time) >= COMMIT_PROGRESS_TIMEOUT {
1108 metrics
1109 .synchronizer_periodic_sync_decision
1110 .with_label_values(&["true", "commit_catchup::start"])
1111 .inc();
1112 self.commit_sync_failover = true;
1113 return true;
1115 }
1116 } else {
1117 self.last_changed_commit_index = current_commit_index;
1119 self.last_commit_change_time = now;
1120 }
1121
1122 metrics
1123 .synchronizer_periodic_sync_decision
1124 .with_label_values(&["false", "commit_lag"])
1125 .inc();
1126 false
1127 }
1128
1129 async fn fetch_blocks_with_fetch_after_rounds(
1132 context: Arc<Context>,
1133 inflight_blocks: Arc<InflightBlocksMap>,
1134 network_client: Arc<SynchronizerClient<VC, OC>>,
1135 dag_state: Arc<RwLock<DagState>>,
1136 peers_pool: Arc<PeersPool>,
1137 ) -> Vec<(BlocksGuard, Vec<Bytes>, PeerId)> {
1138 let fetch_after_rounds = Self::get_fetch_after_rounds(&context, dag_state.clone());
1139
1140 let mut peers = peers_pool.get_known_peers();
1143
1144 assert!(!peers.is_empty(), "No known peers to fetch blocks from");
1147
1148 if cfg!(not(test)) {
1149 peers.shuffle(&mut ThreadRng::default());
1150 }
1151
1152 let peer = peers.first().unwrap().clone();
1153
1154 let response = timeout(
1155 FETCH_REQUEST_TIMEOUT,
1156 network_client.fetch_blocks(
1157 peer.clone(),
1158 vec![],
1159 fetch_after_rounds,
1160 false,
1161 FETCH_REQUEST_TIMEOUT,
1162 ),
1163 )
1164 .await;
1165
1166 let serialized_blocks = match response {
1167 Ok(Ok(blocks)) => blocks,
1168 Ok(Err(err)) => {
1169 debug!("Failed to fetch blocks with fetch_after_rounds from peer {peer}: {err}");
1170 return vec![];
1171 }
1172 Err(_) => {
1173 debug!("Timed out fetching blocks with fetch_after_rounds from peer {peer}");
1174 return vec![];
1175 }
1176 };
1177
1178 let blocks_guard = BlocksGuard {
1179 map: inflight_blocks,
1180 block_refs: BTreeSet::new(),
1181 peer: peer.clone(),
1182 };
1183
1184 vec![(blocks_guard, serialized_blocks, peer)]
1185 }
1186
1187 async fn fetch_blocks_from_peers(
1193 context: Arc<Context>,
1194 inflight_blocks: Arc<InflightBlocksMap>,
1195 network_client: Arc<SynchronizerClient<VC, OC>>,
1196 missing_blocks: BTreeSet<BlockRef>,
1197 dag_state: Arc<RwLock<DagState>>,
1198 peers_pool: Arc<PeersPool>,
1199 ) -> Vec<(BlocksGuard, Vec<Bytes>, PeerId)> {
1200 let missing_blocks = missing_blocks
1204 .into_iter()
1205 .take(2 * MAX_PERIODIC_SYNC_PEERS * context.parameters.max_blocks_per_fetch)
1206 .collect::<Vec<_>>();
1207
1208 let mut authorities = BTreeMap::<AuthorityIndex, Vec<BlockRef>>::new();
1210 for block_ref in &missing_blocks {
1211 authorities
1212 .entry(block_ref.author)
1213 .or_default()
1214 .push(*block_ref);
1215 }
1216
1217 let mut peers = peers_pool.get_known_peers();
1219
1220 assert!(!peers.is_empty(), "No known peers to fetch blocks from");
1225
1226 let num_authorities_per_peer = authorities
1227 .len()
1228 .div_ceil(peers.len().min(MAX_PERIODIC_SYNC_PEERS));
1229
1230 let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1232 for (authority, blocks) in &authorities {
1233 missing_blocks_per_authority[*authority] += blocks.len();
1234 }
1235 for (missing, (_, authority)) in missing_blocks_per_authority
1236 .into_iter()
1237 .zip_debug_eq(context.committee.authorities())
1238 {
1239 context
1240 .metrics
1241 .node_metrics
1242 .synchronizer_missing_blocks_by_authority
1243 .with_label_values(&[&authority.hostname])
1244 .inc_by(missing as u64);
1245 context
1246 .metrics
1247 .node_metrics
1248 .synchronizer_current_missing_blocks_by_authority
1249 .with_label_values(&[&authority.hostname])
1250 .set(missing as i64);
1251 }
1252
1253 if cfg!(not(test)) {
1255 peers.shuffle(&mut ThreadRng::default());
1257 }
1258
1259 let mut peers = peers.into_iter();
1260 let mut request_futures = FuturesUnordered::new();
1261
1262 let mut authorities = authorities.into_values().collect::<Vec<_>>();
1264 if cfg!(not(test)) {
1265 authorities.shuffle(&mut ThreadRng::default());
1267 }
1268
1269 let fetch_after_rounds = Self::get_fetch_after_rounds(&context, dag_state.clone());
1270
1271 for batch in authorities.chunks(num_authorities_per_peer) {
1273 let Some(peer) = peers.next() else {
1274 debug_fatal!("No more peers left to fetch blocks!");
1275 break;
1276 };
1277 let peer_name = peer.hostname(&context);
1278 let block_refs = batch
1281 .iter()
1282 .flatten()
1283 .cloned()
1284 .collect::<BTreeSet<_>>()
1285 .into_iter()
1286 .take(context.parameters.max_blocks_per_fetch)
1287 .collect::<BTreeSet<_>>();
1288
1289 if let Some(blocks_guard) =
1291 inflight_blocks.lock_blocks(block_refs.clone(), peer.clone())
1292 {
1293 info!(
1294 "Periodic sync of {} missing blocks from peer {} {:?}: {}",
1295 peer_name.as_str(),
1296 block_refs.len(),
1297 peer,
1298 block_refs
1299 .iter()
1300 .map(|b| b.to_string())
1301 .collect::<Vec<_>>()
1302 .join(", ")
1303 );
1304 request_futures.push(Self::fetch_blocks_request(
1305 network_client.clone(),
1306 peer,
1307 blocks_guard,
1308 fetch_after_rounds.clone(),
1309 false,
1310 FETCH_REQUEST_TIMEOUT,
1311 1,
1312 ));
1313 }
1314 }
1315
1316 let mut results = Vec::new();
1317 let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1318
1319 tokio::pin!(fetcher_timeout);
1320
1321 loop {
1322 tokio::select! {
1323 Some((response, blocks_guard, _retries, peer, fetch_after_rounds)) = request_futures.next() => {
1324 match response {
1325 Ok(fetched_blocks) => {
1326 results.push((blocks_guard, fetched_blocks, peer));
1327
1328 if request_futures.is_empty() {
1330 break;
1331 }
1332 },
1333 Err(_) => {
1334 let peer_name = peer.labelname(&context);
1335 context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer_name.as_str(), "periodic"]).inc();
1336 if let Some(next_peer) = peers.next() {
1338 if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer.clone()) {
1340 info!(
1341 "Retrying syncing {} missing blocks from peer {:?}: {}",
1342 blocks_guard.block_refs.len(),
1343 next_peer,
1344 blocks_guard.block_refs
1345 .iter()
1346 .map(|b| b.to_string())
1347 .collect::<Vec<_>>()
1348 .join(", ")
1349 );
1350 request_futures.push(Self::fetch_blocks_request(
1351 network_client.clone(),
1352 next_peer,
1353 blocks_guard,
1354 fetch_after_rounds,
1355 false,
1356 FETCH_REQUEST_TIMEOUT,
1357 1,
1358 ));
1359 } else {
1360 debug!("Couldn't acquire locks to fetch blocks from peer {:?}.", next_peer)
1361 }
1362 } else {
1363 debug!("No more peers left to fetch blocks");
1364 }
1365 }
1366 }
1367 },
1368 _ = &mut fetcher_timeout => {
1369 debug!("Timed out while fetching missing blocks");
1370 break;
1371 }
1372 }
1373 }
1374
1375 results
1376 }
1377}
1378
1379#[cfg(test)]
1380mod tests {
1381 use std::{
1382 collections::{BTreeMap, BTreeSet},
1383 sync::Arc,
1384 time::Duration,
1385 };
1386
1387 use async_trait::async_trait;
1388 use bytes::Bytes;
1389 use consensus_config::{AuthorityIndex, Parameters};
1390 use consensus_types::block::{BlockDigest, BlockRef, Round};
1391 use mysten_metrics::monitored_mpsc;
1392 use parking_lot::RwLock;
1393 use tokio::{sync::Mutex, time::sleep};
1394
1395 use crate::commit::{CommitVote, TrustedCommit};
1396 use crate::{
1397 CommitDigest, CommitIndex,
1398 block::{TestBlock, VerifiedBlock},
1399 block_verifier::NoopBlockVerifier,
1400 commit_vote_monitor::CommitVoteMonitor,
1401 context::Context,
1402 core_thread::CoreThreadDispatcher,
1403 dag_state::DagState,
1404 error::{ConsensusError, ConsensusResult},
1405 network::{
1406 BlockStream, ObserverNetworkClient, PeerId, SynchronizerClient, ValidatorNetworkClient,
1407 },
1408 storage::mem_store::MemStore,
1409 synchronizer::{
1410 COMMIT_PROGRESS_TIMEOUT, FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT,
1411 InflightBlocksMap, Synchronizer,
1412 },
1413 };
1414 use crate::{
1415 authority_service::COMMIT_LAG_MULTIPLIER, core_thread::MockCoreThreadDispatcher,
1416 peers_pool::PeersPool, round_tracker::RoundTracker,
1417 transaction_vote_tracker::TransactionVoteTracker,
1418 };
1419
    /// Key of a stubbed `fetch_blocks` call: the requested block refs and the peer asked.
    type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
    /// Stubbed `fetch_blocks` response: the blocks to return and an optional delay
    /// applied before responding.
    type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
    /// Key of a stubbed `fetch_latest_blocks` call: the peer asked and the authorities
    /// requested from it.
    type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
    /// Stubbed `fetch_latest_blocks` response: the blocks to return and an optional delay
    /// applied before responding.
    type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1424
1425 #[derive(Default)]
1426 struct MockNetworkClient {
1427 fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
1428 fetch_latest_blocks_requests:
1429 Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
1430 }
1431
1432 impl MockNetworkClient {
1433 async fn stub_fetch_blocks(
1434 &self,
1435 blocks: Vec<VerifiedBlock>,
1436 peer: AuthorityIndex,
1437 latency: Option<Duration>,
1438 ) {
1439 let mut lock = self.fetch_blocks_requests.lock().await;
1440 let block_refs = blocks
1441 .iter()
1442 .map(|block| block.reference())
1443 .collect::<Vec<_>>();
1444 lock.insert((block_refs, peer), (blocks, latency));
1445 }
1446
1447 async fn stub_fetch_blocks_for_key(
1448 &self,
1449 key_refs: Vec<BlockRef>,
1450 response_blocks: Vec<VerifiedBlock>,
1451 peer: AuthorityIndex,
1452 latency: Option<Duration>,
1453 ) {
1454 let mut lock = self.fetch_blocks_requests.lock().await;
1455 lock.insert((key_refs, peer), (response_blocks, latency));
1456 }
1457
1458 async fn stub_fetch_latest_blocks(
1459 &self,
1460 blocks: Vec<VerifiedBlock>,
1461 peer: AuthorityIndex,
1462 authorities: Vec<AuthorityIndex>,
1463 latency: Option<Duration>,
1464 ) {
1465 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1466 lock.entry((peer, authorities))
1467 .or_default()
1468 .push((blocks, latency));
1469 }
1470
1471 async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1472 let lock = self.fetch_latest_blocks_requests.lock().await;
1473 lock.len()
1474 }
1475 }
1476
1477 #[async_trait]
1478 impl ValidatorNetworkClient for MockNetworkClient {
1479 async fn subscribe_blocks(
1480 &self,
1481 _peer: AuthorityIndex,
1482 _last_received: Round,
1483 _timeout: Duration,
1484 ) -> ConsensusResult<BlockStream> {
1485 unimplemented!("subscribe_blocks not implemented in mock")
1486 }
1487
1488 async fn fetch_blocks(
1489 &self,
1490 peer: AuthorityIndex,
1491 block_refs: Vec<BlockRef>,
1492 _fetch_after_rounds: Vec<Round>,
1493 _fetch_missing_ancestors: bool,
1494 _timeout: Duration,
1495 ) -> ConsensusResult<Vec<Bytes>> {
1496 let mut lock = self.fetch_blocks_requests.lock().await;
1497 let response = lock.remove(&(block_refs.clone(), peer)).unwrap_or_else(|| {
1498 panic!(
1499 "Unexpected fetch blocks request made: {:?} {}. Current lock: {:?}",
1500 block_refs, peer, lock
1501 );
1502 });
1503
1504 let serialised = response
1505 .0
1506 .into_iter()
1507 .map(|block| block.serialized().clone())
1508 .collect::<Vec<_>>();
1509
1510 drop(lock);
1511
1512 if let Some(latency) = response.1 {
1513 sleep(latency).await;
1514 }
1515
1516 Ok(serialised)
1517 }
1518
1519 async fn fetch_commits(
1520 &self,
1521 _peer: AuthorityIndex,
1522 _commit_range: crate::commit::CommitRange,
1523 _timeout: Duration,
1524 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1525 unimplemented!("fetch_commits not implemented in mock")
1526 }
1527
1528 async fn fetch_latest_blocks(
1529 &self,
1530 peer: AuthorityIndex,
1531 authorities: Vec<AuthorityIndex>,
1532 _timeout: Duration,
1533 ) -> ConsensusResult<Vec<Bytes>> {
1534 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1535 let mut responses = lock
1536 .remove(&(peer, authorities.clone()))
1537 .expect("Unexpected fetch blocks request made");
1538
1539 let response = responses.remove(0);
1540 let serialised = response
1541 .0
1542 .into_iter()
1543 .map(|block| block.serialized().clone())
1544 .collect::<Vec<_>>();
1545
1546 if !responses.is_empty() {
1547 lock.insert((peer, authorities), responses);
1548 }
1549
1550 drop(lock);
1551
1552 if let Some(latency) = response.1 {
1553 sleep(latency).await;
1554 }
1555
1556 Ok(serialised)
1557 }
1558
1559 async fn get_latest_rounds(
1560 &self,
1561 _peer: AuthorityIndex,
1562 _timeout: Duration,
1563 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1564 unimplemented!("get_latest_rounds not implemented in mock")
1565 }
1566
1567 #[cfg(test)]
1568 async fn send_block(
1569 &self,
1570 _peer: AuthorityIndex,
1571 _block: &VerifiedBlock,
1572 _timeout: Duration,
1573 ) -> ConsensusResult<()> {
1574 unimplemented!("send_block not implemented in mock")
1575 }
1576 }
1577
1578 #[async_trait]
1579 impl ObserverNetworkClient for MockNetworkClient {
1580 async fn stream_blocks(
1581 &self,
1582 _peer: crate::network::PeerId,
1583 _highest_round_per_authority: Vec<u64>,
1584 _timeout: Duration,
1585 ) -> ConsensusResult<crate::network::ObserverBlockStream> {
1586 unimplemented!("stream_blocks not implemented in mock")
1587 }
1588
1589 async fn fetch_blocks(
1590 &self,
1591 _peer: crate::network::PeerId,
1592 _block_refs: Vec<BlockRef>,
1593 _fetch_after_rounds: Vec<Round>,
1594 _fetch_missing_ancestors: bool,
1595 _timeout: Duration,
1596 ) -> ConsensusResult<Vec<Bytes>> {
1597 unimplemented!("Observer fetch_blocks not implemented in mock")
1598 }
1599
1600 async fn fetch_commits(
1601 &self,
1602 _peer: crate::network::PeerId,
1603 _commit_range: crate::commit::CommitRange,
1604 _timeout: Duration,
1605 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1606 unimplemented!("Observer fetch_commits not implemented in mock")
1607 }
1608 }
1609
1610 #[test]
1611 fn test_inflight_blocks_map() {
1612 let map = InflightBlocksMap::new();
1614 let some_block_refs = [
1615 BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1616 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1617 BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
1618 BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
1619 ];
1620 let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1621
1622 {
1624 let mut all_guards = Vec::new();
1625
1626 for i in 1..=2 {
1628 let authority = AuthorityIndex::new_for_test(i);
1629 let peer = PeerId::Validator(authority);
1630
1631 let guard = map.lock_blocks(missing_block_refs.clone(), peer.clone());
1632 let guard = guard.expect("Guard should be created");
1633 assert_eq!(guard.block_refs.len(), 4);
1634
1635 all_guards.push(guard);
1636
1637 let guard = map.lock_blocks(missing_block_refs.clone(), peer);
1639 assert!(guard.is_none());
1640 }
1641
1642 let authority_3 = AuthorityIndex::new_for_test(3);
1644 let peer_3 = PeerId::Validator(authority_3);
1645
1646 let guard = map.lock_blocks(missing_block_refs.clone(), peer_3.clone());
1647 assert!(guard.is_none());
1648
1649 drop(all_guards.remove(0));
1651
1652 let guard = map.lock_blocks(missing_block_refs.clone(), peer_3);
1653 let guard = guard.expect("Guard should be successfully acquired");
1654
1655 assert_eq!(guard.block_refs, missing_block_refs);
1656
1657 drop(guard);
1659 drop(all_guards);
1660
1661 assert_eq!(map.num_of_locked_blocks(), 0);
1662 }
1663
1664 {
1666 let authority_1 = AuthorityIndex::new_for_test(1);
1668 let peer_1 = PeerId::Validator(authority_1);
1669 let guard = map.lock_blocks(missing_block_refs.clone(), peer_1).unwrap();
1670
1671 let authority_2 = AuthorityIndex::new_for_test(2);
1673 let peer_2 = PeerId::Validator(authority_2);
1674 let guard = map.swap_locks(guard, peer_2);
1675
1676 assert_eq!(guard.unwrap().block_refs, missing_block_refs);
1677 }
1678 }
1679
1680 #[tokio::test]
1681 async fn successful_fetch_blocks_from_peer() {
1682 let (context, _) = Context::new_for_test(4);
1684 let context = Arc::new(context);
1685 let block_verifier = Arc::new(NoopBlockVerifier {});
1686 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1687 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1688 let mock_client = Arc::new(MockNetworkClient::default());
1689 let store = Arc::new(MemStore::new());
1690 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1691 let transaction_vote_tracker =
1692 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1693 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1694
1695 let network_client = Arc::new(SynchronizerClient::new(
1696 context.clone(),
1697 Some(mock_client.clone()),
1698 Some(mock_client.clone()),
1699 ));
1700 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1701 let handle = Synchronizer::start(
1702 network_client,
1703 context.clone(),
1704 core_dispatcher.clone(),
1705 commit_vote_monitor,
1706 block_verifier,
1707 transaction_vote_tracker,
1708 round_tracker,
1709 dag_state,
1710 peers_pool.clone(),
1711 false,
1712 );
1713
1714 let expected_blocks = (0..10)
1716 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1717 .collect::<Vec<_>>();
1718 let missing_blocks = expected_blocks
1719 .iter()
1720 .map(|block| block.reference())
1721 .collect::<BTreeSet<_>>();
1722
1723 let peer = AuthorityIndex::new_for_test(1);
1725 mock_client
1726 .stub_fetch_blocks(expected_blocks.clone(), peer, None)
1727 .await;
1728
1729 assert!(
1731 handle
1732 .fetch_blocks(missing_blocks, PeerId::Validator(peer))
1733 .await
1734 .is_ok()
1735 );
1736
1737 sleep(Duration::from_millis(1_000)).await;
1739
1740 let added_blocks = core_dispatcher.get_add_blocks().await;
1742 assert_eq!(added_blocks, expected_blocks);
1743 }
1744
1745 #[tokio::test]
1746 async fn saturate_fetch_blocks_from_peer() {
1747 let (context, _) = Context::new_for_test(4);
1749 let context = Arc::new(context);
1750 let block_verifier = Arc::new(NoopBlockVerifier {});
1751 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1752 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1753 let mock_client = Arc::new(MockNetworkClient::default());
1754 let store = Arc::new(MemStore::new());
1755 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1756 let transaction_vote_tracker =
1757 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1758 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1759
1760 let network_client = Arc::new(SynchronizerClient::new(
1761 context.clone(),
1762 Some(mock_client.clone()),
1763 Some(mock_client.clone()),
1764 ));
1765 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1766 let handle = Synchronizer::start(
1767 network_client,
1768 context.clone(),
1769 core_dispatcher.clone(),
1770 commit_vote_monitor,
1771 block_verifier,
1772 transaction_vote_tracker,
1773 round_tracker,
1774 dag_state,
1775 peers_pool.clone(),
1776 false,
1777 );
1778
1779 let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
1781 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
1782 .collect::<Vec<_>>();
1783
1784 let peer = AuthorityIndex::new_for_test(1);
1786 let mut iter = expected_blocks.iter().peekable();
1787 while let Some(block) = iter.next() {
1788 mock_client
1791 .stub_fetch_blocks(
1792 vec![block.clone()],
1793 peer,
1794 Some(Duration::from_millis(5_000)),
1795 )
1796 .await;
1797
1798 let mut missing_blocks = BTreeSet::new();
1799 missing_blocks.insert(block.reference());
1800
1801 if iter.peek().is_none() {
1804 match handle
1805 .fetch_blocks(missing_blocks, PeerId::Validator(peer))
1806 .await
1807 {
1808 Err(ConsensusError::SynchronizerSaturated(peer_str)) => {
1809 assert_eq!(peer_str, format!("{:?}", PeerId::Validator(peer)));
1810 }
1811 _ => panic!("A saturated synchronizer error was expected"),
1812 }
1813 } else {
1814 assert!(
1815 handle
1816 .fetch_blocks(missing_blocks, PeerId::Validator(peer))
1817 .await
1818 .is_ok()
1819 );
1820 }
1821 }
1822 }
1823
1824 #[tokio::test(flavor = "current_thread", start_paused = true)]
1825 async fn synchronizer_periodic_task_fetch_blocks() {
1826 let (context, _) = Context::new_for_test(4);
1828 let context = Arc::new(context);
1829 let block_verifier = Arc::new(NoopBlockVerifier {});
1830 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1831 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1832 let mock_client = Arc::new(MockNetworkClient::default());
1833 let store = Arc::new(MemStore::new());
1834 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1835 let transaction_vote_tracker =
1836 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1837 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1838
1839 let expected_blocks = (0..10)
1841 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1842 .collect::<Vec<_>>();
1843 let missing_blocks = expected_blocks
1844 .iter()
1845 .map(|block| block.reference())
1846 .collect::<BTreeSet<_>>();
1847
1848 core_dispatcher
1850 .stub_missing_blocks(missing_blocks.clone())
1851 .await;
1852
1853 mock_client
1857 .stub_fetch_blocks(
1858 expected_blocks.clone(),
1859 AuthorityIndex::new_for_test(1),
1860 Some(FETCH_REQUEST_TIMEOUT),
1861 )
1862 .await;
1863 mock_client
1864 .stub_fetch_blocks(
1865 expected_blocks.clone(),
1866 AuthorityIndex::new_for_test(2),
1867 None,
1868 )
1869 .await;
1870
1871 let network_client = Arc::new(SynchronizerClient::new(
1873 context.clone(),
1874 Some(mock_client.clone()),
1875 Some(mock_client.clone()),
1876 ));
1877 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1878 let _handle = Synchronizer::start(
1879 network_client,
1880 context.clone(),
1881 core_dispatcher.clone(),
1882 commit_vote_monitor,
1883 block_verifier,
1884 transaction_vote_tracker,
1885 round_tracker,
1886 dag_state,
1887 peers_pool.clone(),
1888 false,
1889 );
1890
1891 sleep(2 * FETCH_REQUEST_TIMEOUT).await;
1892
1893 let added_blocks = core_dispatcher.get_add_blocks().await;
1895 assert_eq!(added_blocks, expected_blocks);
1896
1897 assert!(
1899 core_dispatcher
1900 .get_missing_blocks()
1901 .await
1902 .unwrap()
1903 .is_empty()
1904 );
1905 }
1906
1907 #[tokio::test(flavor = "current_thread", start_paused = true)]
1908 async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
1909 let (context, _) = Context::new_for_test(4);
1911 let context = Arc::new(context);
1912 let block_verifier = Arc::new(NoopBlockVerifier {});
1913 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1914 let mock_client = Arc::new(MockNetworkClient::default());
1915 let store = Arc::new(MemStore::new());
1916 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1917 let transaction_vote_tracker =
1918 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1919 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1920 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1921
1922 let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
1924 let stub_blocks = (sync_missing_block_round_threshold * 2
1925 ..sync_missing_block_round_threshold * 3)
1926 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1927 .collect::<Vec<_>>();
1928 let missing_blocks = stub_blocks
1929 .iter()
1930 .map(|block| block.reference())
1931 .collect::<BTreeSet<_>>();
1932 core_dispatcher
1933 .stub_missing_blocks(missing_blocks.clone())
1934 .await;
1935
1936 mock_client
1941 .stub_fetch_blocks(
1942 stub_blocks.clone(),
1943 AuthorityIndex::new_for_test(1),
1944 Some(FETCH_REQUEST_TIMEOUT),
1945 )
1946 .await;
1947 mock_client
1948 .stub_fetch_blocks(stub_blocks.clone(), AuthorityIndex::new_for_test(2), None)
1949 .await;
1950 let mut expected_blocks = stub_blocks
1951 .iter()
1952 .take(context.parameters.max_blocks_per_sync)
1953 .cloned()
1954 .collect::<Vec<_>>();
1955
1956 let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
1958 let commit_index: CommitIndex = round - 1;
1959 let blocks = (0..4)
1960 .map(|authority| {
1961 let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
1962 let block = TestBlock::new(round, authority)
1963 .set_commit_votes(commit_votes)
1964 .build();
1965
1966 VerifiedBlock::new_for_test(block)
1967 })
1968 .collect::<Vec<_>>();
1969
1970 for block in blocks {
1973 commit_vote_monitor.observe_block(&block);
1974 }
1975
1976 let network_client = Arc::new(SynchronizerClient::new(
1978 context.clone(),
1979 Some(mock_client.clone()),
1980 Some(mock_client.clone()),
1981 ));
1982 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1983 let _handle = Synchronizer::start(
1984 network_client,
1985 context.clone(),
1986 core_dispatcher.clone(),
1987 commit_vote_monitor.clone(),
1988 block_verifier,
1989 transaction_vote_tracker,
1990 round_tracker,
1991 dag_state.clone(),
1992 peers_pool.clone(),
1993 false,
1994 );
1995
1996 sleep(COMMIT_PROGRESS_TIMEOUT / 2).await;
1999
2000 let added_blocks = core_dispatcher.get_add_blocks().await;
2003 assert_eq!(added_blocks, vec![]);
2004
2005 {
2008 let mut d = dag_state.write();
2009 for index in 1..=commit_index {
2010 let commit =
2011 TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
2012
2013 d.add_commit(commit);
2014 }
2015
2016 assert_eq!(
2017 d.last_commit_index(),
2018 commit_vote_monitor.quorum_commit_index()
2019 );
2020 }
2021
2022 core_dispatcher
2024 .stub_missing_blocks(missing_blocks.clone())
2025 .await;
2026
2027 sleep(2 * FETCH_REQUEST_TIMEOUT).await;
2028
2029 let mut added_blocks = core_dispatcher.get_add_blocks().await;
2031
2032 added_blocks.sort_by_key(|block| block.reference());
2033 expected_blocks.sort_by_key(|block| block.reference());
2034
2035 assert_eq!(added_blocks, expected_blocks);
2036 }
2037
2038 #[tokio::test(flavor = "current_thread", start_paused = true)]
2039 async fn synchronizer_periodic_sync_resumes_when_commit_sync_stalled() {
2040 let (context, _) = Context::new_for_test(4);
2042 let context = Arc::new(context);
2043 let block_verifier = Arc::new(NoopBlockVerifier {});
2044 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2045 let mock_client = Arc::new(MockNetworkClient::default());
2046 let store = Arc::new(MemStore::new());
2047 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2048 let transaction_vote_tracker =
2049 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
2050 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2051 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2052
2053 let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
2055 let stub_blocks = (sync_missing_block_round_threshold * 2
2056 ..sync_missing_block_round_threshold * 3)
2057 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2058 .collect::<Vec<_>>();
2059 let missing_blocks = stub_blocks
2060 .iter()
2061 .map(|block| block.reference())
2062 .collect::<BTreeSet<_>>();
2063 core_dispatcher
2064 .stub_missing_blocks(missing_blocks.clone())
2065 .await;
2066
2067 mock_client
2070 .stub_fetch_blocks(
2071 stub_blocks.clone(),
2072 AuthorityIndex::new_for_test(1),
2073 Some(FETCH_REQUEST_TIMEOUT),
2074 )
2075 .await;
2076 mock_client
2077 .stub_fetch_blocks(stub_blocks.clone(), AuthorityIndex::new_for_test(2), None)
2078 .await;
2079 let mut expected_blocks = stub_blocks
2080 .iter()
2081 .take(context.parameters.max_blocks_per_sync)
2082 .cloned()
2083 .collect::<Vec<_>>();
2084
2085 let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
2087 let commit_index: CommitIndex = round - 1;
2088 let blocks = (0..4)
2089 .map(|authority| {
2090 let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
2091 let block = TestBlock::new(round, authority)
2092 .set_commit_votes(commit_votes)
2093 .build();
2094 VerifiedBlock::new_for_test(block)
2095 })
2096 .collect::<Vec<_>>();
2097 for block in blocks {
2098 commit_vote_monitor.observe_block(&block);
2099 }
2100
2101 let network_client = Arc::new(SynchronizerClient::new(
2103 context.clone(),
2104 Some(mock_client.clone()),
2105 Some(mock_client.clone()),
2106 ));
2107 let peers_pool = Arc::new(PeersPool::new(context.clone()));
2108 let _handle = Synchronizer::start(
2109 network_client,
2110 context.clone(),
2111 core_dispatcher.clone(),
2112 commit_vote_monitor.clone(),
2113 block_verifier,
2114 transaction_vote_tracker,
2115 round_tracker,
2116 dag_state.clone(),
2117 peers_pool.clone(),
2118 false,
2119 );
2120
2121 sleep(COMMIT_PROGRESS_TIMEOUT - Duration::from_millis(100)).await;
2123 let added_blocks = core_dispatcher.get_add_blocks().await;
2124 assert_eq!(added_blocks, vec![]);
2125
2126 core_dispatcher
2128 .stub_missing_blocks(missing_blocks.clone())
2129 .await;
2130 mock_client
2133 .stub_fetch_blocks_for_key(
2134 vec![],
2135 expected_blocks.clone(),
2136 AuthorityIndex::new_for_test(1),
2137 Some(Duration::from_millis(500)),
2138 )
2139 .await;
2140
2141 sleep(Duration::from_millis(200) + FETCH_REQUEST_TIMEOUT).await;
2143
2144 let mut added_blocks = core_dispatcher.get_add_blocks().await;
2145 assert!(
2146 !added_blocks.is_empty(),
2147 "Expected periodic sync to resume after commit sync stall"
2148 );
2149 added_blocks.sort_by_key(|block| block.reference());
2150 expected_blocks.sort_by_key(|block| block.reference());
2151 assert_eq!(added_blocks, expected_blocks);
2152
2153 core_dispatcher.get_add_blocks().await;
2156 {
2157 let current = dag_state.read().last_commit_index();
2158 let mut d = dag_state.write();
2159 for index in (current + 1)..=(current + context.parameters.commit_sync_batch_size) {
2161 let commit =
2162 TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
2163 d.add_commit(commit);
2164 }
2165 }
2166
2167 core_dispatcher
2169 .stub_missing_blocks(missing_blocks.clone())
2170 .await;
2171 mock_client
2172 .stub_fetch_blocks(
2173 stub_blocks
2174 .iter()
2175 .take(context.parameters.max_blocks_per_sync)
2176 .cloned()
2177 .collect::<Vec<_>>(),
2178 AuthorityIndex::new_for_test(2),
2179 None,
2180 )
2181 .await;
2182
2183 sleep(2 * FETCH_REQUEST_TIMEOUT).await;
2186 let added_blocks = core_dispatcher.get_add_blocks().await;
2187 assert_eq!(
2188 added_blocks,
2189 vec![],
2190 "Expected periodic sync to be skipped after stall resolved"
2191 );
2192 }
2193
2194 #[tokio::test(flavor = "current_thread", start_paused = true)]
2195 async fn synchronizer_fetch_own_last_block() {
2196 let (context, _) = Context::new_for_test(4);
2198 let context = Arc::new(context.with_parameters(Parameters {
2199 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2200 ..Default::default()
2201 }));
2202 let block_verifier = Arc::new(NoopBlockVerifier {});
2203 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2204 let mock_client = Arc::new(MockNetworkClient::default());
2205 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2206 let store = Arc::new(MemStore::new());
2207 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2208 let transaction_vote_tracker =
2209 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
2210 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2211 let our_index = AuthorityIndex::new_for_test(0);
2212
2213 let mut expected_blocks = (9..=10)
2215 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2216 .collect::<Vec<_>>();
2217
2218 let block_1 = expected_blocks.pop().unwrap();
2221 mock_client
2222 .stub_fetch_latest_blocks(
2223 vec![block_1.clone()],
2224 AuthorityIndex::new_for_test(1),
2225 vec![our_index],
2226 None,
2227 )
2228 .await;
2229 mock_client
2230 .stub_fetch_latest_blocks(
2231 vec![block_1],
2232 AuthorityIndex::new_for_test(1),
2233 vec![our_index],
2234 None,
2235 )
2236 .await;
2237
2238 let block_2 = expected_blocks.pop().unwrap();
2240 mock_client
2241 .stub_fetch_latest_blocks(
2242 vec![block_2.clone()],
2243 AuthorityIndex::new_for_test(2),
2244 vec![our_index],
2245 Some(Duration::from_secs(10)),
2246 )
2247 .await;
2248 mock_client
2249 .stub_fetch_latest_blocks(
2250 vec![block_2],
2251 AuthorityIndex::new_for_test(2),
2252 vec![our_index],
2253 None,
2254 )
2255 .await;
2256
2257 mock_client
2259 .stub_fetch_latest_blocks(
2260 vec![],
2261 AuthorityIndex::new_for_test(3),
2262 vec![our_index],
2263 Some(Duration::from_secs(10)),
2264 )
2265 .await;
2266 mock_client
2267 .stub_fetch_latest_blocks(
2268 vec![],
2269 AuthorityIndex::new_for_test(3),
2270 vec![our_index],
2271 None,
2272 )
2273 .await;
2274
2275 let network_client = Arc::new(SynchronizerClient::new(
2277 context.clone(),
2278 Some(mock_client.clone()),
2279 Some(mock_client.clone()),
2280 ));
2281 let peers_pool = Arc::new(PeersPool::new(context.clone()));
2282 let handle = Synchronizer::start(
2283 network_client,
2284 context.clone(),
2285 core_dispatcher.clone(),
2286 commit_vote_monitor,
2287 block_verifier,
2288 transaction_vote_tracker,
2289 round_tracker,
2290 dag_state,
2291 peers_pool.clone(),
2292 true,
2293 );
2294
2295 sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
2297
2298 assert_eq!(
2300 core_dispatcher.get_last_own_proposed_round().await,
2301 vec![10]
2302 );
2303
2304 assert_eq!(mock_client.fetch_latest_blocks_pending_calls().await, 0);
2306
2307 assert_eq!(
2309 context
2310 .metrics
2311 .node_metrics
2312 .sync_last_known_own_block_retries
2313 .get(),
2314 1
2315 );
2316
2317 if let Err(err) = handle.stop().await
2319 && err.is_panic()
2320 {
2321 std::panic::resume_unwind(err.into_panic());
2322 }
2323 }
2324
2325 #[tokio::test]
2326 async fn test_process_fetched_blocks() {
2327 let (context, _) = Context::new_for_test(4);
2329 let context = Arc::new(context);
2330 let block_verifier = Arc::new(NoopBlockVerifier {});
2331 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2332 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2333 let store = Arc::new(MemStore::new());
2334 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2335 let transaction_vote_tracker =
2336 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
2337 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2338 let (commands_sender, _commands_receiver) =
2339 monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
2340
2341 let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
2345 expected_blocks.extend(
2346 (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
2347 );
2348 assert_eq!(
2349 expected_blocks.len(),
2350 context.parameters.max_blocks_per_sync
2351 );
2352
2353 let expected_serialized_blocks = expected_blocks
2354 .iter()
2355 .map(|b| b.serialized().clone())
2356 .collect::<Vec<_>>();
2357
2358 let expected_block_refs = expected_blocks
2359 .iter()
2360 .map(|b| b.reference())
2361 .collect::<BTreeSet<_>>();
2362
2363 let peer_index = AuthorityIndex::new_for_test(2);
2365 let peer = PeerId::Validator(peer_index);
2366
2367 let inflight_blocks_map = InflightBlocksMap::new();
2369 let blocks_guard = inflight_blocks_map
2370 .lock_blocks(expected_block_refs.clone(), peer.clone())
2371 .expect("Failed to lock blocks");
2372
2373 assert_eq!(
2374 inflight_blocks_map.num_of_locked_blocks(),
2375 expected_block_refs.len()
2376 );
2377
2378 let result = Synchronizer::<
2380 NoopBlockVerifier,
2381 MockCoreThreadDispatcher,
2382 MockNetworkClient,
2383 MockNetworkClient,
2384 >::process_fetched_blocks(
2385 expected_serialized_blocks,
2386 peer,
2387 blocks_guard, core_dispatcher.clone(),
2389 block_verifier,
2390 transaction_vote_tracker,
2391 commit_vote_monitor,
2392 context.clone(),
2393 commands_sender,
2394 round_tracker,
2395 "test",
2396 )
2397 .await;
2398
2399 assert!(result.is_ok());
2401
2402 let added_blocks = core_dispatcher.get_add_blocks().await;
2404 assert_eq!(
2405 added_blocks
2406 .iter()
2407 .map(|b| b.reference())
2408 .collect::<BTreeSet<_>>(),
2409 expected_block_refs,
2410 );
2411
2412 assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
2414 }
2415}