1use std::{
4 collections::{BTreeMap, BTreeSet, HashMap},
5 sync::Arc,
6 time::Duration,
7};
8
9use bytes::Bytes;
10use consensus_config::AuthorityIndex;
11use consensus_types::block::{BlockRef, Round};
12use futures::{StreamExt as _, stream::FuturesUnordered};
13use itertools::Itertools as _;
14use mysten_common::debug_fatal;
15use mysten_metrics::{
16 monitored_future,
17 monitored_mpsc::{Receiver, Sender, channel},
18 monitored_scope,
19};
20use parking_lot::{Mutex, RwLock};
21use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
22use sui_macros::fail_point_async;
23use tap::TapFallible;
24use tokio::{
25 runtime::Handle,
26 sync::{mpsc::error::TrySendError, oneshot},
27 task::{JoinError, JoinSet},
28 time::{Instant, sleep, sleep_until, timeout},
29};
30use tracing::{debug, error, info, trace, warn};
31
32use crate::{
33 BlockAPI,
34 block::{ExtendedBlock, SignedBlock, VerifiedBlock},
35 block_verifier::BlockVerifier,
36 commit_vote_monitor::CommitVoteMonitor,
37 context::Context,
38 dag_state::DagState,
39 error::{ConsensusError, ConsensusResult},
40 network::{ObserverNetworkClient, SynchronizerClient, ValidatorNetworkClient},
41 round_tracker::RoundTracker,
42};
43use crate::{
44 authority_service::COMMIT_LAG_MULTIPLIER, core_thread::CoreThreadDispatcher,
45 transaction_certifier::TransactionCertifier,
46};
47
/// Capacity of each peer's live-fetch channel, and the maximum number of requests that a
/// peer's live-fetch task keeps in flight at once.
const FETCH_BLOCKS_CONCURRENCY: usize = 5;

/// Deadline applied to a single fetch-blocks request (and to each own-last-block request).
const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);

/// Overall deadline for collecting responses during one periodic fetch run.
const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_secs(4);

/// Maximum number of distinct peers that may have the same block locked for fetching at once.
const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;

