1use std::{
28 collections::{BTreeMap, BTreeSet},
29 sync::Arc,
30 time::Duration,
31};
32
33use bytes::Bytes;
34use consensus_config::AuthorityIndex;
35use consensus_types::block::BlockRef;
36use futures::{StreamExt as _, stream::FuturesOrdered};
37use itertools::Itertools as _;
38use mysten_metrics::spawn_logged_monitored_task;
39use parking_lot::RwLock;
40use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
41use tokio::{
42 runtime::Handle,
43 sync::oneshot,
44 task::{JoinHandle, JoinSet},
45 time::{MissedTickBehavior, sleep},
46};
47use tracing::{debug, info, warn};
48
49use crate::{
50 CommitConsumerMonitor, CommitIndex,
51 block::{BlockAPI, ExtendedBlock, SignedBlock, VerifiedBlock},
52 block_verifier::BlockVerifier,
53 commit::{
54 CertifiedCommit, CertifiedCommits, Commit, CommitAPI as _, CommitDigest, CommitRange,
55 CommitRef, TrustedCommit,
56 },
57 commit_vote_monitor::CommitVoteMonitor,
58 context::Context,
59 core_thread::CoreThreadDispatcher,
60 dag_state::DagState,
61 error::{ConsensusError, ConsensusResult},
62 network::{CommitSyncerClient, ObserverNetworkClient, ValidatorNetworkClient},
63 round_tracker::RoundTracker,
64 stake_aggregator::{QuorumThreshold, StakeAggregator},
65 transaction_certifier::TransactionCertifier,
66};
67
/// Handle to a running `CommitSyncer`, returned by `CommitSyncer::start()`.
///
/// Consumed by `stop()` to shut down the schedule loop and wait for it to exit.
pub(crate) struct CommitSyncerHandle {
    // Join handle of the spawned task that runs `CommitSyncer::schedule_loop()`.
    schedule_task: JoinHandle<()>,
    // Sender half of the shutdown signal; the receiver is polled by the schedule loop.
    tx_shutdown: oneshot::Sender<()>,
}
73
74impl CommitSyncerHandle {
75 pub(crate) async fn stop(self) {
76 let _ = self.tx_shutdown.send(());
77 if let Err(e) = self.schedule_task.await
79 && e.is_panic()
80 {
81 std::panic::resume_unwind(e.into_panic());
82 }
83 }
84}
85
/// Schedules and runs fetches of certified commits from peers, and forwards the
/// fetched commits to Core through the `CoreThreadDispatcher` held in `Inner`.
pub(crate) struct CommitSyncer<VC: ValidatorNetworkClient, OC: ObserverNetworkClient> {
    // Components shared between the scheduler and the spawned fetch tasks.
    inner: Arc<Inner<VC, OC>>,

