use std::{
    collections::{BTreeMap, BTreeSet},
    sync::Arc,
    time::Duration,
};

use bytes::Bytes;
use consensus_config::AuthorityIndex;
use consensus_types::block::BlockRef;
use futures::{StreamExt as _, stream::FuturesOrdered};
use itertools::Itertools as _;
use mysten_metrics::spawn_logged_monitored_task;
use parking_lot::RwLock;
use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
use tokio::{
    runtime::Handle,
    sync::oneshot,
    task::{JoinHandle, JoinSet},
    time::{MissedTickBehavior, sleep},
};
use tracing::{debug, info, warn};

use crate::{
    CommitConsumerMonitor, CommitIndex,
    block::{BlockAPI, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit::{
        CertifiedCommit, CertifiedCommits, Commit, CommitAPI as _, CommitDigest, CommitRange,
        CommitRef, TrustedCommit,
    },
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::NetworkClient,
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    transaction_certifier::TransactionCertifier,
};

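/// Handle for shutting down the `CommitSyncer` scheduling task.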
pub(crate) struct CommitSyncerHandle {
    schedule_task: JoinHandle<()>,
    tx_shutdown: oneshot::Sender<()>,
}

impl CommitSyncerHandle {
    pub(crate) async fn stop(self) {
        let _ = self.tx_shutdown.send(());
        // Await the task (instead of aborting it), so in-flight fetches shut down cleanly
        // and panics from the task are propagated.
        if let Err(e) = self.schedule_task.await
            && e.is_panic()
        {
            std::panic::resume_unwind(e.into_panic());
        }
    }
}

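/// CommitSyncer lets a node catch up on committed data when it falls behind its peers.
///
/// It periodically compares the highest commit index voted on by a quorum of peers
/// (via `CommitVoteMonitor`) against the local commit index. When the node is behind,
/// it schedules fetches of commit ranges from peers, verifies the returned commits and
/// blocks, and forwards them to the core thread for processing in commit index order.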
pub(crate) struct CommitSyncer<C: NetworkClient> {
    // Shared state and components used by the scheduler and fetch tasks.
    inner: Arc<Inner<C>>,

    // Fetch tasks in flight. Each task resolves to the target end commit index of the
    // fetch and the certified commits that were retrieved.
    inflight_fetches: JoinSet<(u32, CertifiedCommits)>,
    // Commit ranges scheduled for fetching but not yet started.
    pending_fetches: BTreeSet<CommitRange>,
    // Fetched but not yet processed commits, keyed by commit range.
    fetched_ranges: BTreeMap<CommitRange, CertifiedCommits>,
    // Highest commit index that has been scheduled for fetching, if any.
    highest_scheduled_index: Option<CommitIndex>,
    // Highest commit index that has been fetched so far.
    highest_fetched_commit_index: CommitIndex,
    // Highest commit index considered synced: the max of the local commit index and the
    // end index of ranges already sent to the core thread.
    synced_commit_index: CommitIndex,
}

impl<C: NetworkClient> CommitSyncer<C> {
    pub(crate) fn new(
        context: Arc<Context>,
        core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
        block_verifier: Arc<dyn BlockVerifier>,
        transaction_certifier: TransactionCertifier,
        network_client: Arc<C>,
        dag_state: Arc<RwLock<DagState>>,
    ) -> Self {
        let inner = Arc::new(Inner {
            context,
            core_thread_dispatcher,
            commit_vote_monitor,
            commit_consumer_monitor,
            block_verifier,
            transaction_certifier,
            network_client,
            dag_state,
        });
        let synced_commit_index = inner.dag_state.read().last_commit_index();
        CommitSyncer {
            inner,
            inflight_fetches: JoinSet::new(),
            pending_fetches: BTreeSet::new(),
            fetched_ranges: BTreeMap::new(),
            highest_scheduled_index: None,
            highest_fetched_commit_index: 0,
            synced_commit_index,
        }
    }

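    /// Spawns the scheduling loop and returns a handle that can stop it.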
    pub(crate) fn start(self) -> CommitSyncerHandle {
        let (tx_shutdown, rx_shutdown) = oneshot::channel();
        let schedule_task = spawn_logged_monitored_task!(self.schedule_loop(rx_shutdown));
        CommitSyncerHandle {
            schedule_task,
            tx_shutdown,
        }
    }

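    /// Main loop of the scheduler: on a fixed interval it checks whether new commit
    /// ranges should be scheduled, collects results from finished fetch tasks, and
    /// starts new fetches, until shutdown is signaled.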
    async fn schedule_loop(mut self, mut rx_shutdown: oneshot::Receiver<()>) {
        let mut interval = tokio::time::interval(Duration::from_secs(2));
        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                // Periodically check if the node is behind and schedule new fetches.
                _ = interval.tick() => {
                    self.try_schedule_once();
                }
                // Handle results from completed fetch tasks.
                Some(result) = self.inflight_fetches.join_next(), if !self.inflight_fetches.is_empty() => {
                    if let Err(e) = result {
                        if e.is_panic() {
                            std::panic::resume_unwind(e.into_panic());
                        }
                        warn!("Fetch cancelled. CommitSyncer shutting down: {}", e);
                        self.inflight_fetches.shutdown().await;
                        return;
                    }
                    let (target_end, commits) = result.unwrap();
                    self.handle_fetch_result(target_end, commits).await;
                }
                _ = &mut rx_shutdown => {
                    info!("CommitSyncer shutting down ...");
                    self.inflight_fetches.shutdown().await;
                    return;
                }
            }

            self.try_start_fetches();
        }
    }

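    /// Schedules fetches for commit ranges between the highest index already synced or
    /// scheduled and the highest commit index voted on by a quorum of peers. Only full
    /// batches are scheduled, and scheduling pauses when the commit consumer falls too
    /// far behind in handling commits.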
    fn try_schedule_once(&mut self) {
        let quorum_commit_index = self.inner.commit_vote_monitor.quorum_commit_index();
        let local_commit_index = self.inner.dag_state.read().last_commit_index();
        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_quorum_index
            .set(quorum_commit_index as i64);
        metrics
            .commit_sync_local_index
            .set(local_commit_index as i64);
        let highest_handled_index = self.inner.commit_consumer_monitor.highest_handled_commit();
        let highest_scheduled_index = self.highest_scheduled_index.unwrap_or(0);
        // The local commit index can advance independently of commit sync.
        self.synced_commit_index = self.synced_commit_index.max(local_commit_index);
        let unhandled_commits_threshold = self.unhandled_commits_threshold();
        info!(
            "Checking to schedule fetches: synced_commit_index={}, highest_handled_index={}, highest_scheduled_index={}, quorum_commit_index={}, unhandled_commits_threshold={}",
            self.synced_commit_index,
            highest_handled_index,
            highest_scheduled_index,
            quorum_commit_index,
            unhandled_commits_threshold,
        );

        // Schedule fetches in batches, starting after the highest synced or scheduled index.
        let fetch_after_index = self
            .synced_commit_index
            .max(self.highest_scheduled_index.unwrap_or(0));
        for prev_end in (fetch_after_index..=quorum_commit_index)
            .step_by(self.inner.context.parameters.commit_sync_batch_size as usize)
        {
            let range_start = prev_end + 1;
            let range_end = prev_end + self.inner.context.parameters.commit_sync_batch_size;
            // Only schedule full batches within the quorum commit index.
            if quorum_commit_index < range_end {
                break;
            }
            // Pause scheduling when the consumer of commits is lagging too far behind.
            if highest_handled_index + unhandled_commits_threshold < range_end {
                warn!(
                    "Skip scheduling new commit fetches: consensus handler is lagging. highest_handled_index={}, highest_scheduled_index={}",
                    highest_handled_index, highest_scheduled_index
                );
                break;
            }
            self.pending_fetches
                .insert((range_start..=range_end).into());
            self.highest_scheduled_index = Some(range_end);
        }
    }

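    /// Processes the result of a completed fetch: records metrics, re-schedules the
    /// remainder of a partially fetched range, buffers the fetched commits, and sends
    /// contiguous ranges to the core thread in commit index order.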
    async fn handle_fetch_result(
        &mut self,
        target_end: CommitIndex,
        certified_commits: CertifiedCommits,
    ) {
        assert!(!certified_commits.commits().is_empty());

        let (total_blocks_fetched, total_blocks_size_bytes) = certified_commits
            .commits()
            .iter()
            .fold((0, 0), |(blocks, bytes), c| {
                (
                    blocks + c.blocks().len(),
                    bytes
                        + c.blocks()
                            .iter()
                            .map(|b| b.serialized().len())
                            .sum::<usize>() as u64,
                )
            });

        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_fetched_commits
            .inc_by(certified_commits.commits().len() as u64);
        metrics
            .commit_sync_fetched_blocks
            .inc_by(total_blocks_fetched as u64);
        metrics
            .commit_sync_total_fetched_blocks_size
            .inc_by(total_blocks_size_bytes);

        let (commit_start, commit_end) = (
            certified_commits.commits().first().unwrap().index(),
            certified_commits.commits().last().unwrap().index(),
        );
        self.highest_fetched_commit_index = self.highest_fetched_commit_index.max(commit_end);
        metrics
            .commit_sync_highest_fetched_index
            .set(self.highest_fetched_commit_index as i64);

        // If the fetch did not reach the target end index, schedule a fetch for the rest.
        if commit_end < target_end {
            self.pending_fetches
                .insert((commit_end + 1..=target_end).into());
        }
        // Only buffer the fetched commits if they have not already been processed locally.
        self.synced_commit_index = self
            .synced_commit_index
            .max(self.inner.dag_state.read().last_commit_index());
        if self.synced_commit_index < commit_end {
            self.fetched_ranges
                .insert((commit_start..=commit_end).into(), certified_commits);
        }
        // Send contiguous fetched ranges to the core thread, in order of commit index.
        while let Some((fetched_commit_range, _commits)) = self.fetched_ranges.first_key_value() {
            // Stop if the lowest buffered range does not connect to the synced index yet.
            let (fetched_commit_range, commits) =
                if fetched_commit_range.start() <= self.synced_commit_index + 1 {
                    self.fetched_ranges.pop_first().unwrap()
                } else {
                    metrics.commit_sync_gap_on_processing.inc();
                    break;
                };
            // Skip ranges that have already been processed locally.
            if fetched_commit_range.end() <= self.synced_commit_index {
                continue;
            }

            debug!(
                "Fetched blocks for commit range {:?}: {}",
                fetched_commit_range,
                commits
                    .commits()
                    .iter()
                    .flat_map(|c| c.blocks())
                    .map(|b| b.reference().to_string())
                    .join(","),
            );

            match self
                .inner
                .core_thread_dispatcher
                .add_certified_commits(commits)
                .await
            {
                Ok(missing) => {
                    if !missing.is_empty() {
                        info!(
                            "Certification blocks have missing ancestors: {} for commit range {:?}",
                            missing.iter().map(|b| b.to_string()).join(","),
                            fetched_commit_range,
                        );
                    }
                    for block_ref in missing {
                        let hostname = &self
                            .inner
                            .context
                            .committee
                            .authority(block_ref.author)
                            .hostname;
                        metrics
                            .commit_sync_fetch_missing_blocks
                            .with_label_values(&[hostname])
                            .inc();
                    }
                }
                Err(e) => {
                    info!("Failed to add blocks, shutting down: {}", e);
                    return;
                }
            };

            self.synced_commit_index = self.synced_commit_index.max(fetched_commit_range.end());
        }

        metrics
            .commit_sync_inflight_fetches
            .set(self.inflight_fetches.len() as i64);
        metrics
            .commit_sync_pending_fetches
            .set(self.pending_fetches.len() as i64);
        metrics
            .commit_sync_highest_synced_index
            .set(self.synced_commit_index as i64);
    }

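    /// Starts as many pending fetches as allowed by the target parallelism, which is
    /// bounded by the configured parallel fetch limit, a fraction of the committee size,
    /// and how many fetched ranges are still waiting to be processed.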
    fn try_start_fetches(&mut self) {
        // Limit parallelism by the configured maximum, a fraction of the committee size,
        // and the number of fetched batches allowed to accumulate ahead of processing.
        let target_parallel_fetches = self
            .inner
            .context
            .parameters
            .commit_sync_parallel_fetches
            .min(self.inner.context.committee.size() * 2 / 3)
            .min(
                self.inner
                    .context
                    .parameters
                    .commit_sync_batches_ahead
                    .saturating_sub(self.fetched_ranges.len()),
            )
            .max(1);
        loop {
            if self.inflight_fetches.len() >= target_parallel_fetches {
                break;
            }
            let Some(commit_range) = self.pending_fetches.pop_first() else {
                break;
            };
            self.inflight_fetches
                .spawn(Self::fetch_loop(self.inner.clone(), commit_range));
        }

        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_inflight_fetches
            .set(self.inflight_fetches.len() as i64);
        metrics
            .commit_sync_pending_fetches
            .set(self.pending_fetches.len() as i64);
        metrics
            .commit_sync_highest_synced_index
            .set(self.synced_commit_index as i64);
    }

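    /// Fetches the commits and blocks in `commit_range` from randomly selected peers,
    /// retrying with increasing timeouts until a fetch succeeds. Returns the target end
    /// index of the range together with the fetched certified commits.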
    async fn fetch_loop(
        inner: Arc<Inner<C>>,
        commit_range: CommitRange,
    ) -> (CommitIndex, CertifiedCommits) {
        // Base timeout unit; effective timeouts grow with the number of attempts.
        const TIMEOUT: Duration = Duration::from_secs(10);
        // Cap on the timeout multiplier, so timeouts stop growing after enough attempts.
        const MAX_TIMEOUT_MULTIPLIER: u32 = 12;
        // Maximum number of peers to try per attempt.
        const MAX_NUM_TARGETS: usize = 24;
        let mut timeout_multiplier = 0;
        let _timer = inner
            .context
            .metrics
            .node_metrics
            .commit_sync_fetch_loop_latency
            .start_timer();
        info!("Starting to fetch commits in {commit_range:?} ...");
        loop {
            // Select up to MAX_NUM_TARGETS random peers, excluding this authority.
            let mut target_authorities = inner
                .context
                .committee
                .authorities()
                .filter_map(|(i, _)| {
                    if i != inner.context.own_index {
                        Some(i)
                    } else {
                        None
                    }
                })
                .collect_vec();
            target_authorities.shuffle(&mut ThreadRng::default());
            target_authorities.truncate(MAX_NUM_TARGETS);
            // Increase timeouts with each attempt, up to the maximum multiplier.
            timeout_multiplier = (timeout_multiplier + 1).min(MAX_TIMEOUT_MULTIPLIER);
            let request_timeout = TIMEOUT * timeout_multiplier;
            // A single fetch_once call makes multiple requests, so it gets a larger timeout.
            let fetch_timeout = request_timeout * 4;
            for authority in target_authorities {
                match tokio::time::timeout(
                    fetch_timeout,
                    Self::fetch_once(
                        inner.clone(),
                        authority,
                        commit_range.clone(),
                        request_timeout,
                    ),
                )
                .await
                {
                    Ok(Ok(commits)) => {
                        info!("Finished fetching commits in {commit_range:?}");
                        return (commit_range.end(), commits);
                    }
                    Ok(Err(e)) => {
                        let hostname = inner
                            .context
                            .committee
                            .authority(authority)
                            .hostname
                            .clone();
                        warn!("Failed to fetch {commit_range:?} from {hostname}: {}", e);
                        inner
                            .context
                            .metrics
                            .node_metrics
                            .commit_sync_fetch_once_errors
                            .with_label_values(&[&hostname, e.name()])
                            .inc();
                    }
                    Err(_) => {
                        let hostname = inner
                            .context
                            .committee
                            .authority(authority)
                            .hostname
                            .clone();
                        warn!("Timed out fetching {commit_range:?} from {authority}");
                        inner
                            .context
                            .metrics
                            .node_metrics
                            .commit_sync_fetch_once_errors
                            .with_label_values(&[&hostname, "FetchTimeout"])
                            .inc();
                    }
                }
            }
            // Pause before retrying with a new set of peers.
            sleep(TIMEOUT).await;
        }
    }

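    /// Fetches the commits in `commit_range` and their certifying vote blocks from
    /// `target_authority`, verifies them, then fetches the blocks referenced by the
    /// commits in chunks. Returns the commits paired with their blocks, or an error if
    /// verification fails or the peer responds with unexpected data.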
    async fn fetch_once(
        inner: Arc<Inner<C>>,
        target_authority: AuthorityIndex,
        commit_range: CommitRange,
        timeout: Duration,
    ) -> ConsensusResult<CertifiedCommits> {
        let _timer = inner
            .context
            .metrics
            .node_metrics
            .commit_sync_fetch_once_latency
            .start_timer();

        // 1. Fetch the commits in the requested range and the blocks voting on the last commit.
        let (serialized_commits, serialized_blocks) = inner
            .network_client
            .fetch_commits(target_authority, commit_range.clone(), timeout)
            .await?;

        // 2. Verify the response on a blocking thread, since verification can be CPU intensive.
        let (commits, vote_blocks) = Handle::current()
            .spawn_blocking({
                let inner = inner.clone();
                move || {
                    inner.verify_commits(
                        target_authority,
                        commit_range,
                        serialized_commits,
                        serialized_blocks,
                    )
                }
            })
            .await
            .expect("Spawn blocking should not fail")?;

        // 3. Fetch the blocks referenced by the verified commits, in chunks.
        let mut block_refs: Vec<_> = commits.iter().flat_map(|c| c.blocks()).cloned().collect();
        block_refs.sort();
        let num_chunks = block_refs
            .len()
            .div_ceil(inner.context.parameters.max_blocks_per_fetch)
            as u32;
        let mut requests: FuturesOrdered<_> = block_refs
            .chunks(inner.context.parameters.max_blocks_per_fetch)
            .enumerate()
            .map(|(i, request_block_refs)| {
                let inner = inner.clone();
                async move {
                    // Stagger the chunk requests across the timeout window.
                    sleep(timeout * i as u32 / num_chunks).await;
                    let serialized_blocks = inner
                        .network_client
                        .fetch_blocks(
                            target_authority,
                            request_block_refs.to_vec(),
                            vec![],
                            false,
                            timeout,
                        )
                        .await?;
                    // The response must contain exactly the requested number of blocks.
                    if request_block_refs.len() != serialized_blocks.len() {
                        return Err(ConsensusError::UnexpectedNumberOfBlocksFetched {
                            authority: target_authority,
                            requested: request_block_refs.len(),
                            received: serialized_blocks.len(),
                        });
                    }
                    // Deserialize the responses and check each block against the requested
                    // reference before accepting it.
                    let signed_blocks = serialized_blocks
                        .iter()
                        .map(|serialized| {
                            let block: SignedBlock = bcs::from_bytes(serialized)
                                .map_err(ConsensusError::MalformedBlock)?;
                            Ok(block)
                        })
                        .collect::<ConsensusResult<Vec<_>>>()?;
                    let mut blocks = Vec::new();
                    for ((requested_block_ref, signed_block), serialized) in request_block_refs
                        .iter()
                        .zip(signed_blocks.into_iter())
                        .zip(serialized_blocks.into_iter())
                    {
                        let signed_block_digest = VerifiedBlock::compute_digest(&serialized);
                        let received_block_ref = BlockRef::new(
                            signed_block.round(),
                            signed_block.author(),
                            signed_block_digest,
                        );
                        if *requested_block_ref != received_block_ref {
                            return Err(ConsensusError::UnexpectedBlockForCommit {
                                peer: target_authority,
                                requested: *requested_block_ref,
                                received: received_block_ref,
                            });
                        }
                        blocks.push(VerifiedBlock::new_verified(signed_block, serialized));
                    }
                    Ok(blocks)
                }
            })
            .collect();

        // 4. Collect the fetched blocks, keyed by block reference.
        let mut fetched_blocks = BTreeMap::new();
        while let Some(result) = requests.next().await {
            for block in result? {
                fetched_blocks.insert(block.reference(), block);
            }
        }

        // 5. Record how far ahead of the local clock the fetched block timestamps are.
        for block in fetched_blocks.values().chain(vote_blocks.iter()) {
            let now_ms = inner.context.clock.timestamp_utc_ms();
            let forward_drift = block.timestamp_ms().saturating_sub(now_ms);
            if forward_drift == 0 {
                continue;
            }
            let peer_hostname = &inner.context.committee.authority(target_authority).hostname;
            inner
                .context
                .metrics
                .node_metrics
                .block_timestamp_drift_ms
                .with_label_values(&[peer_hostname, "commit_syncer"])
                .inc_by(forward_drift);
        }

        // 6. Assemble the certified commits together with their blocks.
        let mut certified_commits = Vec::new();
        for commit in &commits {
            let blocks = commit
                .blocks()
                .iter()
                .map(|block_ref| {
                    fetched_blocks
                        .remove(block_ref)
                        .expect("Block should exist")
                })
                .collect::<Vec<_>>();
            certified_commits.push(CertifiedCommit::new_certified(commit.clone(), blocks));
        }

        // When Mysticeti fastpath is enabled, record the committed blocks with the
        // transaction certifier (with no local reject votes).
        for commit in &certified_commits {
            for block in commit.blocks() {
                if inner.context.protocol_config.mysticeti_fastpath() {
                    inner
                        .transaction_certifier
                        .add_voted_blocks(vec![(block.clone(), vec![])]);
                }
            }
        }

        Ok(CertifiedCommits::new(certified_commits, vote_blocks))
    }

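    /// Maximum number of commits that may be fetched ahead of the highest commit index
    /// handled by the commit consumer before scheduling pauses.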
    fn unhandled_commits_threshold(&self) -> CommitIndex {
        self.inner.context.parameters.commit_sync_batch_size
            * (self.inner.context.parameters.commit_sync_batches_ahead as u32)
    }

    #[cfg(test)]
    fn pending_fetches(&self) -> BTreeSet<CommitRange> {
        self.pending_fetches.clone()
    }

    #[cfg(test)]
    fn fetched_ranges(&self) -> BTreeMap<CommitRange, CertifiedCommits> {
        self.fetched_ranges.clone()
    }

    #[cfg(test)]
    fn highest_scheduled_index(&self) -> Option<CommitIndex> {
        self.highest_scheduled_index
    }

    #[cfg(test)]
    fn highest_fetched_commit_index(&self) -> CommitIndex {
        self.highest_fetched_commit_index
    }

    #[cfg(test)]
    fn synced_commit_index(&self) -> CommitIndex {
        self.synced_commit_index
    }
}

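/// State and components shared between the scheduler and its fetch tasks.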
struct Inner<C: NetworkClient> {
    context: Arc<Context>,
    core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    commit_consumer_monitor: Arc<CommitConsumerMonitor>,
    block_verifier: Arc<dyn BlockVerifier>,
    transaction_certifier: TransactionCertifier,
    network_client: Arc<C>,
    dag_state: Arc<RwLock<DagState>>,
}

impl<C: NetworkClient> Inner<C> {
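    /// Verifies a sequence of serialized commits and the vote blocks certifying the last
    /// commit in the sequence. Checks that the commits start at the requested range,
    /// are contiguous and correctly chained by digest, and that the votes for the last
    /// commit reach a quorum of stake. Returns the trusted commits and verified vote blocks.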
    fn verify_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
        serialized_commits: Vec<Bytes>,
        serialized_vote_blocks: Vec<Bytes>,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        // Parse and validate the commit sequence.
        let mut commits = Vec::new();
        for serialized in &serialized_commits {
            let commit: Commit =
                bcs::from_bytes(serialized).map_err(ConsensusError::MalformedCommit)?;
            let digest = TrustedCommit::compute_digest(serialized);
            if commits.is_empty() {
                // The first commit must be at the start of the requested range.
                if commit.index() != commit_range.start() {
                    return Err(ConsensusError::UnexpectedStartCommit {
                        peer,
                        start: commit_range.start(),
                        commit: Box::new(commit),
                    });
                }
            } else {
                // Later commits must be contiguous and chained to the previous digest.
                let (last_commit_digest, last_commit): &(CommitDigest, Commit) =
                    commits.last().unwrap();
                if commit.index() != last_commit.index() + 1
                    || &commit.previous_digest() != last_commit_digest
                {
                    return Err(ConsensusError::UnexpectedCommitSequence {
                        peer,
                        prev_commit: Box::new(last_commit.clone()),
                        curr_commit: Box::new(commit),
                    });
                }
            }
            // Do not process commits past the end of the requested range.
            if commit.index() > commit_range.end() {
                break;
            }
            commits.push((digest, commit));
        }
        let Some((end_commit_digest, end_commit)) = commits.last() else {
            return Err(ConsensusError::NoCommitReceived { peer });
        };

        // Verify the vote blocks and aggregate the stake voting for the last commit.
        let end_commit_ref = CommitRef::new(end_commit.index(), *end_commit_digest);
        let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
        let mut vote_blocks = Vec::new();
        for serialized in serialized_vote_blocks {
            let block: SignedBlock =
                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
            let (block, reject_transaction_votes) =
                self.block_verifier.verify_and_vote(block, serialized)?;
            if self.context.protocol_config.mysticeti_fastpath() {
                self.transaction_certifier
                    .add_voted_blocks(vec![(block.clone(), reject_transaction_votes)]);
            }
            for vote in block.commit_votes() {
                if *vote == end_commit_ref {
                    stake_aggregator.add(block.author(), &self.context.committee);
                }
            }
            vote_blocks.push(block);
        }

        // The last commit must be certified by a quorum of stake; earlier commits are
        // then trusted through the digest chain.
        if !stake_aggregator.reached_threshold(&self.context.committee) {
            return Err(ConsensusError::NotEnoughCommitVotes {
                stake: stake_aggregator.stake(),
                peer,
                commit: Box::new(end_commit.clone()),
            });
        }

        let trusted_commits = commits
            .into_iter()
            .zip(serialized_commits)
            .map(|((_d, c), s)| TrustedCommit::new_trusted(c, s))
            .collect();
        Ok((trusted_commits, vote_blocks))
    }
}

#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use bytes::Bytes;
    use consensus_config::{AuthorityIndex, Parameters};
    use consensus_types::block::{BlockRef, Round};
    use mysten_metrics::monitored_mpsc;
    use parking_lot::RwLock;

    use crate::{
        CommitConsumerMonitor, CommitDigest, CommitRef,
        block::{TestBlock, VerifiedBlock},
        block_verifier::NoopBlockVerifier,
        commit::CommitRange,
        commit_syncer::CommitSyncer,
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::MockCoreThreadDispatcher,
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, NetworkClient},
        storage::mem_store::MemStore,
        transaction_certifier::TransactionCertifier,
    };

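    // Network client stub for tests that exercise scheduling only; every method panics
    // if called, since no fetches are actually started in these tests.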
    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait::async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = true;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _serialized_block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

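    // Checks that fetches are scheduled in full batches up to the quorum commit index,
    // and that scheduling pauses while the commit consumer lags behind.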
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn commit_syncer_start_and_pause_scheduling() {
        // Use small batch sizes and fetch limits for testing.
        let (context, _) = Context::new_for_test(4);
        let context = Context {
            own_index: AuthorityIndex::new_for_test(3),
            parameters: Parameters {
                commit_sync_batch_size: 5,
                commit_sync_batches_ahead: 5,
                commit_sync_parallel_fetches: 5,
                max_blocks_per_fetch: 5,
                ..context.parameters
            },
            ..context
        };
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_thread_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let transaction_certifier =
            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let commit_consumer_monitor = Arc::new(CommitConsumerMonitor::new(0, 0));
        let mut commit_syncer = CommitSyncer::new(
            context,
            core_thread_dispatcher,
            commit_vote_monitor.clone(),
            commit_consumer_monitor.clone(),
            block_verifier,
            transaction_certifier,
            network_client,
            dag_state,
        );

        // Nothing is scheduled before any commit votes are observed.
        assert!(commit_syncer.pending_fetches().is_empty());
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert!(commit_syncer.highest_scheduled_index().is_none());
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // Observe commit votes for index 10 from a quorum of authorities.
        for i in 0..3 {
            let test_block = TestBlock::new(15, i)
                .set_commit_votes(vec![CommitRef::new(10, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        commit_syncer.try_schedule_once();

        // Two full batches (1..=5 and 6..=10) should be scheduled.
        assert_eq!(commit_syncer.pending_fetches().len(), 2);
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(10));
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // Observe commit votes for index 35 from a quorum of authorities.
        for i in 0..3 {
            let test_block = TestBlock::new(40, i)
                .set_commit_votes(vec![CommitRef::new(35, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        commit_syncer.try_schedule_once();

        // Scheduling stops at index 25, because no fetched commit has been handled yet.
        assert_eq!(commit_syncer.unhandled_commits_threshold(), 25);
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(25));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 5);

        // After commits up to index 25 are handled, scheduling can resume up to 35.
        commit_consumer_monitor.set_highest_handled_commit(25);
        commit_syncer.try_schedule_once();

        assert_eq!(commit_syncer.highest_scheduled_index(), Some(35));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 7);

        // Pending fetches should cover the consecutive ranges 1..=5 through 31..=35.
        for (range, start) in pending_fetches.iter().zip((1..35).step_by(5)) {
            assert_eq!(range.start(), start);
            assert_eq!(range.end(), start + 4);
        }
    }
}