/// Maximum number of peers a periodic fetch run initially spreads missing blocks across
/// (failed requests may still be retried on further peers).
const MAX_PERIODIC_SYNC_PEERS: usize = 3;
59
/// Lock held over a set of block refs in an `InflightBlocksMap` on behalf of one peer.
///
/// Dropping the guard calls `InflightBlocksMap::unlock_blocks` for exactly these refs and
/// this peer, so the blocks become available for fetching again.
60struct BlocksGuard {
/// Map in which `block_refs` are registered as in flight for `peer`.
61 map: Arc<InflightBlocksMap>,
/// Block refs locked by this guard.
62 block_refs: BTreeSet<BlockRef>,
/// Peer the locked blocks are to be fetched from.
63 peer: AuthorityIndex,
64}
65
66impl Drop for BlocksGuard {
67 fn drop(&mut self) {
68 self.map.unlock_blocks(&self.block_refs, self.peer);
69 }
70}
71
/// Tracks, per missing block, which peers it is currently being fetched from. A block is
/// locked for at most `MAX_AUTHORITIES_TO_FETCH_PER_BLOCK` peers at once, and never twice
/// for the same peer.
72struct InflightBlocksMap {
/// Block ref -> peers currently holding a lock on it. An entry is removed as soon as its
/// peer set becomes empty (see `unlock_blocks`).
77 inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
78}
79
80impl InflightBlocksMap {
81 fn new() -> Arc<Self> {
82 Arc::new(Self {
83 inner: Mutex::new(HashMap::new()),
84 })
85 }
86
87 fn lock_blocks(
93 self: &Arc<Self>,
94 missing_block_refs: BTreeSet<BlockRef>,
95 peer: AuthorityIndex,
96 ) -> Option<BlocksGuard> {
97 let mut blocks = BTreeSet::new();
98 let mut inner = self.inner.lock();
99
100 for block_ref in missing_block_refs {
101 let authorities = inner.entry(block_ref).or_default();
104 if authorities.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK
105 && authorities.get(&peer).is_none()
106 {
107 assert!(authorities.insert(peer));
108 blocks.insert(block_ref);
109 }
110 }
111
112 if blocks.is_empty() {
113 None
114 } else {
115 Some(BlocksGuard {
116 map: self.clone(),
117 block_refs: blocks,
118 peer,
119 })
120 }
121 }
122
123 fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
127 let mut blocks_to_fetch = self.inner.lock();
129 for block_ref in block_refs {
130 let authorities = blocks_to_fetch
131 .get_mut(block_ref)
132 .expect("Should have found a non empty map");
133
134 assert!(authorities.remove(&peer), "Peer index should be present!");
135
136 if authorities.is_empty() {
138 blocks_to_fetch.remove(block_ref);
139 }
140 }
141 }
142
143 fn swap_locks(
147 self: &Arc<Self>,
148 blocks_guard: BlocksGuard,
149 peer: AuthorityIndex,
150 ) -> Option<BlocksGuard> {
151 let block_refs = blocks_guard.block_refs.clone();
152
153 drop(blocks_guard);
155
156 self.lock_blocks(block_refs, peer)
158 }
159
160 #[cfg(test)]
161 fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
162 let inner = self.inner.lock();
163 inner.len()
164 }
165}
166
/// Commands handled by the synchronizer's main loop (`Synchronizer::run`).
167enum Command {
/// Fetch `missing_block_refs` from `peer_index`. `result` receives the outcome of handing
/// the request to that peer's fetch task, not the outcome of the fetch itself.
168 FetchBlocks {
169 missing_block_refs: BTreeSet<BlockRef>,
170 peer_index: AuthorityIndex,
171 result: oneshot::Sender<Result<(), ConsensusError>>,
172 },
/// Start the task that recovers this node's own last proposed round from peers, unless it
/// is already running.
173 FetchOwnLastBlock,
/// Bring the next run of the periodic missing-blocks scheduler forward.
174 KickOffScheduler,
175}
176
/// Handle returned by `Synchronizer::start`. It sends fetch requests to the synchronizer and
/// stops its background tasks.
177pub(crate) struct SynchronizerHandle {
/// Sender side of the synchronizer's command channel.
178 commands_sender: Sender<Command>,
/// Tasks spawned by `Synchronizer::start`: one live-fetch task per peer plus the main loop.
179 tasks: tokio::sync::Mutex<JoinSet<()>>,
180}
181
182impl SynchronizerHandle {
183 pub(crate) async fn fetch_blocks(
186 &self,
187 missing_block_refs: BTreeSet<BlockRef>,
188 peer_index: AuthorityIndex,
189 ) -> ConsensusResult<()> {
190 let (sender, receiver) = oneshot::channel();
191 self.commands_sender
192 .send(Command::FetchBlocks {
193 missing_block_refs,
194 peer_index,
195 result: sender,
196 })
197 .await
198 .map_err(|_err| ConsensusError::Shutdown)?;
199 receiver.await.map_err(|_err| ConsensusError::Shutdown)?
200 }
201
202 pub(crate) async fn stop(&self) -> Result<(), JoinError> {
203 let mut tasks = self.tasks.lock().await;
204 tasks.abort_all();
205 while let Some(result) = tasks.join_next().await {
206 result?
207 }
208 Ok(())
209 }
210}
211
/// Fetches missing blocks from peers and feeds them to core.
///
/// There are two fetch paths:
/// - "Live" fetches, requested explicitly through `SynchronizerHandle::fetch_blocks` and
///   served by one task per peer.
/// - A "periodic" scheduler that asks core for its missing blocks and fetches them from a
///   few peers.
///
/// It can also recover this node's own last proposed round from peers
/// (`Command::FetchOwnLastBlock`).
212pub(crate) struct Synchronizer<
234 V: BlockVerifier,
235 D: CoreThreadDispatcher,
236 VC: ValidatorNetworkClient,
237 OC: ObserverNetworkClient,
238> {
239 context: Arc<Context>,
/// Receives commands sent through `SynchronizerHandle` and by fetch tasks.
240 commands_receiver: Receiver<Command>,
/// Per-peer channel into that peer's live-fetch task (this node has no entry).
241 fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
242 core_dispatcher: Arc<D>,
/// Observes commit votes carried by fetched blocks; also used to detect commit lag.
243 commit_vote_monitor: Arc<CommitVoteMonitor>,
244 dag_state: Arc<RwLock<DagState>>,
/// Holds the running periodic fetch task; a new one is started only when this is empty.
245 fetch_blocks_scheduler_task: JoinSet<()>,
/// Holds the running own-last-block fetch task; started only when this is empty.
246 fetch_own_last_block_task: JoinSet<()>,
247 network_client: Arc<SynchronizerClient<VC, OC>>,
248 block_verifier: Arc<V>,
249 transaction_certifier: TransactionCertifier,
250 round_tracker: Arc<RwLock<RoundTracker>>,
/// Shared with all fetch paths so a block is not requested from too many peers at once.
251 inflight_blocks_map: Arc<InflightBlocksMap>,
/// Clone of the command sender. It is handed to the periodic fetch task so that
/// `process_fetched_blocks` can send `KickOffScheduler`.
252 commands_sender: Sender<Command>,
253}
254
255impl<V, D, VC, OC> Synchronizer<V, D, VC, OC>
256where
257 V: BlockVerifier,
258 D: CoreThreadDispatcher,
259 VC: ValidatorNetworkClient,
260 OC: ObserverNetworkClient,
261{
262 pub(crate) fn start(
263 network_client: Arc<SynchronizerClient<VC, OC>>,
264 context: Arc<Context>,
265 core_dispatcher: Arc<D>,
266 commit_vote_monitor: Arc<CommitVoteMonitor>,
267 block_verifier: Arc<V>,
268 transaction_certifier: TransactionCertifier,
269 round_tracker: Arc<RwLock<RoundTracker>>,
270 dag_state: Arc<RwLock<DagState>>,
271 sync_last_known_own_block: bool,
272 ) -> Arc<SynchronizerHandle> {
273 let (commands_sender, commands_receiver) =
274 channel("consensus_synchronizer_commands", 1_000);
275 let inflight_blocks_map = InflightBlocksMap::new();
276
277 let mut fetch_block_senders = BTreeMap::new();
279 let mut tasks = JoinSet::new();
280 for (index, _) in context.committee.authorities() {
281 if index == context.own_index {
282 continue;
283 }
284 let (sender, receiver) =
285 channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
286 let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
287 index,
288 network_client.clone(),
289 block_verifier.clone(),
290 transaction_certifier.clone(),
291 commit_vote_monitor.clone(),
292 context.clone(),
293 core_dispatcher.clone(),
294 dag_state.clone(),
295 receiver,
296 commands_sender.clone(),
297 round_tracker.clone(),
298 );
299 tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
300 fetch_block_senders.insert(index, sender);
301 }
302
303 let commands_sender_clone = commands_sender.clone();
304
305 if sync_last_known_own_block {
306 commands_sender
307 .try_send(Command::FetchOwnLastBlock)
308 .expect("Failed to sync our last block");
309 }
310
311 tasks.spawn(monitored_future!(async move {
313 let mut s = Self {
314 context,
315 commands_receiver,
316 fetch_block_senders,
317 core_dispatcher,
318 commit_vote_monitor,
319 fetch_blocks_scheduler_task: JoinSet::new(),
320 fetch_own_last_block_task: JoinSet::new(),
321 network_client,
322 block_verifier,
323 transaction_certifier,
324 inflight_blocks_map,
325 commands_sender: commands_sender_clone,
326 dag_state,
327 round_tracker,
328 };
329 s.run().await;
330 }));
331
332 Arc::new(SynchronizerHandle {
333 commands_sender,
334 tasks: tokio::sync::Mutex::new(tasks),
335 })
336 }
337
/// Main event loop of the synchronizer.
///
/// Waits on four sources:
/// - incoming `Command`s;
/// - completion of the own-last-block task;
/// - completion of the periodic fetch task;
/// - the scheduler timer, which fires every `PERIODIC_FETCH_INTERVAL`, or earlier after a
///   `KickOffScheduler`.
///
/// Panics raised by the spawned tasks are re-raised here. The loop returns only when
/// starting a periodic fetch fails, which is treated as shutdown.
338 async fn run(&mut self) {
// Period of the missing-blocks scheduler.
340 const PERIODIC_FETCH_INTERVAL: Duration = Duration::from_millis(200);
342 let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_INTERVAL);
343
344 tokio::pin!(scheduler_timeout);
345
346 loop {
347 tokio::select! {
348 Some(command) = self.commands_receiver.recv() => {
349 match command {
350 Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
351 if peer_index == self.context.own_index {
352 error!("We should never attempt to fetch blocks from our own node");
// `result` is dropped without a reply, so the caller's receiver sees a
// closed channel.
353 continue;
354 }
355
// Request at most `max_blocks_per_sync` refs, the first ones in `BTreeSet`
// order. Presumably the rest are picked up by the periodic scheduler.
356 let missing_block_refs = missing_block_refs
360 .into_iter()
361 .take(self.context.parameters.max_blocks_per_sync)
362 .collect();
363
// If nothing could be locked (already in flight from this peer, or at the
// per-block peer limit), there is nothing to send: report success.
364 let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index);
365 let Some(blocks_guard) = blocks_guard else {
366 result.send(Ok(())).ok();
367 continue;
368 };
369
// Hand the request to the peer's live-fetch task without waiting. If its
// queue is full, the request (and its locks) is dropped and
// `SynchronizerSaturated` is reported to the caller.
370 let r = self
373 .fetch_block_senders
374 .get(&peer_index)
375 .expect("Fatal error, sender should be present")
376 .try_send(blocks_guard)
377 .map_err(|err| {
378 match err {
379 TrySendError::Full(_) => {
380 let peer_hostname = &self.context.committee.authority(peer_index).hostname;
381 self.context
382 .metrics
383 .node_metrics
384 .synchronizer_skipped_fetch_requests
385 .with_label_values(&[peer_hostname])
386 .inc();
387 ConsensusError::SynchronizerSaturated(peer_index)
388 },
389 TrySendError::Closed(_) => ConsensusError::Shutdown
390 }
391 });
392
393 result.send(r).ok();
394 }
395 Command::FetchOwnLastBlock => {
// At most one own-last-block task runs at a time.
396 if self.fetch_own_last_block_task.is_empty() {
397 self.start_fetch_own_last_block_task();
398 }
399 }
400 Command::KickOffScheduler => {
// Run the scheduler immediately if it is idle. Otherwise run it no later
// than half an interval from now.
401 let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
404 Instant::now()
405 } else {
406 Instant::now() + PERIODIC_FETCH_INTERVAL.checked_div(2).unwrap()
407 };
408
// Only ever move the deadline earlier.
409 if timeout < scheduler_timeout.deadline() {
411 scheduler_timeout.as_mut().reset(timeout);
412 }
413 }
414 }
415 },
// Reap the own-last-block task. Cancellation is ignored; a panic is re-raised here.
416 Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
417 match result {
418 Ok(()) => {},
419 Err(e) => {
420 if e.is_cancelled() {
421 } else if e.is_panic() {
422 std::panic::resume_unwind(e.into_panic());
423 } else {
424 panic!("fetch our last block task failed: {e}");
425 }
426 },
427 };
428 },
// Reap the periodic fetch task. Cancellation is ignored; a panic is re-raised here.
429 Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
430 match result {
431 Ok(()) => {},
432 Err(e) => {
433 if e.is_cancelled() {
434 } else if e.is_panic() {
435 std::panic::resume_unwind(e.into_panic());
436 } else {
437 panic!("fetch blocks scheduler task failed: {e}");
438 }
439 },
440 };
441 },
442 () = &mut scheduler_timeout => {
// Start a new periodic fetch only once the previous one has finished. An
// error here (core dispatcher unreachable) ends the loop.
443 if self.fetch_blocks_scheduler_task.is_empty()
446 && let Err(err) = self.start_fetch_missing_blocks_task().await {
447 debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
448 return;
449 };
450
451 scheduler_timeout
452 .as_mut()
453 .reset(Instant::now() + PERIODIC_FETCH_INTERVAL);
454 }
455 }
456 }
457 }
458
/// Live-fetch task for a single peer.
///
/// Receives `BlocksGuard`s from the main loop and fetches the guarded blocks from
/// `peer_index`, keeping at most `FETCH_BLOCKS_CONCURRENCY` requests in flight.
///
/// Successful responses are verified and added to core via `process_fetched_blocks`. That
/// call is awaited inline, so no new request is started or completed while a batch is
/// being processed.
///
/// A failed request is retried against the same peer, for at most `MAX_RETRIES` attempts
/// in total. After that its guard is dropped, unlocking the blocks.
///
/// The task exits once the channel is closed and no request is pending.
459 async fn fetch_blocks_from_authority(
460 peer_index: AuthorityIndex,
461 network_client: Arc<SynchronizerClient<VC, OC>>,
462 block_verifier: Arc<V>,
463 transaction_certifier: TransactionCertifier,
464 commit_vote_monitor: Arc<CommitVoteMonitor>,
465 context: Arc<Context>,
466 core_dispatcher: Arc<D>,
467 dag_state: Arc<RwLock<DagState>>,
468 mut receiver: Receiver<BlocksGuard>,
469 commands_sender: Sender<Command>,
470 round_tracker: Arc<RwLock<RoundTracker>>,
471 ) {
// The attempt counter starts at 1 and `fetch_blocks_request` bumps it on each failure,
// so this allows `MAX_RETRIES` attempts in total.
472 const MAX_RETRIES: u32 = 3;
473 let peer_hostname = &context.committee.authority(peer_index).hostname;
474 let mut requests = FuturesUnordered::new();
475
476 loop {
477 tokio::select! {
// Accept a new request only while below the concurrency limit.
478 Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
// Snapshot of our highest accepted rounds, sent along with the request and
// reused unchanged on retries.
479 let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);
481
482 requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, true, FETCH_REQUEST_TIMEOUT, 1))
483 },
484 Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
485 match response {
486 Ok(blocks) => {
487 if let Err(err) = Self::process_fetched_blocks(blocks,
488 peer_index,
489 blocks_guard,
490 core_dispatcher.clone(),
491 block_verifier.clone(),
492 transaction_certifier.clone(),
493 commit_vote_monitor.clone(),
494 context.clone(),
495 commands_sender.clone(),
496 round_tracker.clone(),
497 "live"
498 ).await {
499 warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
500 context.metrics.node_metrics.synchronizer_process_fetched_failures.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
501 }
502 },
503 Err(_) => {
504 context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
505 if retries <= MAX_RETRIES {
506 requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, true, FETCH_REQUEST_TIMEOUT, retries))
507 } else {
508 warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
// Give up: unlocking the blocks lets other fetch paths request them.
509 drop(blocks_guard);
511 }
512 }
513 }
514 },
// Reached when the channel is closed and no request is pending.
515 else => {
516 info!("Fetching blocks from authority {peer_index} task will now abort.");
517 break;
518 }
519 }
520 }
521 }
522
/// Verifies a batch of blocks fetched from `peer_index` and hands them to core.
///
/// At most `max_blocks_per_sync` of the returned blocks are processed; the rest are
/// discarded. Verification runs on a blocking thread. Verified blocks are reported to
/// `commit_vote_monitor` and `round_tracker` before being added to core. The requested
/// blocks stay locked until core has accepted the batch. If core then reports further
/// missing blocks, the periodic scheduler is kicked off.
///
/// `sync_method` is only used as a metrics label.
///
/// # Errors
/// - Any verification error from `verify_blocks`.
/// - `ConsensusError::Shutdown` if core is unreachable.
///
/// The guard is dropped on every return path.
///
/// # Panics
/// If the blocking verification task cannot be joined.
523 async fn process_fetched_blocks(
526 mut serialized_blocks: Vec<Bytes>,
527 peer_index: AuthorityIndex,
528 requested_blocks_guard: BlocksGuard,
529 core_dispatcher: Arc<D>,
530 block_verifier: Arc<V>,
531 transaction_certifier: TransactionCertifier,
532 commit_vote_monitor: Arc<CommitVoteMonitor>,
533 context: Arc<Context>,
534 commands_sender: Sender<Command>,
535 round_tracker: Arc<RwLock<RoundTracker>>,
536 sync_method: &str,
537 ) -> ConsensusResult<()> {
538 if serialized_blocks.is_empty() {
539 return Ok(());
540 }
541
// Bound the work a single response can cause.
542 serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
544
// Deserialization and signature verification are done off the async runtime.
545 let blocks = Handle::current()
547 .spawn_blocking({
548 let block_verifier = block_verifier.clone();
549 let context = context.clone();
550 move || {
551 Self::verify_blocks(
552 serialized_blocks,
553 block_verifier,
554 transaction_certifier,
555 &context,
556 peer_index,
557 )
558 }
559 })
560 .await
561 .expect("Spawn blocking should not fail")?;
562
// Record the commit votes carried by the verified blocks.
563 for block in &blocks {
565 commit_vote_monitor.observe_block(block);
566 }
567
// Scope the round-tracker write lock to this update only.
568 {
571 let mut tracker = round_tracker.write();
572 for block in &blocks {
573 tracker.update_from_verified_block(&ExtendedBlock {
574 block: block.clone(),
575 excluded_ancestors: vec![],
576 });
577 }
578 }
579
580 let metrics = &context.metrics.node_metrics;
581 let peer_hostname = &context.committee.authority(peer_index).hostname;
582 metrics
583 .synchronizer_fetched_blocks_by_peer
584 .with_label_values(&[peer_hostname.as_str(), sync_method])
585 .inc_by(blocks.len() as u64);
586 for block in &blocks {
587 let block_hostname = &context.committee.authority(block.author()).hostname;
588 metrics
589 .synchronizer_fetched_blocks_by_authority
590 .with_label_values(&[block_hostname.as_str(), sync_method])
591 .inc();
592 }
593
594 debug!(
595 "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
596 blocks.len(),
597 blocks.iter().map(|b| b.reference().to_string()).join(", "),
598 );
599
// Core returns the blocks that are still missing after this batch was added.
600 let missing_blocks = core_dispatcher
604 .add_blocks(blocks)
605 .await
606 .map_err(|_| ConsensusError::Shutdown)?;
607
// Unlock only after core has accepted the blocks, so they are not requested
// again in the meantime.
608 drop(requested_blocks_guard);
610
611 if !missing_blocks.is_empty() {
// Non-blocking send: if the command channel is full, the kick-off is skipped
// and only a warning is logged.
613 if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
615 {
616 warn!("Commands channel is full")
617 }
618 }
619
620 context
621 .metrics
622 .node_metrics
623 .missing_blocks_after_fetch_total
624 .inc_by(missing_blocks.len() as u64);
625
626 Ok(())
627 }
628
629 fn get_highest_accepted_rounds(
630 dag_state: Arc<RwLock<DagState>>,
631 context: &Arc<Context>,
632 ) -> Vec<Round> {
633 let blocks = dag_state
634 .read()
635 .get_last_cached_block_per_authority(Round::MAX);
636 assert_eq!(blocks.len(), context.committee.size());
637
638 blocks
639 .into_iter()
640 .map(|(block, _)| block.round())
641 .collect::<Vec<_>>()
642 }
643
644 fn verify_blocks(
645 serialized_blocks: Vec<Bytes>,
646 block_verifier: Arc<V>,
647 transaction_certifier: TransactionCertifier,
648 context: &Context,
649 peer_index: AuthorityIndex,
650 ) -> ConsensusResult<Vec<VerifiedBlock>> {
651 let mut verified_blocks = Vec::new();
652 let mut voted_blocks = Vec::new();
653 for serialized_block in serialized_blocks {
654 let signed_block: SignedBlock =
655 bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
656
657 let (verified_block, reject_txn_votes) = block_verifier
659 .verify_and_vote(signed_block, serialized_block)
660 .tap_err(|e| {
661 let hostname = context.committee.authority(peer_index).hostname.clone();
662 context
663 .metrics
664 .node_metrics
665 .invalid_blocks
666 .with_label_values(&[hostname.as_str(), "synchronizer", e.clone().name()])
667 .inc();
668 info!("Invalid block received from {}: {}", peer_index, e);
669 })?;
670
671 let now = context.clock.timestamp_utc_ms();
673 let drift = verified_block.timestamp_ms().saturating_sub(now);
674 if drift > 0 {
675 let peer_hostname = &context
676 .committee
677 .authority(verified_block.author())
678 .hostname;
679 context
680 .metrics
681 .node_metrics
682 .block_timestamp_drift_ms
683 .with_label_values(&[peer_hostname.as_str(), "synchronizer"])
684 .inc_by(drift);
685
686 trace!(
687 "Synced block {} timestamp {} is in the future (now={}).",
688 verified_block.reference(),
689 verified_block.timestamp_ms(),
690 now
691 );
692 }
693
694 verified_blocks.push(verified_block.clone());
695 voted_blocks.push((verified_block, reject_txn_votes));
696 }
697
698 if context.protocol_config.mysticeti_fastpath() {
699 transaction_certifier.add_voted_blocks(voted_blocks);
700 }
701
702 Ok(verified_blocks)
703 }
704
705 async fn fetch_blocks_request(
706 network_client: Arc<SynchronizerClient<VC, OC>>,
707 peer: AuthorityIndex,
708 blocks_guard: BlocksGuard,
709 highest_rounds: Vec<Round>,
710 breadth_first: bool,
711 request_timeout: Duration,
712 mut retries: u32,
713 ) -> (
714 ConsensusResult<Vec<Bytes>>,
715 BlocksGuard,
716 u32,
717 AuthorityIndex,
718 Vec<Round>,
719 ) {
720 use crate::network::PeerId;
721 let start = Instant::now();
722 let resp = timeout(
723 request_timeout,
724 network_client.fetch_blocks(
725 PeerId::Validator(peer),
726 blocks_guard
727 .block_refs
728 .clone()
729 .into_iter()
730 .collect::<Vec<_>>(),
731 highest_rounds.clone().into_iter().collect::<Vec<_>>(),
732 breadth_first,
733 request_timeout,
734 ),
735 )
736 .await;
737
738 fail_point_async!("consensus-delay");
739
740 let resp = match resp {
741 Ok(Err(err)) => {
742 sleep_until(start + request_timeout).await;
745 retries += 1;
746 Err(err)
747 } Err(err) => {
749 sleep_until(start + request_timeout).await;
751 retries += 1;
752 Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
753 }
754 Ok(result) => result,
755 };
756 (resp, blocks_guard, retries, peer, highest_rounds)
757 }
758
/// Spawns, into `fetch_own_last_block_task`, the task that recovers the round of this
/// node's own last proposed block from its peers.
///
/// One round of attempts works as follows:
/// - All peers are queried concurrently. A peer whose request fails is re-queried after
///   `FETCH_OWN_BLOCK_RETRY_DELAY`.
/// - The round ends when every peer has answered or `sync_last_known_own_block_timeout`
///   elapses.
/// - A reply counts towards the stake only if all its blocks verify and are authored by
///   this node. A reply with no blocks counts as round 0.
///
/// If the accumulated stake reaches validity, the highest round seen is used. Otherwise a
/// new round of attempts starts after a back-off. The back-off starts at 500ms, grows 1.5x
/// per retry, and is capped at `MAX_RETRY_DELAY_STEP`.
///
/// In a single-node committee, the last proposed block in `dag_state` is used directly.
///
/// The resulting round is recorded in `last_known_own_block_round` and passed to
/// `core_dispatcher.set_last_known_proposed_round`.
759 fn start_fetch_own_last_block_task(&mut self) {
760 const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
761 const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);
762
763 let context = self.context.clone();
764 let dag_state = self.dag_state.clone();
765 let network_client = self.network_client.clone();
766 let block_verifier = self.block_verifier.clone();
767 let core_dispatcher = self.core_dispatcher.clone();
768
769 self.fetch_own_last_block_task
770 .spawn(monitored_future!(async move {
771 let _scope = monitored_scope("FetchOwnLastBlockTask");
772
// Builds a future that waits `fetch_own_block_delay`, then asks
// `authority_index` for this node's latest block(s).
773 let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
774 let network_client_cloned = network_client.clone();
775 let own_index = context.own_index;
776 async move {
777 sleep(fetch_own_block_delay).await;
778 let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
779 (r, authority_index)
780 }
781 };
782
783
// Verifies one peer's reply. The whole reply is rejected if any block is
// malformed, invalid, or not authored by this node.
784 let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
785 let mut result = Vec::new();
786 for serialized_block in blocks {
787 let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
788 let (verified_block, _) = block_verifier.verify_and_vote(signed_block, serialized_block).tap_err(|err|{
789 let hostname = context.committee.authority(authority_index).hostname.clone();
790 context
791 .metrics
792 .node_metrics
793 .invalid_blocks
794 .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
795 .inc();
796 warn!("Invalid block received from {}: {}", authority_index, err);
797 })?;
798
799 if verified_block.author() != context.own_index {
800 return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
801 }
802 result.push(verified_block);
803 }
804 Ok(result)
805 };
806
807 let mut highest_round;
809 let mut retries = 0;
// Back-off between rounds of attempts; grows by 1.5x up to MAX_RETRY_DELAY_STEP.
810 let mut retry_delay_step = Duration::from_millis(500);
811 'main:loop {
// With no peers to ask, use our own DAG state.
812 if context.committee.size() == 1 {
813 highest_round = dag_state.read().get_last_proposed_block().round();
814 info!("Only one node in the network, will not try fetching own last block from peers.");
815 break 'main;
816 }
817
// Stake and highest round are accumulated afresh in every round of attempts.
818 let mut total_stake = 0;
819 highest_round = 0;
820
821 let mut results = FuturesUnordered::new();
823
824 for (authority_index, _authority) in context.committee.authorities() {
825 if authority_index != context.own_index {
826 results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
827 }
828 }
829
830 let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
832 tokio::pin!(timer);
833
834 'inner: loop {
835 tokio::select! {
836 result = results.next() => {
// `None` means every peer has answered and nothing is pending.
837 let Some((result, authority_index)) = result else {
838 break 'inner;
839 };
840 match result {
841 Ok(result) => {
842 match process_blocks(result, authority_index) {
843 Ok(blocks) => {
844 let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
845 highest_round = highest_round.max(max_round);
846
847 total_stake += context.committee.stake(authority_index);
848 },
849 Err(err) => {
850 warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
851 }
852 }
853 },
854 Err(err) => {
// Network failure: ask the same peer again after a short delay.
855 warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
856 results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
857 }
858 }
859 },
860 () = &mut timer => {
861 info!("Timeout while trying to sync our own last block from peers");
862 break 'inner;
863 }
864 }
865 }
866
// Accept the result once peers holding validity stake have replied acceptably.
867 if context.committee.reached_validity(total_stake) {
869 info!("{} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
870 break 'main;
871 }
872
873 retries += 1;
874 context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
875 warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);
876
877 sleep(retry_delay_step).await;
878
879 retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
880 retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
881 }
882
883 context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);
885
886 if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
887 warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
888 }
889 }));
890 }
891
892 async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
893 if self.context.committee.size() == 1 {
894 trace!(
895 "Only one node in the network, will not try fetching missing blocks from peers."
896 );
897 return Ok(());
898 }
899
900 let missing_blocks = self
901 .core_dispatcher
902 .get_missing_blocks()
903 .await
904 .map_err(|_err| ConsensusError::Shutdown)?;
905
906 if missing_blocks.is_empty() {
908 return Ok(());
909 }
910
911 let context = self.context.clone();
912 let network_client = self.network_client.clone();
913 let block_verifier = self.block_verifier.clone();
914 let transaction_certifier = self.transaction_certifier.clone();
915 let commit_vote_monitor = self.commit_vote_monitor.clone();
916 let core_dispatcher = self.core_dispatcher.clone();
917 let blocks_to_fetch = self.inflight_blocks_map.clone();
918 let commands_sender = self.commands_sender.clone();
919 let dag_state = self.dag_state.clone();
920 let round_tracker = self.round_tracker.clone();
921
922 if self.is_commit_lagging() {
925 return Ok(());
926 }
927
928 self.fetch_blocks_scheduler_task
929 .spawn(monitored_future!(async move {
930 let _scope = monitored_scope("FetchMissingBlocksScheduler");
931 context
932 .metrics
933 .node_metrics
934 .fetch_blocks_scheduler_inflight
935 .inc();
936 let total_requested = missing_blocks.len();
937
938 fail_point_async!("consensus-delay");
939 let results = Self::fetch_blocks_from_authorities(
941 context.clone(),
942 blocks_to_fetch.clone(),
943 network_client,
944 missing_blocks,
945 dag_state,
946 )
947 .await;
948 context
949 .metrics
950 .node_metrics
951 .fetch_blocks_scheduler_inflight
952 .dec();
953 if results.is_empty() {
954 return;
955 }
956
957 let mut total_fetched = 0;
959 for (blocks_guard, fetched_blocks, peer) in results {
960 total_fetched += fetched_blocks.len();
961
962 if let Err(err) = Self::process_fetched_blocks(
963 fetched_blocks,
964 peer,
965 blocks_guard,
966 core_dispatcher.clone(),
967 block_verifier.clone(),
968 transaction_certifier.clone(),
969 commit_vote_monitor.clone(),
970 context.clone(),
971 commands_sender.clone(),
972 round_tracker.clone(),
973 "periodic",
974 )
975 .await
976 {
977 warn!(
978 "Error occurred while processing fetched blocks from peer {peer}: {err}"
979 );
980 context
981 .metrics
982 .node_metrics
983 .synchronizer_process_fetched_failures
984 .with_label_values(&[
985 context.committee.authority(peer).hostname.as_str(),
986 "periodic",
987 ])
988 .inc();
989 }
990 }
991
992 debug!(
993 "Total blocks requested to fetch: {}, total fetched: {}",
994 total_requested, total_fetched
995 );
996 }));
997 Ok(())
998 }
999
1000 fn is_commit_lagging(&self) -> bool {
1001 let last_commit_index = self.dag_state.read().last_commit_index();
1002 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
1003 let commit_threshold = last_commit_index
1004 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
1005 commit_threshold < quorum_commit_index
1006 }
1007
1008 async fn fetch_blocks_from_authorities(
1014 context: Arc<Context>,
1015 inflight_blocks: Arc<InflightBlocksMap>,
1016 network_client: Arc<SynchronizerClient<VC, OC>>,
1017 missing_blocks: BTreeSet<BlockRef>,
1018 dag_state: Arc<RwLock<DagState>>,
1019 ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
1020 let missing_blocks = missing_blocks
1024 .into_iter()
1025 .take(2 * MAX_PERIODIC_SYNC_PEERS * context.parameters.max_blocks_per_sync)
1026 .collect::<Vec<_>>();
1027
1028 let mut authorities = BTreeMap::<AuthorityIndex, Vec<BlockRef>>::new();
1030 for block_ref in &missing_blocks {
1031 authorities
1032 .entry(block_ref.author)
1033 .or_default()
1034 .push(*block_ref);
1035 }
1036 let num_authorities_per_peer = authorities
1039 .len()
1040 .div_ceil((context.committee.size() - 1).min(MAX_PERIODIC_SYNC_PEERS));
1041
1042 let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1044 for (authority, blocks) in &authorities {
1045 missing_blocks_per_authority[*authority] += blocks.len();
1046 }
1047 for (missing, (_, authority)) in missing_blocks_per_authority
1048 .into_iter()
1049 .zip(context.committee.authorities())
1050 {
1051 context
1052 .metrics
1053 .node_metrics
1054 .synchronizer_missing_blocks_by_authority
1055 .with_label_values(&[&authority.hostname])
1056 .inc_by(missing as u64);
1057 context
1058 .metrics
1059 .node_metrics
1060 .synchronizer_current_missing_blocks_by_authority
1061 .with_label_values(&[&authority.hostname])
1062 .set(missing as i64);
1063 }
1064
1065 let mut peers = context
1066 .committee
1067 .authorities()
1068 .filter_map(|(peer_index, _)| (peer_index != context.own_index).then_some(peer_index))
1069 .collect::<Vec<_>>();
1070
1071 if cfg!(not(test)) {
1073 peers.shuffle(&mut ThreadRng::default());
1075 }
1076
1077 let mut peers = peers.into_iter();
1078 let mut request_futures = FuturesUnordered::new();
1079
1080 let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);
1081
1082 let mut authorities = authorities.into_values().collect::<Vec<_>>();
1084 if cfg!(not(test)) {
1085 authorities.shuffle(&mut ThreadRng::default());
1087 }
1088
1089 for batch in authorities.chunks(num_authorities_per_peer) {
1091 let Some(peer) = peers.next() else {
1092 debug_fatal!("No more peers left to fetch blocks!");
1093 break;
1094 };
1095 let peer_hostname = &context.committee.authority(peer).hostname;
1096 let block_refs = batch
1099 .iter()
1100 .flatten()
1101 .cloned()
1102 .collect::<BTreeSet<_>>()
1103 .into_iter()
1104 .take(context.parameters.max_blocks_per_sync)
1105 .collect::<BTreeSet<_>>();
1106
1107 if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
1109 info!(
1110 "Periodic sync of {} missing blocks from peer {} {}: {}",
1111 block_refs.len(),
1112 peer,
1113 peer_hostname,
1114 block_refs
1115 .iter()
1116 .map(|b| b.to_string())
1117 .collect::<Vec<_>>()
1118 .join(", ")
1119 );
1120 request_futures.push(Self::fetch_blocks_request(
1121 network_client.clone(),
1122 peer,
1123 blocks_guard,
1124 highest_rounds.clone(),
1125 false,
1126 FETCH_REQUEST_TIMEOUT,
1127 1,
1128 ));
1129 }
1130 }
1131
1132 let mut results = Vec::new();
1133 let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1134
1135 tokio::pin!(fetcher_timeout);
1136
1137 loop {
1138 tokio::select! {
1139 Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
1140 let peer_hostname = &context.committee.authority(peer_index).hostname;
1141 match response {
1142 Ok(fetched_blocks) => {
1143 results.push((blocks_guard, fetched_blocks, peer_index));
1144
1145 if request_futures.is_empty() {
1147 break;
1148 }
1149 },
1150 Err(_) => {
1151 context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer_hostname.as_str(), "periodic"]).inc();
1152 if let Some(next_peer) = peers.next() {
1154 if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
1156 info!(
1157 "Retrying syncing {} missing blocks from peer {}: {}",
1158 blocks_guard.block_refs.len(),
1159 peer_hostname,
1160 blocks_guard.block_refs
1161 .iter()
1162 .map(|b| b.to_string())
1163 .collect::<Vec<_>>()
1164 .join(", ")
1165 );
1166 request_futures.push(Self::fetch_blocks_request(
1167 network_client.clone(),
1168 next_peer,
1169 blocks_guard,
1170 highest_rounds,
1171 false,
1172 FETCH_REQUEST_TIMEOUT,
1173 1,
1174 ));
1175 } else {
1176 debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
1177 }
1178 } else {
1179 debug!("No more peers left to fetch blocks");
1180 }
1181 }
1182 }
1183 },
1184 _ = &mut fetcher_timeout => {
1185 debug!("Timed out while fetching missing blocks");
1186 break;
1187 }
1188 }
1189 }
1190
1191 results
1192 }
1193}
1194
1195#[cfg(test)]
1196mod tests {
1197 use std::{
1198 collections::{BTreeMap, BTreeSet},
1199 sync::Arc,
1200 time::Duration,
1201 };
1202
1203 use async_trait::async_trait;
1204 use bytes::Bytes;
1205 use consensus_config::{AuthorityIndex, Parameters};
1206 use consensus_types::block::{BlockDigest, BlockRef, Round};
1207 use mysten_metrics::monitored_mpsc;
1208 use parking_lot::RwLock;
1209 use tokio::{sync::Mutex, time::sleep};
1210
1211 use crate::commit::{CommitVote, TrustedCommit};
1212 use crate::{
1213 CommitDigest, CommitIndex,
1214 block::{TestBlock, VerifiedBlock},
1215 block_verifier::NoopBlockVerifier,
1216 commit_vote_monitor::CommitVoteMonitor,
1217 context::Context,
1218 core_thread::CoreThreadDispatcher,
1219 dag_state::DagState,
1220 error::{ConsensusError, ConsensusResult},
1221 network::{BlockStream, ObserverNetworkClient, SynchronizerClient, ValidatorNetworkClient},
1222 storage::mem_store::MemStore,
1223 synchronizer::{
1224 FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap, Synchronizer,
1225 },
1226 };
1227 use crate::{
1228 authority_service::COMMIT_LAG_MULTIPLIER, core_thread::MockCoreThreadDispatcher,
1229 round_tracker::RoundTracker, transaction_certifier::TransactionCertifier,
1230 };
1231
1232 type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
1233 type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
1234 type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
1235 type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1236
1237 #[derive(Default)]
1238 struct MockNetworkClient {
1239 fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
1240 fetch_latest_blocks_requests:
1241 Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
1242 }
1243
1244 impl MockNetworkClient {
1245 async fn stub_fetch_blocks(
1246 &self,
1247 blocks: Vec<VerifiedBlock>,
1248 peer: AuthorityIndex,
1249 latency: Option<Duration>,
1250 ) {
1251 let mut lock = self.fetch_blocks_requests.lock().await;
1252 let block_refs = blocks
1253 .iter()
1254 .map(|block| block.reference())
1255 .collect::<Vec<_>>();
1256 lock.insert((block_refs, peer), (blocks, latency));
1257 }
1258
1259 async fn stub_fetch_latest_blocks(
1260 &self,
1261 blocks: Vec<VerifiedBlock>,
1262 peer: AuthorityIndex,
1263 authorities: Vec<AuthorityIndex>,
1264 latency: Option<Duration>,
1265 ) {
1266 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1267 lock.entry((peer, authorities))
1268 .or_default()
1269 .push((blocks, latency));
1270 }
1271
1272 async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1273 let lock = self.fetch_latest_blocks_requests.lock().await;
1274 lock.len()
1275 }
1276 }
1277
1278 #[async_trait]
1279 impl ValidatorNetworkClient for MockNetworkClient {
1280 async fn subscribe_blocks(
1281 &self,
1282 _peer: AuthorityIndex,
1283 _last_received: Round,
1284 _timeout: Duration,
1285 ) -> ConsensusResult<BlockStream> {
1286 unimplemented!("subscribe_blocks not implemented in mock")
1287 }
1288
1289 async fn fetch_blocks(
1290 &self,
1291 peer: AuthorityIndex,
1292 block_refs: Vec<BlockRef>,
1293 _highest_accepted_rounds: Vec<Round>,
1294 _breadth_first: bool,
1295 _timeout: Duration,
1296 ) -> ConsensusResult<Vec<Bytes>> {
1297 let mut lock = self.fetch_blocks_requests.lock().await;
1298 let response = lock.remove(&(block_refs.clone(), peer)).unwrap_or_else(|| {
1299 panic!(
1300 "Unexpected fetch blocks request made: {:?} {}. Current lock: {:?}",
1301 block_refs, peer, lock
1302 );
1303 });
1304
1305 let serialised = response
1306 .0
1307 .into_iter()
1308 .map(|block| block.serialized().clone())
1309 .collect::<Vec<_>>();
1310
1311 drop(lock);
1312
1313 if let Some(latency) = response.1 {
1314 sleep(latency).await;
1315 }
1316
1317 Ok(serialised)
1318 }
1319
1320 async fn fetch_commits(
1321 &self,
1322 _peer: AuthorityIndex,
1323 _commit_range: crate::commit::CommitRange,
1324 _timeout: Duration,
1325 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1326 unimplemented!("fetch_commits not implemented in mock")
1327 }
1328
1329 async fn fetch_latest_blocks(
1330 &self,
1331 peer: AuthorityIndex,
1332 authorities: Vec<AuthorityIndex>,
1333 _timeout: Duration,
1334 ) -> ConsensusResult<Vec<Bytes>> {
1335 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1336 let mut responses = lock
1337 .remove(&(peer, authorities.clone()))
1338 .expect("Unexpected fetch blocks request made");
1339
1340 let response = responses.remove(0);
1341 let serialised = response
1342 .0
1343 .into_iter()
1344 .map(|block| block.serialized().clone())
1345 .collect::<Vec<_>>();
1346
1347 if !responses.is_empty() {
1348 lock.insert((peer, authorities), responses);
1349 }
1350
1351 drop(lock);
1352
1353 if let Some(latency) = response.1 {
1354 sleep(latency).await;
1355 }
1356
1357 Ok(serialised)
1358 }
1359
1360 async fn get_latest_rounds(
1361 &self,
1362 _peer: AuthorityIndex,
1363 _timeout: Duration,
1364 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1365 unimplemented!("get_latest_rounds not implemented in mock")
1366 }
1367
1368 #[cfg(test)]
1369 async fn send_block(
1370 &self,
1371 _peer: AuthorityIndex,
1372 _block: &VerifiedBlock,
1373 _timeout: Duration,
1374 ) -> ConsensusResult<()> {
1375 unimplemented!("send_block not implemented in mock")
1376 }
1377 }
1378
1379 #[async_trait]
1380 impl ObserverNetworkClient for MockNetworkClient {
1381 async fn stream_blocks(
1382 &self,
1383 _peer: crate::network::PeerId,
1384 _request_stream: crate::network::BlockRequestStream,
1385 _timeout: Duration,
1386 ) -> ConsensusResult<crate::network::ObserverBlockStream> {
1387 unimplemented!("stream_blocks not implemented in mock")
1388 }
1389
1390 async fn fetch_blocks(
1391 &self,
1392 _peer: crate::network::PeerId,
1393 _block_refs: Vec<BlockRef>,
1394 _timeout: Duration,
1395 ) -> ConsensusResult<Vec<Bytes>> {
1396 unimplemented!("Observer fetch_blocks not implemented in mock")
1397 }
1398
1399 async fn fetch_commits(
1400 &self,
1401 _peer: crate::network::PeerId,
1402 _commit_range: crate::commit::CommitRange,
1403 _timeout: Duration,
1404 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1405 unimplemented!("Observer fetch_commits not implemented in mock")
1406 }
1407 }
1408
    /// Exercises `InflightBlocksMap` locking: a per-authority lock, a cap on concurrent
    /// lockers per block, release on guard drop, and `swap_locks`.
    #[test]
    fn test_inflight_blocks_map() {
        let map = InflightBlocksMap::new();
        let some_block_refs = [
            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
            BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
        ];
        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();

        {
            // Keeps the successful guards alive; dropping a `BlocksGuard` unlocks its blocks.
            let mut all_guards = Vec::new();

            for i in 1..=2 {
                let authority = AuthorityIndex::new_for_test(i);

                // The first lock by this authority succeeds and covers all four blocks.
                let guard = map.lock_blocks(missing_block_refs.clone(), authority);
                let guard = guard.expect("Guard should be created");
                assert_eq!(guard.block_refs.len(), 4);

                all_guards.push(guard);

                // The same authority cannot lock the same blocks a second time.
                let guard = map.lock_blocks(missing_block_refs.clone(), authority);
                assert!(guard.is_none());
            }

            // With authorities 1 and 2 holding the blocks, a third authority is rejected.
            // This is consistent with MAX_AUTHORITIES_TO_FETCH_PER_BLOCK = 2.
            let authority_3 = AuthorityIndex::new_for_test(3);

            let guard = map.lock_blocks(missing_block_refs.clone(), authority_3);
            assert!(guard.is_none());

            // Releasing authority 1's guard frees a slot for authority 3.
            drop(all_guards.remove(0));

            let guard = map.lock_blocks(missing_block_refs.clone(), authority_3);
            let guard = guard.expect("Guard should be successfully acquired");

            assert_eq!(guard.block_refs, missing_block_refs);

            // Once every guard is gone, no block stays locked.
            drop(guard);
            drop(all_guards);

            assert_eq!(map.num_of_locked_blocks(), 0);
        }

        {
            // `swap_locks` moves the held locks from authority 1 to authority 2,
            // keeping the same block set.
            let authority_1 = AuthorityIndex::new_for_test(1);
            let guard = map
                .lock_blocks(missing_block_refs.clone(), authority_1)
                .unwrap();

            let authority_2 = AuthorityIndex::new_for_test(2);
            let guard = map.swap_locks(guard, authority_2);

            assert_eq!(guard.unwrap().block_refs, missing_block_refs);
        }
    }