    // Fetch tasks currently running. Each yields the end index of the range it was
    // asked to fetch, together with the commits it actually obtained.
    inflight_fetches: JoinSet<(u32, CertifiedCommits)>,
    // Commit ranges scheduled for fetching, but not yet started.
    pending_fetches: BTreeSet<CommitRange>,
    // Fetched commits keyed by commit range, waiting to be sent to Core.
    fetched_ranges: BTreeMap<CommitRange, CertifiedCommits>,
    // End index of the last range scheduled by `try_schedule_once()`; `None` until
    // the first range is scheduled.
    highest_scheduled_index: Option<CommitIndex>,
    // Highest commit index among all fetch results handled so far. Reported as a metric.
    highest_fetched_commit_index: CommitIndex,
    // Max of the local last commit index and the end of the last range sent to Core.
    // Used to decide whether a fetched range can be sent to Core without leaving a gap.
    synced_commit_index: CommitIndex,
}
110
111impl<VC, OC> CommitSyncer<VC, OC>
112where
113 VC: ValidatorNetworkClient,
114 OC: ObserverNetworkClient,
115{
116 pub(crate) fn new(
117 context: Arc<Context>,
118 core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
119 commit_vote_monitor: Arc<CommitVoteMonitor>,
120 commit_consumer_monitor: Arc<CommitConsumerMonitor>,
121 block_verifier: Arc<dyn BlockVerifier>,
122 transaction_certifier: TransactionCertifier,
123 round_tracker: Arc<RwLock<RoundTracker>>,
124 network_client: Arc<CommitSyncerClient<VC, OC>>,
125 dag_state: Arc<RwLock<DagState>>,
126 ) -> Self {
127 let inner = Arc::new(Inner {
128 context,
129 core_thread_dispatcher,
130 commit_vote_monitor,
131 commit_consumer_monitor,
132 block_verifier,
133 transaction_certifier,
134 round_tracker,
135 network_client,
136 dag_state,
137 });
138 let synced_commit_index = inner.dag_state.read().last_commit_index();
139 CommitSyncer {
140 inner,
141 inflight_fetches: JoinSet::new(),
142 pending_fetches: BTreeSet::new(),
143 fetched_ranges: BTreeMap::new(),
144 highest_scheduled_index: None,
145 highest_fetched_commit_index: 0,
146 synced_commit_index,
147 }
148 }
149
150 pub(crate) fn start(self) -> CommitSyncerHandle {
151 let (tx_shutdown, rx_shutdown) = oneshot::channel();
152 let schedule_task = spawn_logged_monitored_task!(self.schedule_loop(rx_shutdown,));
153 CommitSyncerHandle {
154 schedule_task,
155 tx_shutdown,
156 }
157 }
158
159 async fn schedule_loop(mut self, mut rx_shutdown: oneshot::Receiver<()>) {
160 let mut interval = tokio::time::interval(Duration::from_secs(2));
161 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
162
163 loop {
164 tokio::select! {
165 _ = interval.tick() => {
167 self.try_schedule_once();
168 }
169 Some(result) = self.inflight_fetches.join_next(), if !self.inflight_fetches.is_empty() => {
171 if let Err(e) = result {
172 if e.is_panic() {
173 std::panic::resume_unwind(e.into_panic());
174 }
175 warn!("Fetch cancelled. CommitSyncer shutting down: {}", e);
176 self.inflight_fetches.shutdown().await;
178 return;
179 }
180 let (target_end, commits) = result.unwrap();
181 self.handle_fetch_result(target_end, commits).await;
182 }
183 _ = &mut rx_shutdown => {
184 info!("CommitSyncer shutting down ...");
186 self.inflight_fetches.shutdown().await;
187 return;
188 }
189 }
190
191 self.try_start_fetches();
192 }
193 }
194
195 fn try_schedule_once(&mut self) {
196 let quorum_commit_index = self.inner.commit_vote_monitor.quorum_commit_index();
197 let local_commit_index = self.inner.dag_state.read().last_commit_index();
198 let metrics = &self.inner.context.metrics.node_metrics;
199 metrics
200 .commit_sync_quorum_index
201 .set(quorum_commit_index as i64);
202 metrics
203 .commit_sync_local_index
204 .set(local_commit_index as i64);
205 let highest_handled_index = self.inner.commit_consumer_monitor.highest_handled_commit();
206 let highest_scheduled_index = self.highest_scheduled_index.unwrap_or(0);
207 self.synced_commit_index = self.synced_commit_index.max(local_commit_index);
210 let unhandled_commits_threshold = self.unhandled_commits_threshold();
211 info!(
212 "Checking to schedule fetches: synced_commit_index={}, highest_handled_index={}, highest_scheduled_index={}, quorum_commit_index={}, unhandled_commits_threshold={}",
213 self.synced_commit_index,
214 highest_handled_index,
215 highest_scheduled_index,
216 quorum_commit_index,
217 unhandled_commits_threshold,
218 );
219
220 let fetch_after_index = self
222 .synced_commit_index
223 .max(self.highest_scheduled_index.unwrap_or(0));
224 for prev_end in (fetch_after_index..=quorum_commit_index)
226 .step_by(self.inner.context.parameters.commit_sync_batch_size as usize)
227 {
228 let range_start = prev_end + 1;
230 let range_end = prev_end + self.inner.context.parameters.commit_sync_batch_size;
231 if quorum_commit_index < range_end {
235 break;
236 }
237 if highest_handled_index + unhandled_commits_threshold < range_end {
239 warn!(
240 "Skip scheduling new commit fetches: consensus handler is lagging. highest_handled_index={}, highest_scheduled_index={}",
241 highest_handled_index, highest_scheduled_index
242 );
243 break;
244 }
245 self.pending_fetches
246 .insert((range_start..=range_end).into());
247 self.highest_scheduled_index = Some(range_end);
250 }
251 }
252
253 async fn handle_fetch_result(
254 &mut self,
255 target_end: CommitIndex,
256 certified_commits: CertifiedCommits,
257 ) {
258 assert!(!certified_commits.commits().is_empty());
259
260 let (total_blocks_fetched, total_blocks_size_bytes) = certified_commits
261 .commits()
262 .iter()
263 .fold((0, 0), |(blocks, bytes), c| {
264 (
265 blocks + c.blocks().len(),
266 bytes
267 + c.blocks()
268 .iter()
269 .map(|b| b.serialized().len())
270 .sum::<usize>() as u64,
271 )
272 });
273
274 let metrics = &self.inner.context.metrics.node_metrics;
275 metrics
276 .commit_sync_fetched_commits
277 .inc_by(certified_commits.commits().len() as u64);
278 metrics
279 .commit_sync_fetched_blocks
280 .inc_by(total_blocks_fetched as u64);
281 metrics
282 .commit_sync_total_fetched_blocks_size
283 .inc_by(total_blocks_size_bytes);
284
285 let (commit_start, commit_end) = (
286 certified_commits.commits().first().unwrap().index(),
287 certified_commits.commits().last().unwrap().index(),
288 );
289 self.highest_fetched_commit_index = self.highest_fetched_commit_index.max(commit_end);
290 metrics
291 .commit_sync_highest_fetched_index
292 .set(self.highest_fetched_commit_index as i64);
293
294 if commit_end < target_end {
296 self.pending_fetches
297 .insert((commit_end + 1..=target_end).into());
298 }
299 self.synced_commit_index = self
301 .synced_commit_index
302 .max(self.inner.dag_state.read().last_commit_index());
303 if self.synced_commit_index < commit_end {
305 self.fetched_ranges
306 .insert((commit_start..=commit_end).into(), certified_commits);
307 }
308 while let Some((fetched_commit_range, _commits)) = self.fetched_ranges.first_key_value() {
310 let (fetched_commit_range, commits) =
313 if fetched_commit_range.start() <= self.synced_commit_index + 1 {
314 self.fetched_ranges.pop_first().unwrap()
315 } else {
316 metrics.commit_sync_gap_on_processing.inc();
319 break;
320 };
321 if fetched_commit_range.end() <= self.synced_commit_index {
323 continue;
324 }
325
326 debug!(
327 "Fetched blocks for commit range {:?}: {}",
328 fetched_commit_range,
329 commits
330 .commits()
331 .iter()
332 .flat_map(|c| c.blocks())
333 .map(|b| b.reference().to_string())
334 .join(","),
335 );
336
337 match self
340 .inner
341 .core_thread_dispatcher
342 .add_certified_commits(commits)
343 .await
344 {
345 Ok(missing) => {
349 if !missing.is_empty() {
350 info!(
351 "Certification blocks have missing ancestors: {} for commit range {:?}",
352 missing.iter().map(|b| b.to_string()).join(","),
353 fetched_commit_range,
354 );
355 }
356 for block_ref in missing {
357 let hostname = &self
358 .inner
359 .context
360 .committee
361 .authority(block_ref.author)
362 .hostname;
363 metrics
364 .commit_sync_fetch_missing_blocks
365 .with_label_values(&[hostname])
366 .inc();
367 }
368 }
369 Err(e) => {
370 info!("Failed to add blocks, shutting down: {}", e);
371 return;
372 }
373 };
374
375 self.synced_commit_index = self.synced_commit_index.max(fetched_commit_range.end());
377 }
378
379 metrics
380 .commit_sync_inflight_fetches
381 .set(self.inflight_fetches.len() as i64);
382 metrics
383 .commit_sync_pending_fetches
384 .set(self.pending_fetches.len() as i64);
385 metrics
386 .commit_sync_highest_synced_index
387 .set(self.synced_commit_index as i64);
388 }
389
390 fn try_start_fetches(&mut self) {
391 let target_parallel_fetches = self
395 .inner
396 .context
397 .parameters
398 .commit_sync_parallel_fetches
399 .min(self.inner.context.committee.size() * 2 / 3)
400 .min(
401 self.inner
402 .context
403 .parameters
404 .commit_sync_batches_ahead
405 .saturating_sub(self.fetched_ranges.len()),
406 )
407 .max(1);
408 loop {
410 if self.inflight_fetches.len() >= target_parallel_fetches {
411 break;
412 }
413 let Some(commit_range) = self.pending_fetches.pop_first() else {
414 break;
415 };
416 self.inflight_fetches
417 .spawn(Self::fetch_loop(self.inner.clone(), commit_range));
418 }
419
420 let metrics = &self.inner.context.metrics.node_metrics;
421 metrics
422 .commit_sync_inflight_fetches
423 .set(self.inflight_fetches.len() as i64);
424 metrics
425 .commit_sync_pending_fetches
426 .set(self.pending_fetches.len() as i64);
427 metrics
428 .commit_sync_highest_synced_index
429 .set(self.synced_commit_index as i64);
430 }
431
432 async fn fetch_loop(
436 inner: Arc<Inner<VC, OC>>,
437 commit_range: CommitRange,
438 ) -> (CommitIndex, CertifiedCommits) {
439 const TIMEOUT: Duration = Duration::from_secs(10);
441 const MAX_TIMEOUT_MULTIPLIER: u32 = 12;
444 const MAX_NUM_TARGETS: usize = 24;
447 let mut timeout_multiplier = 0;
448 let _timer = inner
449 .context
450 .metrics
451 .node_metrics
452 .commit_sync_fetch_loop_latency
453 .start_timer();
454 info!("Starting to fetch commits in {commit_range:?} ...",);
455 loop {
456 let mut target_authorities = inner
458 .context
459 .committee
460 .authorities()
461 .filter_map(|(i, _)| {
462 if i != inner.context.own_index {
463 Some(i)
464 } else {
465 None
466 }
467 })
468 .collect_vec();
469 target_authorities.shuffle(&mut ThreadRng::default());
470 target_authorities.truncate(MAX_NUM_TARGETS);
471 timeout_multiplier = (timeout_multiplier + 1).min(MAX_TIMEOUT_MULTIPLIER);
473 let request_timeout = TIMEOUT * timeout_multiplier;
474 let fetch_timeout = request_timeout * 4;
480 for authority in target_authorities {
482 match tokio::time::timeout(
483 fetch_timeout,
484 Self::fetch_once(
485 inner.clone(),
486 authority,
487 commit_range.clone(),
488 request_timeout,
489 ),
490 )
491 .await
492 {
493 Ok(Ok(commits)) => {
494 info!("Finished fetching commits in {commit_range:?}",);
495 return (commit_range.end(), commits);
496 }
497 Ok(Err(e)) => {
498 let hostname = inner
499 .context
500 .committee
501 .authority(authority)
502 .hostname
503 .clone();
504 warn!("Failed to fetch {commit_range:?} from {hostname}: {}", e);
505 inner
506 .context
507 .metrics
508 .node_metrics
509 .commit_sync_fetch_once_errors
510 .with_label_values(&[hostname.as_str(), e.name()])
511 .inc();
512 }
513 Err(_) => {
514 let hostname = inner
515 .context
516 .committee
517 .authority(authority)
518 .hostname
519 .clone();
520 warn!("Timed out fetching {commit_range:?} from {authority}",);
521 inner
522 .context
523 .metrics
524 .node_metrics
525 .commit_sync_fetch_once_errors
526 .with_label_values(&[hostname.as_str(), "FetchTimeout"])
527 .inc();
528 }
529 }
530 }
531 sleep(TIMEOUT).await;
533 }
534 }
535
536 async fn fetch_once(
540 inner: Arc<Inner<VC, OC>>,
541 target_authority: AuthorityIndex,
542 commit_range: CommitRange,
543 timeout: Duration,
544 ) -> ConsensusResult<CertifiedCommits> {
545 let _timer = inner
546 .context
547 .metrics
548 .node_metrics
549 .commit_sync_fetch_once_latency
550 .start_timer();
551
552 let (serialized_commits, serialized_blocks) = inner
554 .network_client
555 .fetch_commits(
556 crate::network::PeerId::Validator(target_authority),
557 commit_range.clone(),
558 timeout,
559 )
560 .await?;
561
562 let (commits, vote_blocks) = Handle::current()
566 .spawn_blocking({
567 let inner = inner.clone();
568 move || {
569 inner.verify_commits(
570 target_authority,
571 commit_range,
572 serialized_commits,
573 serialized_blocks,
574 )
575 }
576 })
577 .await
578 .expect("Spawn blocking should not fail")?;
579
580 let mut block_refs: Vec<_> = commits.iter().flat_map(|c| c.blocks()).cloned().collect();
582 block_refs.sort();
583 let num_chunks = block_refs
584 .len()
585 .div_ceil(inner.context.parameters.max_blocks_per_fetch)
586 as u32;
587 let mut requests: FuturesOrdered<_> = block_refs
588 .chunks(inner.context.parameters.max_blocks_per_fetch)
589 .enumerate()
590 .map(|(i, request_block_refs)| {
591 let inner = inner.clone();
592 async move {
593 sleep(timeout * i as u32 / num_chunks).await;
595 let serialized_blocks = inner
597 .network_client
598 .fetch_blocks(
599 crate::network::PeerId::Validator(target_authority),
600 request_block_refs.to_vec(),
601 vec![],
602 false,
603 timeout,
604 )
605 .await?;
606 if request_block_refs.len() != serialized_blocks.len() {
608 return Err(ConsensusError::UnexpectedNumberOfBlocksFetched {
609 authority: target_authority,
610 requested: request_block_refs.len(),
611 received: serialized_blocks.len(),
612 });
613 }
614 let signed_blocks = serialized_blocks
616 .iter()
617 .map(|serialized| {
618 let block: SignedBlock = bcs::from_bytes(serialized)
619 .map_err(ConsensusError::MalformedBlock)?;
620 Ok(block)
621 })
622 .collect::<ConsensusResult<Vec<_>>>()?;
623 let mut blocks = Vec::new();
626 for ((requested_block_ref, signed_block), serialized) in request_block_refs
627 .iter()
628 .zip(signed_blocks.into_iter())
629 .zip(serialized_blocks.into_iter())
630 {
631 let signed_block_digest = VerifiedBlock::compute_digest(&serialized);
632 let received_block_ref = BlockRef::new(
633 signed_block.round(),
634 signed_block.author(),
635 signed_block_digest,
636 );
637 if *requested_block_ref != received_block_ref {
638 return Err(ConsensusError::UnexpectedBlockForCommit {
639 peer: target_authority,
640 requested: *requested_block_ref,
641 received: received_block_ref,
642 });
643 }
644 blocks.push(VerifiedBlock::new_verified(signed_block, serialized));
645 }
646 Ok(blocks)
647 }
648 })
649 .collect();
650
651 let mut fetched_blocks = BTreeMap::new();
652 while let Some(result) = requests.next().await {
653 for block in result? {
654 fetched_blocks.insert(block.reference(), block);
655 }
656 }
657
658 for block in fetched_blocks.values().chain(vote_blocks.iter()) {
660 let now_ms = inner.context.clock.timestamp_utc_ms();
661 let forward_drift = block.timestamp_ms().saturating_sub(now_ms);
662 if forward_drift == 0 {
663 continue;
664 };
665 let peer_hostname = &inner.context.committee.authority(target_authority).hostname;
666 inner
667 .context
668 .metrics
669 .node_metrics
670 .block_timestamp_drift_ms
671 .with_label_values(&[peer_hostname.as_str(), "commit_syncer"])
672 .inc_by(forward_drift);
673 }
674
675 let mut certified_commits = Vec::new();
677 for commit in &commits {
678 let blocks = commit
679 .blocks()
680 .iter()
681 .map(|block_ref| {
682 fetched_blocks
683 .remove(block_ref)
684 .expect("Block should exist")
685 })
686 .collect::<Vec<_>>();
687 certified_commits.push(CertifiedCommit::new_certified(commit.clone(), blocks));
688 }
689
690 for commit in &certified_commits {
692 for block in commit.blocks() {
693 if inner.context.protocol_config.transaction_voting_enabled() {
697 inner
698 .transaction_certifier
699 .add_voted_blocks(vec![(block.clone(), vec![])]);
700 }
701 }
702 }
703
704 for commit in &certified_commits {
706 for block in commit.blocks() {
707 inner.commit_vote_monitor.observe_block(block);
708 }
709 }
710 for block in &vote_blocks {
711 inner.commit_vote_monitor.observe_block(block);
712 }
713
714 {
717 let mut tracker = inner.round_tracker.write();
718 for commit in &certified_commits {
720 for block in commit.blocks() {
721 tracker.update_from_verified_block(&ExtendedBlock {
722 block: block.clone(),
723 excluded_ancestors: vec![],
724 });
725 }
726 }
727 for block in &vote_blocks {
729 tracker.update_from_verified_block(&ExtendedBlock {
730 block: block.clone(),
731 excluded_ancestors: vec![],
732 });
733 }
734 }
735
736 Ok(CertifiedCommits::new(certified_commits, vote_blocks))
737 }
738
739 fn unhandled_commits_threshold(&self) -> CommitIndex {
740 self.inner.context.parameters.commit_sync_batch_size
741 * (self.inner.context.parameters.commit_sync_batches_ahead as u32)
742 }
743
    /// Test-only accessor: returns a copy of the commit ranges scheduled but not yet started.
    #[cfg(test)]
    fn pending_fetches(&self) -> BTreeSet<CommitRange> {
        self.pending_fetches.clone()
    }
748
    /// Test-only accessor: returns a copy of the fetched commits buffered by commit range.
    #[cfg(test)]
    fn fetched_ranges(&self) -> BTreeMap<CommitRange, CertifiedCommits> {
        self.fetched_ranges.clone()
    }
753
    /// Test-only accessor: end index of the last scheduled range, or `None` if
    /// nothing has been scheduled yet.
    #[cfg(test)]
    fn highest_scheduled_index(&self) -> Option<CommitIndex> {
        self.highest_scheduled_index
    }
758
    /// Test-only accessor: highest commit index among the fetch results handled so far.
    #[cfg(test)]
    fn highest_fetched_commit_index(&self) -> CommitIndex {
        self.highest_fetched_commit_index
    }
763
    /// Test-only accessor: current value of `synced_commit_index`.
    #[cfg(test)]
    fn synced_commit_index(&self) -> CommitIndex {
        self.synced_commit_index
    }
768}
769
/// Components shared, behind an `Arc`, between the `CommitSyncer` scheduler and
/// the fetch tasks it spawns.
struct Inner<VC: ValidatorNetworkClient, OC: ObserverNetworkClient> {
    // Source of committee, parameters, protocol config, clock and metrics.
    context: Arc<Context>,
    // Receives the certified commits that have been fetched and verified.
    core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
    // Provides the quorum commit index, and observes fetched blocks.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    // Provides the highest commit index handled by the consumer.
    commit_consumer_monitor: Arc<CommitConsumerMonitor>,
    // Verifies the vote blocks returned alongside fetched commits.
    block_verifier: Arc<dyn BlockVerifier>,
    // Receives fetched blocks when transaction voting is enabled.
    transaction_certifier: TransactionCertifier,
    // Updated with every fetched block.
    round_tracker: Arc<RwLock<RoundTracker>>,
    // Used to fetch commits and blocks from peers.
    network_client: Arc<CommitSyncerClient<VC, OC>>,
    // Provides the local last commit index.
    dag_state: Arc<RwLock<DagState>>,
}
781
782impl<VC: ValidatorNetworkClient, OC: ObserverNetworkClient> Inner<VC, OC> {
783 fn verify_commits(
786 &self,
787 peer: AuthorityIndex,
788 commit_range: CommitRange,
789 serialized_commits: Vec<Bytes>,
790 serialized_vote_blocks: Vec<Bytes>,
791 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
792 let mut commits = Vec::new();
794 for serialized in &serialized_commits {
795 let commit: Commit =
796 bcs::from_bytes(serialized).map_err(ConsensusError::MalformedCommit)?;
797 let digest = TrustedCommit::compute_digest(serialized);
798 if commits.is_empty() {
799 if commit.index() != commit_range.start() {
801 return Err(ConsensusError::UnexpectedStartCommit {
802 peer,
803 start: commit_range.start(),
804 commit: Box::new(commit),
805 });
806 }
807 } else {
808 let (last_commit_digest, last_commit): &(CommitDigest, Commit) =
810 commits.last().unwrap();
811 if commit.index() != last_commit.index() + 1
812 || &commit.previous_digest() != last_commit_digest
813 {
814 return Err(ConsensusError::UnexpectedCommitSequence {
815 peer,
816 prev_commit: Box::new(last_commit.clone()),
817 curr_commit: Box::new(commit),
818 });
819 }
820 }
821 if commit.index() > commit_range.end() {
823 break;
824 }
825 commits.push((digest, commit));
826 }
827 let Some((end_commit_digest, end_commit)) = commits.last() else {
828 return Err(ConsensusError::NoCommitReceived { peer });
829 };
830
831 let end_commit_ref = CommitRef::new(end_commit.index(), *end_commit_digest);
833 let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
834 let mut vote_blocks = Vec::new();
835 for serialized in serialized_vote_blocks {
836 let block: SignedBlock =
837 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
838 let (block, reject_transaction_votes) =
841 self.block_verifier.verify_and_vote(block, serialized)?;
842 if self.context.protocol_config.transaction_voting_enabled() {
843 self.transaction_certifier
844 .add_voted_blocks(vec![(block.clone(), reject_transaction_votes)]);
845 }
846 for vote in block.commit_votes() {
847 if *vote == end_commit_ref {
848 stake_aggregator.add(block.author(), &self.context.committee);
849 }
850 }
851 vote_blocks.push(block);
852 }
853
854 if !stake_aggregator.reached_threshold(&self.context.committee) {
856 return Err(ConsensusError::NotEnoughCommitVotes {
857 stake: stake_aggregator.stake(),
858 peer,
859 commit: Box::new(end_commit.clone()),
860 });
861 }
862
863 let trusted_commits = commits
864 .into_iter()
865 .zip(serialized_commits)
866 .map(|((_d, c), s)| TrustedCommit::new_trusted(c, s))
867 .collect();
868 Ok((trusted_commits, vote_blocks))
869 }
870}
871
872#[cfg(test)]
873mod tests {
874 use std::{sync::Arc, time::Duration};
875
876 use bytes::Bytes;
877 use consensus_config::{AuthorityIndex, Parameters};
878 use consensus_types::block::{BlockRef, Round};
879 use mysten_metrics::monitored_mpsc;
880 use parking_lot::RwLock;
881
882 use crate::{
883 CommitConsumerMonitor, CommitDigest, CommitRef,
884 block::{TestBlock, VerifiedBlock},
885 block_verifier::NoopBlockVerifier,
886 commit::CommitRange,
887 commit_syncer::CommitSyncer,
888 commit_vote_monitor::CommitVoteMonitor,
889 context::Context,
890 core_thread::MockCoreThreadDispatcher,
891 dag_state::DagState,
892 error::ConsensusResult,
893 network::{BlockStream, CommitSyncerClient, ObserverNetworkClient, ValidatorNetworkClient},
894 round_tracker::RoundTracker,
895 storage::mem_store::MemStore,
896 transaction_certifier::TransactionCertifier,
897 };
898
    /// Stateless stand-in for both network client traits. Every method is
    /// unimplemented and panics if called.
    #[derive(Default)]
    struct FakeNetworkClient {}
901
    // Stub implementation: every method panics via `unimplemented!()`. The test in
    // this module never issues validator network requests.
    #[async_trait::async_trait]
    impl ValidatorNetworkClient for FakeNetworkClient {
        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }

        #[cfg(test)]
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }
    }