1476
1477 #[tokio::test]
1478 async fn successful_fetch_blocks_from_peer() {
1479 let (context, _) = Context::new_for_test(4);
1481 let context = Arc::new(context);
1482 let block_verifier = Arc::new(NoopBlockVerifier {});
1483 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1484 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1485 let mock_client = Arc::new(MockNetworkClient::default());
1486 let (blocks_sender, _blocks_receiver) =
1487 monitored_mpsc::unbounded_channel("consensus_block_output");
1488 let store = Arc::new(MemStore::new());
1489 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1490 let transaction_certifier = TransactionCertifier::new(
1491 context.clone(),
1492 block_verifier.clone(),
1493 dag_state.clone(),
1494 blocks_sender,
1495 );
1496 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1497
1498 let network_client = Arc::new(SynchronizerClient::new(
1499 context.clone(),
1500 Some(mock_client.clone()),
1501 Some(mock_client.clone()),
1502 ));
1503 let handle = Synchronizer::start(
1504 network_client,
1505 context,
1506 core_dispatcher.clone(),
1507 commit_vote_monitor,
1508 block_verifier,
1509 transaction_certifier,
1510 round_tracker,
1511 dag_state,
1512 false,
1513 );
1514
1515 let expected_blocks = (0..10)
1517 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1518 .collect::<Vec<_>>();
1519 let missing_blocks = expected_blocks
1520 .iter()
1521 .map(|block| block.reference())
1522 .collect::<BTreeSet<_>>();
1523
1524 let peer = AuthorityIndex::new_for_test(1);
1526 mock_client
1527 .stub_fetch_blocks(expected_blocks.clone(), peer, None)
1528 .await;
1529
1530 assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1532
1533 sleep(Duration::from_millis(1_000)).await;
1535
1536 let added_blocks = core_dispatcher.get_add_blocks().await;
1538 assert_eq!(added_blocks, expected_blocks);
1539 }
1540
1541 #[tokio::test]
1542 async fn saturate_fetch_blocks_from_peer() {
1543 let (context, _) = Context::new_for_test(4);
1545 let context = Arc::new(context);
1546 let block_verifier = Arc::new(NoopBlockVerifier {});
1547 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1548 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1549 let mock_client = Arc::new(MockNetworkClient::default());
1550 let (blocks_sender, _blocks_receiver) =
1551 monitored_mpsc::unbounded_channel("consensus_block_output");
1552 let store = Arc::new(MemStore::new());
1553 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1554 let transaction_certifier = TransactionCertifier::new(
1555 context.clone(),
1556 block_verifier.clone(),
1557 dag_state.clone(),
1558 blocks_sender,
1559 );
1560 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1561
1562 let network_client = Arc::new(SynchronizerClient::new(
1563 context.clone(),
1564 Some(mock_client.clone()),
1565 Some(mock_client.clone()),
1566 ));
1567 let handle = Synchronizer::start(
1568 network_client,
1569 context,
1570 core_dispatcher.clone(),
1571 commit_vote_monitor,
1572 block_verifier,
1573 transaction_certifier,
1574 round_tracker,
1575 dag_state,
1576 false,
1577 );
1578
1579 let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
1581 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
1582 .collect::<Vec<_>>();
1583
1584 let peer = AuthorityIndex::new_for_test(1);
1586 let mut iter = expected_blocks.iter().peekable();
1587 while let Some(block) = iter.next() {
1588 mock_client
1591 .stub_fetch_blocks(
1592 vec![block.clone()],
1593 peer,
1594 Some(Duration::from_millis(5_000)),
1595 )
1596 .await;
1597
1598 let mut missing_blocks = BTreeSet::new();
1599 missing_blocks.insert(block.reference());
1600
1601 if iter.peek().is_none() {
1604 match handle.fetch_blocks(missing_blocks, peer).await {
1605 Err(ConsensusError::SynchronizerSaturated(index)) => {
1606 assert_eq!(index, peer);
1607 }
1608 _ => panic!("A saturated synchronizer error was expected"),
1609 }
1610 } else {
1611 assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1612 }
1613 }
1614 }
1615
    /// The periodic synchronizer task fetches the blocks the core reports as missing,
    /// without any explicit `fetch_blocks` call. Runs on paused tokio time.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_periodic_task_fetch_blocks() {
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let mock_client = Arc::new(MockNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));

        // Ten blocks (rounds 0..10) by authority 0.
        let expected_blocks = (0..10)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();
        let missing_blocks = expected_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();

        // The mocked core reports these blocks as missing.
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        // Peer 1 answers only after a delay equal to FETCH_REQUEST_TIMEOUT, so its request is
        // presumably expected to time out. Peer 2 answers immediately, which exercises the
        // retry-from-next-peer path.
        mock_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(1),
                Some(FETCH_REQUEST_TIMEOUT),
            )
            .await;
        mock_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(2),
                None,
            )
            .await;

        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(mock_client.clone()),
            Some(mock_client.clone()),
        ));
        let _handle = Synchronizer::start(
            network_client,
            context,
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            transaction_certifier,
            round_tracker,
            dag_state,
            false,
        );

        // Advance (paused) time far enough for the timeout and the retry to complete.
        sleep(2 * FETCH_REQUEST_TIMEOUT).await;

        // All blocks reached the core, and the mocked core no longer reports any as missing.
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(added_blocks, expected_blocks);

        assert!(
            core_dispatcher
                .get_missing_blocks()
                .await
                .unwrap()
                .is_empty()
        );
    }
1702
    /// While the local commit index lags far behind the quorum's, the periodic task fetches
    /// nothing. Once the local commits catch up, the missing blocks are fetched again.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let mock_client = Arc::new(MockNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));

        // Missing blocks by authority 0 for rounds [2 * batch, 3 * batch), where
        // batch = commit_sync_batch_size.
        let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
        let stub_blocks = (sync_missing_block_round_threshold * 2
            ..sync_missing_block_round_threshold * 3)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();
        let missing_blocks = stub_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        // Only the first `max_blocks_per_sync` blocks are expected to be fetched. Presumably
        // a single periodic request is capped at that many blocks; confirm in the sync logic.
        let mut expected_blocks = stub_blocks
            .iter()
            .take(context.parameters.max_blocks_per_sync)
            .cloned()
            .collect::<Vec<_>>();
        // Peer 1 replies after FETCH_REQUEST_TIMEOUT; peer 2 replies immediately.
        mock_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(1),
                Some(FETCH_REQUEST_TIMEOUT),
            )
            .await;
        mock_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(2),
                None,
            )
            .await;

        // All four authorities vote for `commit_index` in blocks at a round well ahead of
        // local progress. The quorum commit index is therefore `commit_index`, while the
        // local DagState has no commits yet, so the node is commit-lagging.
        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
        let commit_index: CommitIndex = round - 1;
        let blocks = (0..4)
            .map(|authority| {
                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
                let block = TestBlock::new(round, authority)
                    .set_commit_votes(commit_votes)
                    .build();

                VerifiedBlock::new_for_test(block)
            })
            .collect::<Vec<_>>();

        for block in blocks {
            commit_vote_monitor.observe_block(&block);
        }

        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(mock_client.clone()),
            Some(mock_client.clone()),
        ));
        let _handle = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier,
            transaction_certifier,
            round_tracker,
            dag_state.clone(),
            false,
        );

        sleep(4 * FETCH_REQUEST_TIMEOUT).await;

        // While lagging, the periodic task must not have added any block.
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(added_blocks, vec![]);

        {
            // Catch up locally: write commits 1..=commit_index so the local last commit
            // index matches the quorum commit index (asserted below).
            let mut d = dag_state.write();
            for index in 1..=commit_index {
                let commit =
                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);

                d.add_commit(commit);
            }

            assert_eq!(
                d.last_commit_index(),
                commit_vote_monitor.quorum_commit_index()
            );
        }

        // Report the missing blocks again. NOTE(review): presumably the mock's earlier stub
        // may already have been consumed by the periodic task; confirm in MockCoreThreadDispatcher.
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        sleep(2 * FETCH_REQUEST_TIMEOUT).await;

        let mut added_blocks = core_dispatcher.get_add_blocks().await;

        // Arrival order is not part of the contract, so compare the sorted lists.
        added_blocks.sort_by_key(|block| block.reference());
        expected_blocks.sort_by_key(|block| block.reference());

        assert_eq!(added_blocks, expected_blocks);
    }