960
    // Stub implementation: every method panics via `unimplemented!()`. The test in
    // this module never issues observer network requests.
    #[async_trait::async_trait]
    impl ObserverNetworkClient for FakeNetworkClient {
        async fn stream_blocks(
            &self,
            _peer: crate::network::PeerId,
            _request_stream: crate::network::BlockRequestStream,
            _timeout: Duration,
        ) -> ConsensusResult<crate::network::ObserverBlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: crate::network::PeerId,
            _block_refs: Vec<BlockRef>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: crate::network::PeerId,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }
    }
990
991 #[tokio::test(flavor = "current_thread", start_paused = true)]
992 async fn commit_syncer_start_and_pause_scheduling() {
993 let (context, _) = Context::new_for_test(4);
995 let context = Context {
997 own_index: AuthorityIndex::new_for_test(3),
998 parameters: Parameters {
999 commit_sync_batch_size: 5,
1000 commit_sync_batches_ahead: 5,
1001 commit_sync_parallel_fetches: 5,
1002 max_blocks_per_fetch: 5,
1003 ..context.parameters
1004 },
1005 ..context
1006 };
1007 let context = Arc::new(context);
1008 let block_verifier = Arc::new(NoopBlockVerifier {});
1009 let core_thread_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1010 let mock_client = Arc::new(FakeNetworkClient::default());
1011 let network_client = Arc::new(CommitSyncerClient::new(
1012 context.clone(),
1013 Some(mock_client.clone()),
1014 Some(mock_client.clone()),
1015 ));
1016 let store = Arc::new(MemStore::new());
1017 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1018 let (blocks_sender, _blocks_receiver) =
1019 monitored_mpsc::unbounded_channel("consensus_block_output");
1020 let transaction_certifier = TransactionCertifier::new(
1021 context.clone(),
1022 block_verifier.clone(),
1023 dag_state.clone(),
1024 blocks_sender,
1025 );
1026 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1027 let commit_consumer_monitor = Arc::new(CommitConsumerMonitor::new(0, 0));
1028 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1029 let mut commit_syncer = CommitSyncer::new(
1030 context,
1031 core_thread_dispatcher,
1032 commit_vote_monitor.clone(),
1033 commit_consumer_monitor.clone(),
1034 block_verifier,
1035 transaction_certifier,
1036 round_tracker,
1037 network_client,
1038 dag_state,
1039 );
1040
1041 assert!(commit_syncer.pending_fetches().is_empty());
1043 assert!(commit_syncer.fetched_ranges().is_empty());
1044 assert!(commit_syncer.highest_scheduled_index().is_none());
1045 assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
1046 assert_eq!(commit_syncer.synced_commit_index(), 0);
1047
1048 for i in 0..3 {
1050 let test_block = TestBlock::new(15, i)
1051 .set_commit_votes(vec![CommitRef::new(10, CommitDigest::MIN)])
1052 .build();
1053 let block = VerifiedBlock::new_for_test(test_block);
1054 commit_vote_monitor.observe_block(&block);
1055 }
1056
1057 commit_syncer.try_schedule_once();
1059
1060 assert_eq!(commit_syncer.pending_fetches().len(), 2);
1062 assert!(commit_syncer.fetched_ranges().is_empty());
1063 assert_eq!(commit_syncer.highest_scheduled_index(), Some(10));
1064 assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
1065 assert_eq!(commit_syncer.synced_commit_index(), 0);
1066
1067 for i in 0..3 {
1069 let test_block = TestBlock::new(40, i)
1070 .set_commit_votes(vec![CommitRef::new(35, CommitDigest::MIN)])
1071 .build();
1072 let block = VerifiedBlock::new_for_test(test_block);
1073 commit_vote_monitor.observe_block(&block);
1074 }
1075
1076 commit_syncer.try_schedule_once();
1078
1079 assert_eq!(commit_syncer.unhandled_commits_threshold(), 25);
1081 assert_eq!(commit_syncer.highest_scheduled_index(), Some(25));
1082 let pending_fetches = commit_syncer.pending_fetches();
1083 assert_eq!(pending_fetches.len(), 5);
1084
1085 commit_consumer_monitor.set_highest_handled_commit(25);
1087 commit_syncer.try_schedule_once();
1088
1089 assert_eq!(commit_syncer.highest_scheduled_index(), Some(35));
1091 let pending_fetches = commit_syncer.pending_fetches();
1092 assert_eq!(pending_fetches.len(), 7);
1093
1094 for (range, start) in pending_fetches.iter().zip((1..35).step_by(5)) {
1096 assert_eq!(range.start(), start);
1097 assert_eq!(range.end(), start + 4);
1098 }
1099 }
1100}