1838
    /// On startup the synchronizer learns our own last proposed round from peers. It retries
    /// once when peers are slow and ends up reporting round 10 to the core.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_fetch_own_last_block() {
        let (context, _) = Context::new_for_test(4);
        // A 2s timeout, which the 10s stub latencies below exceed.
        let context = Arc::new(context.with_parameters(Parameters {
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        }));
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let mock_client = Arc::new(MockNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let our_index = AuthorityIndex::new_for_test(0);

        // Our own blocks at rounds 9 and 10.
        let mut expected_blocks = (9..=10)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();

        // `pop` takes the last element, so block_1 is round 10. Peer 1 returns it
        // immediately on both of its stubbed calls.
        let block_1 = expected_blocks.pop().unwrap();
        mock_client
            .stub_fetch_latest_blocks(
                vec![block_1.clone()],
                AuthorityIndex::new_for_test(1),
                vec![our_index],
                None,
            )
            .await;
        mock_client
            .stub_fetch_latest_blocks(
                vec![block_1],
                AuthorityIndex::new_for_test(1),
                vec![our_index],
                None,
            )
            .await;

        // block_2 is round 9. Peer 2's first reply is delayed 10s and its second is immediate.
        let block_2 = expected_blocks.pop().unwrap();
        mock_client
            .stub_fetch_latest_blocks(
                vec![block_2.clone()],
                AuthorityIndex::new_for_test(2),
                vec![our_index],
                Some(Duration::from_secs(10)),
            )
            .await;
        mock_client
            .stub_fetch_latest_blocks(
                vec![block_2],
                AuthorityIndex::new_for_test(2),
                vec![our_index],
                None,
            )
            .await;

        // Peer 3 has no block of ours. Its first reply is delayed 10s and its second is immediate.
        mock_client
            .stub_fetch_latest_blocks(
                vec![],
                AuthorityIndex::new_for_test(3),
                vec![our_index],
                Some(Duration::from_secs(10)),
            )
            .await;
        mock_client
            .stub_fetch_latest_blocks(
                vec![],
                AuthorityIndex::new_for_test(3),
                vec![our_index],
                None,
            )
            .await;

        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(mock_client.clone()),
            Some(mock_client.clone()),
        ));
        // NOTE(review): the final `true` (other tests pass `false`) presumably enables fetching
        // our own last block on startup; confirm against `Synchronizer::start`.
        let handle = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            transaction_certifier,
            round_tracker,
            dag_state,
            true,
        );

        sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;

        // The highest round returned by any peer (10) is reported to the core exactly once.
        assert_eq!(
            core_dispatcher.get_last_own_proposed_round().await,
            vec![10]
        );

        // Every stubbed reply was consumed, i.e. each peer was queried twice.
        assert_eq!(mock_client.fetch_latest_blocks_pending_calls().await, 0);

        // Exactly one retry was recorded. Presumably it was triggered because the first attempt
        // hit the slow peers 2 and 3; confirm in the fetch-own-block logic.
        assert_eq!(
            context
                .metrics
                .node_metrics
                .sync_last_known_own_block_retries
                .get(),
            1
        );

        // Surface any panic from the synchronizer's tasks as a test failure.
        if let Err(err) = handle.stop().await
            && err.is_panic()
        {
            std::panic::resume_unwind(err.into_panic());
        }
    }
1973
1974 #[tokio::test]
1975 async fn test_process_fetched_blocks() {
1976 let (context, _) = Context::new_for_test(4);
1978 let context = Arc::new(context);
1979 let block_verifier = Arc::new(NoopBlockVerifier {});
1980 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1981 let (blocks_sender, _blocks_receiver) =
1982 monitored_mpsc::unbounded_channel("consensus_block_output");
1983 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1984 let store = Arc::new(MemStore::new());
1985 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1986 let transaction_certifier = TransactionCertifier::new(
1987 context.clone(),
1988 block_verifier.clone(),
1989 dag_state.clone(),
1990 blocks_sender,
1991 );
1992 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1993 let (commands_sender, _commands_receiver) =
1994 monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
1995
1996 let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
2000 expected_blocks.extend(
2001 (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
2002 );
2003 assert_eq!(
2004 expected_blocks.len(),
2005 context.parameters.max_blocks_per_sync
2006 );
2007
2008 let expected_serialized_blocks = expected_blocks
2009 .iter()
2010 .map(|b| b.serialized().clone())
2011 .collect::<Vec<_>>();
2012
2013 let expected_block_refs = expected_blocks
2014 .iter()
2015 .map(|b| b.reference())
2016 .collect::<BTreeSet<_>>();
2017
2018 let peer_index = AuthorityIndex::new_for_test(2);
2020
2021 let inflight_blocks_map = InflightBlocksMap::new();
2023 let blocks_guard = inflight_blocks_map
2024 .lock_blocks(expected_block_refs.clone(), peer_index)
2025 .expect("Failed to lock blocks");
2026
2027 assert_eq!(
2028 inflight_blocks_map.num_of_locked_blocks(),
2029 expected_block_refs.len()
2030 );
2031
2032 let result = Synchronizer::<
2034 NoopBlockVerifier,
2035 MockCoreThreadDispatcher,
2036 MockNetworkClient,
2037 MockNetworkClient,
2038 >::process_fetched_blocks(
2039 expected_serialized_blocks,
2040 peer_index,
2041 blocks_guard, core_dispatcher.clone(),
2043 block_verifier,
2044 transaction_certifier,
2045 commit_vote_monitor,
2046 context.clone(),
2047 commands_sender,
2048 round_tracker,
2049 "test",
2050 )
2051 .await;
2052
2053 assert!(result.is_ok());
2055
2056 let added_blocks = core_dispatcher.get_add_blocks().await;
2058 assert_eq!(
2059 added_blocks
2060 .iter()
2061 .map(|b| b.reference())
2062 .collect::<BTreeSet<_>>(),
2063 expected_block_refs,
2064 );
2065
2066 assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
2068 }
2069}