1use std::{
28 collections::{BTreeMap, BTreeSet},
29 sync::Arc,
30 time::Duration,
31};
32
33use bytes::Bytes;
34use consensus_types::block::BlockRef;
35use futures::{StreamExt as _, stream::FuturesOrdered};
36use itertools::Itertools as _;
37use mysten_common::ZipDebugEqIteratorExt;
38use mysten_metrics::spawn_logged_monitored_task;
39use parking_lot::RwLock;
40use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
41use tokio::{
42 runtime::Handle,
43 sync::oneshot,
44 task::{JoinHandle, JoinSet},
45 time::{MissedTickBehavior, sleep},
46};
47use tracing::{debug, info, warn};
48
49use crate::{
50 CommitConsumerMonitor, CommitIndex,
51 block::{BlockAPI, ExtendedBlock, SignedBlock, VerifiedBlock},
52 block_verifier::BlockVerifier,
53 commit::{
54 CertifiedCommit, CertifiedCommits, Commit, CommitAPI as _, CommitDigest, CommitRange,
55 CommitRef, TrustedCommit,
56 },
57 commit_vote_monitor::CommitVoteMonitor,
58 context::Context,
59 core_thread::CoreThreadDispatcher,
60 dag_state::DagState,
61 error::{ConsensusError, ConsensusResult},
62 network::{CommitSyncerClient, ObserverNetworkClient, PeerId, ValidatorNetworkClient},
63 peers_pool::PeersPool,
64 round_tracker::RoundTracker,
65 stake_aggregator::{QuorumThreshold, StakeAggregator},
66 transaction_vote_tracker::TransactionVoteTracker,
67};
68
/// Handle to a running commit syncer, returned by `CommitSyncer::start` and used to stop it.
pub(crate) struct CommitSyncerHandle {
    // Task running `CommitSyncer::schedule_loop`.
    schedule_task: JoinHandle<()>,
    // Sending on this channel (or dropping it) resolves the loop's shutdown branch.
    tx_shutdown: oneshot::Sender<()>,
}
74
75impl CommitSyncerHandle {
76 pub(crate) async fn stop(self) {
77 let _ = self.tx_shutdown.send(());
78 if let Err(e) = self.schedule_task.await
80 && e.is_panic()
81 {
82 std::panic::resume_unwind(e.into_panic());
83 }
84 }
85}
86
87pub(crate) struct CommitSyncer<VC: ValidatorNetworkClient, OC: ObserverNetworkClient> {
88 inner: Arc<Inner<VC, OC>>,
92
93 inflight_fetches: JoinSet<(u32, CertifiedCommits)>,
97 pending_fetches: BTreeSet<CommitRange>,
99 fetched_ranges: BTreeMap<CommitRange, CertifiedCommits>,
101 highest_scheduled_index: Option<CommitIndex>,
104 highest_fetched_commit_index: CommitIndex,
107 synced_commit_index: CommitIndex,
110}
111
112impl<VC, OC> CommitSyncer<VC, OC>
113where
114 VC: ValidatorNetworkClient,
115 OC: ObserverNetworkClient,
116{
117 pub(crate) fn new(
118 context: Arc<Context>,
119 core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
120 commit_vote_monitor: Arc<CommitVoteMonitor>,
121 commit_consumer_monitor: Arc<CommitConsumerMonitor>,
122 block_verifier: Arc<dyn BlockVerifier>,
123 transaction_vote_tracker: TransactionVoteTracker,
124 round_tracker: Arc<RwLock<RoundTracker>>,
125 network_client: Arc<CommitSyncerClient<VC, OC>>,
126 dag_state: Arc<RwLock<DagState>>,
127 peers_pool: Arc<PeersPool>,
128 ) -> Self {
129 let inner = Arc::new(Inner {
130 context,
131 core_thread_dispatcher,
132 commit_vote_monitor,
133 commit_consumer_monitor,
134 block_verifier,
135 transaction_vote_tracker,
136 round_tracker,
137 network_client,
138 dag_state,
139 peers_pool,
140 });
141 let synced_commit_index = inner.dag_state.read().last_commit_index();
142 CommitSyncer {
143 inner,
144 inflight_fetches: JoinSet::new(),
145 pending_fetches: BTreeSet::new(),
146 fetched_ranges: BTreeMap::new(),
147 highest_scheduled_index: None,
148 highest_fetched_commit_index: 0,
149 synced_commit_index,
150 }
151 }
152
153 pub(crate) fn start(self) -> CommitSyncerHandle {
154 let (tx_shutdown, rx_shutdown) = oneshot::channel();
155 let schedule_task = spawn_logged_monitored_task!(self.schedule_loop(rx_shutdown,));
156 CommitSyncerHandle {
157 schedule_task,
158 tx_shutdown,
159 }
160 }
161
162 async fn schedule_loop(mut self, mut rx_shutdown: oneshot::Receiver<()>) {
163 let mut interval = tokio::time::interval(Duration::from_secs(2));
164 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
165
166 loop {
167 tokio::select! {
168 _ = interval.tick() => {
170 self.try_schedule_once();
171 }
172 Some(result) = self.inflight_fetches.join_next(), if !self.inflight_fetches.is_empty() => {
174 if let Err(e) = result {
175 if e.is_panic() {
176 std::panic::resume_unwind(e.into_panic());
177 }
178 warn!("Fetch cancelled. CommitSyncer shutting down: {}", e);
179 self.inflight_fetches.shutdown().await;
181 return;
182 }
183 let (target_end, commits) = result.unwrap();
184 self.handle_fetch_result(target_end, commits).await;
185 }
186 _ = &mut rx_shutdown => {
187 info!("CommitSyncer shutting down ...");
189 self.inflight_fetches.shutdown().await;
190 return;
191 }
192 }
193
194 self.try_start_fetches();
195 }
196 }
197
198 fn try_schedule_once(&mut self) {
199 let quorum_commit_index = self.inner.commit_vote_monitor.quorum_commit_index();
200 let local_commit_index = self.inner.dag_state.read().last_commit_index();
201 let metrics = &self.inner.context.metrics.node_metrics;
202 metrics
203 .commit_sync_quorum_index
204 .set(quorum_commit_index as i64);
205 metrics
206 .commit_sync_local_index
207 .set(local_commit_index as i64);
208 let highest_handled_index = self.inner.commit_consumer_monitor.highest_handled_commit();
209 let highest_scheduled_index = self.highest_scheduled_index.unwrap_or(0);
210 self.synced_commit_index = self.synced_commit_index.max(local_commit_index);
213 let unhandled_commits_threshold = self.unhandled_commits_threshold();
214 info!(
215 "Checking to schedule fetches: synced_commit_index={}, highest_handled_index={}, highest_scheduled_index={}, quorum_commit_index={}, unhandled_commits_threshold={}",
216 self.synced_commit_index,
217 highest_handled_index,
218 highest_scheduled_index,
219 quorum_commit_index,
220 unhandled_commits_threshold,
221 );
222
223 let fetch_after_index = self
225 .synced_commit_index
226 .max(self.highest_scheduled_index.unwrap_or(0));
227 for prev_end in (fetch_after_index..=quorum_commit_index)
229 .step_by(self.inner.context.parameters.commit_sync_batch_size as usize)
230 {
231 let range_start = prev_end + 1;
233 let range_end = prev_end + self.inner.context.parameters.commit_sync_batch_size;
234 if quorum_commit_index < range_end {
238 break;
239 }
240 if highest_handled_index + unhandled_commits_threshold < range_end {
242 warn!(
243 "Skip scheduling new commit fetches: consensus handler is lagging. highest_handled_index={}, highest_scheduled_index={}",
244 highest_handled_index, highest_scheduled_index
245 );
246 break;
247 }
248 self.pending_fetches
249 .insert((range_start..=range_end).into());
250 self.highest_scheduled_index = Some(range_end);
253 }
254 }
255
256 async fn handle_fetch_result(
257 &mut self,
258 target_end: CommitIndex,
259 certified_commits: CertifiedCommits,
260 ) {
261 assert!(!certified_commits.commits().is_empty());
262
263 let (total_blocks_fetched, total_blocks_size_bytes) = certified_commits
264 .commits()
265 .iter()
266 .fold((0, 0), |(blocks, bytes), c| {
267 (
268 blocks + c.blocks().len(),
269 bytes
270 + c.blocks()
271 .iter()
272 .map(|b| b.serialized().len())
273 .sum::<usize>() as u64,
274 )
275 });
276
277 let metrics = &self.inner.context.metrics.node_metrics;
278 metrics
279 .commit_sync_fetched_commits
280 .inc_by(certified_commits.commits().len() as u64);
281 metrics
282 .commit_sync_fetched_blocks
283 .inc_by(total_blocks_fetched as u64);
284 metrics
285 .commit_sync_total_fetched_blocks_size
286 .inc_by(total_blocks_size_bytes);
287
288 let (commit_start, commit_end) = (
289 certified_commits.commits().first().unwrap().index(),
290 certified_commits.commits().last().unwrap().index(),
291 );
292 self.highest_fetched_commit_index = self.highest_fetched_commit_index.max(commit_end);
293 metrics
294 .commit_sync_highest_fetched_index
295 .set(self.highest_fetched_commit_index as i64);
296
297 if commit_end < target_end {
299 self.pending_fetches
300 .insert((commit_end + 1..=target_end).into());
301 }
302 self.synced_commit_index = self
304 .synced_commit_index
305 .max(self.inner.dag_state.read().last_commit_index());
306 if self.synced_commit_index < commit_end {
308 self.fetched_ranges
309 .insert((commit_start..=commit_end).into(), certified_commits);
310 }
311 while let Some((fetched_commit_range, _commits)) = self.fetched_ranges.first_key_value() {
313 let (fetched_commit_range, commits) =
316 if fetched_commit_range.start() <= self.synced_commit_index + 1 {
317 self.fetched_ranges.pop_first().unwrap()
318 } else {
319 metrics.commit_sync_gap_on_processing.inc();
322 break;
323 };
324 if fetched_commit_range.end() <= self.synced_commit_index {
326 continue;
327 }
328
329 debug!(
330 "Fetched blocks for commit range {:?}: {}",
331 fetched_commit_range,
332 commits
333 .commits()
334 .iter()
335 .flat_map(|c| c.blocks())
336 .map(|b| b.reference().to_string())
337 .join(","),
338 );
339
340 match self
343 .inner
344 .core_thread_dispatcher
345 .add_certified_commits(commits)
346 .await
347 {
348 Ok(missing) => {
352 if !missing.is_empty() {
353 info!(
354 "Certification blocks have missing ancestors: {} for commit range {:?}",
355 missing.iter().map(|b| b.to_string()).join(","),
356 fetched_commit_range,
357 );
358 }
359 for block_ref in missing {
360 let hostname = &self
361 .inner
362 .context
363 .committee
364 .authority(block_ref.author)
365 .hostname;
366 metrics
367 .commit_sync_fetch_missing_blocks
368 .with_label_values(&[hostname])
369 .inc();
370 }
371 }
372 Err(e) => {
373 info!("Failed to add blocks, shutting down: {}", e);
374 return;
375 }
376 };
377
378 self.synced_commit_index = self.synced_commit_index.max(fetched_commit_range.end());
380 }
381
382 metrics
383 .commit_sync_inflight_fetches
384 .set(self.inflight_fetches.len() as i64);
385 metrics
386 .commit_sync_pending_fetches
387 .set(self.pending_fetches.len() as i64);
388 metrics
389 .commit_sync_highest_synced_index
390 .set(self.synced_commit_index as i64);
391 }
392
393 fn try_start_fetches(&mut self) {
394 let known_peers_count = self.inner.peers_pool.get_known_peers().len();
399 let target_parallel_fetches = if self.inner.context.is_validator() {
400 self.inner
401 .context
402 .parameters
403 .commit_sync_parallel_fetches
404 .min(known_peers_count * 2 / 3)
405 .min(
406 self.inner
407 .context
408 .parameters
409 .commit_sync_batches_ahead
410 .saturating_sub(self.fetched_ranges.len()),
411 )
412 .max(1)
413 } else {
414 self.inner
417 .context
418 .parameters
419 .commit_sync_parallel_fetches
420 .min(
421 self.inner
422 .context
423 .parameters
424 .commit_sync_batches_ahead
425 .saturating_sub(self.fetched_ranges.len()),
426 )
427 .max(1)
428 };
429 loop {
431 if self.inflight_fetches.len() >= target_parallel_fetches {
432 break;
433 }
434 let Some(commit_range) = self.pending_fetches.pop_first() else {
435 break;
436 };
437 self.inflight_fetches
438 .spawn(Self::fetch_loop(self.inner.clone(), commit_range));
439 }
440
441 let metrics = &self.inner.context.metrics.node_metrics;
442 metrics
443 .commit_sync_inflight_fetches
444 .set(self.inflight_fetches.len() as i64);
445 metrics
446 .commit_sync_pending_fetches
447 .set(self.pending_fetches.len() as i64);
448 metrics
449 .commit_sync_highest_synced_index
450 .set(self.synced_commit_index as i64);
451 }
452
453 async fn fetch_loop(
457 inner: Arc<Inner<VC, OC>>,
458 commit_range: CommitRange,
459 ) -> (CommitIndex, CertifiedCommits) {
460 let base_timeout = inner.context.parameters.commit_sync_request_timeout;
461 const MAX_TIMEOUT_MULTIPLIER: u32 = 12;
464 const MAX_NUM_TARGETS: usize = 24;
467 let mut timeout_multiplier = 0;
468 let _timer = inner
469 .context
470 .metrics
471 .node_metrics
472 .commit_sync_fetch_loop_latency
473 .start_timer();
474 info!("Starting to fetch commits in {commit_range:?} ...",);
475 loop {
476 let mut target_peers = inner.peers_pool.get_known_peers();
478 target_peers.shuffle(&mut ThreadRng::default());
479 target_peers.truncate(MAX_NUM_TARGETS);
480 timeout_multiplier = (timeout_multiplier + 1).min(MAX_TIMEOUT_MULTIPLIER);
482 let request_timeout = base_timeout * timeout_multiplier;
483 let fetch_timeout = request_timeout * 4;
489 for peer in target_peers {
491 match tokio::time::timeout(
492 fetch_timeout,
493 Self::fetch_once(
494 inner.clone(),
495 peer.clone(),
496 commit_range.clone(),
497 request_timeout,
498 ),
499 )
500 .await
501 {
502 Ok(Ok(commits)) => {
503 info!("Finished fetching commits in {commit_range:?}",);
504 return (commit_range.end(), commits);
505 }
506 Ok(Err(e)) => {
507 warn!(
508 "Failed to fetch {commit_range:?} from {}: {}",
509 peer.hostname(&inner.context),
510 e
511 );
512 inner
513 .context
514 .metrics
515 .node_metrics
516 .commit_sync_fetch_once_errors
517 .with_label_values(&[peer.labelname(&inner.context).as_str(), e.name()])
518 .inc();
519 }
520 Err(_) => {
521 warn!(
522 "Timed out fetching {commit_range:?} from {}",
523 peer.hostname(&inner.context)
524 );
525 inner
526 .context
527 .metrics
528 .node_metrics
529 .commit_sync_fetch_once_errors
530 .with_label_values(&[
531 peer.labelname(&inner.context).as_str(),
532 "FetchTimeout",
533 ])
534 .inc();
535 }
536 }
537 }
538 sleep(base_timeout).await;
540 }
541 }
542
543 async fn fetch_once(
547 inner: Arc<Inner<VC, OC>>,
548 target_peer: PeerId,
549 commit_range: CommitRange,
550 timeout: Duration,
551 ) -> ConsensusResult<CertifiedCommits> {
552 let _timer = inner
553 .context
554 .metrics
555 .node_metrics
556 .commit_sync_fetch_once_latency
557 .start_timer();
558
559 let probe_timeout = inner.context.parameters.commit_sync_probe_timeout;
562 inner
563 .network_client
564 .probe_connectivity(target_peer.clone(), probe_timeout)
565 .await?;
566
567 let (serialized_commits, serialized_blocks) = inner
569 .network_client
570 .fetch_commits(target_peer.clone(), commit_range.clone(), timeout)
571 .await?;
572
573 let (commits, vote_blocks) = Handle::current()
577 .spawn_blocking({
578 let inner = inner.clone();
579 let peer = target_peer.clone();
580 move || {
581 inner.verify_commits(peer, commit_range, serialized_commits, serialized_blocks)
582 }
583 })
584 .await
585 .expect("Spawn blocking should not fail")?;
586
587 let mut block_refs: Vec<_> = commits.iter().flat_map(|c| c.blocks()).cloned().collect();
589 block_refs.sort();
590 let num_chunks = block_refs
591 .len()
592 .div_ceil(inner.context.parameters.max_blocks_per_fetch)
593 as u32;
594 let mut requests: FuturesOrdered<_> = block_refs
595 .chunks(inner.context.parameters.max_blocks_per_fetch)
596 .enumerate()
597 .map(|(i, request_block_refs)| {
598 let inner = inner.clone();
599 let peer = target_peer.clone();
600 async move {
601 sleep(timeout * i as u32 / num_chunks).await;
603 let serialized_blocks = inner
605 .network_client
606 .fetch_blocks(
607 peer.clone(),
608 request_block_refs.to_vec(),
609 vec![],
610 false,
611 timeout,
612 )
613 .await?;
614 if request_block_refs.len() != serialized_blocks.len() {
616 return Err(ConsensusError::UnexpectedNumberOfBlocksFetched {
617 peer,
618 requested: request_block_refs.len(),
619 received: serialized_blocks.len(),
620 });
621 }
622 let signed_blocks = serialized_blocks
624 .iter()
625 .map(|serialized| {
626 let block: SignedBlock = bcs::from_bytes(serialized)
627 .map_err(ConsensusError::MalformedBlock)?;
628 Ok(block)
629 })
630 .collect::<ConsensusResult<Vec<_>>>()?;
631 let mut blocks = Vec::new();
634 for ((requested_block_ref, signed_block), serialized) in request_block_refs
635 .iter()
636 .zip_debug_eq(signed_blocks.into_iter())
637 .zip_debug_eq(serialized_blocks.into_iter())
638 {
639 let signed_block_digest = VerifiedBlock::compute_digest(&serialized);
640 let received_block_ref = BlockRef::new(
641 signed_block.round(),
642 signed_block.author(),
643 signed_block_digest,
644 );
645 if *requested_block_ref != received_block_ref {
646 return Err(ConsensusError::UnexpectedBlockForCommit {
647 peer,
648 requested: *requested_block_ref,
649 received: received_block_ref,
650 });
651 }
652 blocks.push(VerifiedBlock::new_verified(signed_block, serialized));
653 }
654 Ok(blocks)
655 }
656 })
657 .collect();
658
659 let mut fetched_blocks = BTreeMap::new();
660 while let Some(result) = requests.next().await {
661 for block in result? {
662 fetched_blocks.insert(block.reference(), block);
663 }
664 }
665
666 for block in fetched_blocks.values().chain(vote_blocks.iter()) {
668 let now_ms = inner.context.clock.timestamp_utc_ms();
669 let forward_drift = block.timestamp_ms().saturating_sub(now_ms);
670 if forward_drift == 0 {
671 continue;
672 };
673 inner
675 .context
676 .metrics
677 .node_metrics
678 .block_timestamp_drift_ms
679 .with_label_values(&[
680 target_peer.labelname(&inner.context).as_str(),
681 "commit_syncer",
682 ])
683 .inc_by(forward_drift);
684 }
685
686 let mut certified_commits = Vec::new();
688 for commit in &commits {
689 let blocks = commit
690 .blocks()
691 .iter()
692 .map(|block_ref| {
693 fetched_blocks
694 .remove(block_ref)
695 .expect("Block should exist")
696 })
697 .collect::<Vec<_>>();
698 certified_commits.push(CertifiedCommit::new_certified(commit.clone(), blocks));
699 }
700
701 for commit in &certified_commits {
703 for block in commit.blocks() {
704 if inner.context.protocol_config.transaction_voting_enabled() {
708 inner
709 .transaction_vote_tracker
710 .add_voted_blocks(vec![(block.clone(), vec![])]);
711 }
712 }
713 }
714
715 for commit in &certified_commits {
717 for block in commit.blocks() {
718 inner.commit_vote_monitor.observe_block(block);
719 }
720 }
721 for block in &vote_blocks {
722 inner.commit_vote_monitor.observe_block(block);
723 }
724
725 {
728 let mut tracker = inner.round_tracker.write();
729 for commit in &certified_commits {
731 for block in commit.blocks() {
732 tracker.update_from_verified_block(&ExtendedBlock {
733 block: block.clone(),
734 excluded_ancestors: vec![],
735 });
736 }
737 }
738 for block in &vote_blocks {
740 tracker.update_from_verified_block(&ExtendedBlock {
741 block: block.clone(),
742 excluded_ancestors: vec![],
743 });
744 }
745 }
746
747 Ok(CertifiedCommits::new(certified_commits, vote_blocks))
748 }
749
750 fn unhandled_commits_threshold(&self) -> CommitIndex {
751 self.inner.context.parameters.commit_sync_batch_size
752 * (self.inner.context.parameters.commit_sync_batches_ahead as u32)
753 }
754
755 #[cfg(test)]
756 fn pending_fetches(&self) -> BTreeSet<CommitRange> {
757 self.pending_fetches.clone()
758 }
759
760 #[cfg(test)]
761 fn fetched_ranges(&self) -> BTreeMap<CommitRange, CertifiedCommits> {
762 self.fetched_ranges.clone()
763 }
764
765 #[cfg(test)]
766 fn highest_scheduled_index(&self) -> Option<CommitIndex> {
767 self.highest_scheduled_index
768 }
769
770 #[cfg(test)]
771 fn highest_fetched_commit_index(&self) -> CommitIndex {
772 self.highest_fetched_commit_index
773 }
774
775 #[cfg(test)]
776 fn synced_commit_index(&self) -> CommitIndex {
777 self.synced_commit_index
778 }
779}
780
781struct Inner<VC: ValidatorNetworkClient, OC: ObserverNetworkClient> {
782 context: Arc<Context>,
783 core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
784 commit_vote_monitor: Arc<CommitVoteMonitor>,
785 commit_consumer_monitor: Arc<CommitConsumerMonitor>,
786 block_verifier: Arc<dyn BlockVerifier>,
787 transaction_vote_tracker: TransactionVoteTracker,
788 round_tracker: Arc<RwLock<RoundTracker>>,
789 network_client: Arc<CommitSyncerClient<VC, OC>>,
790 dag_state: Arc<RwLock<DagState>>,
791 peers_pool: Arc<PeersPool>,
792}
793
794impl<VC: ValidatorNetworkClient, OC: ObserverNetworkClient> Inner<VC, OC> {
795 fn verify_commits(
798 &self,
799 peer: PeerId,
800 commit_range: CommitRange,
801 serialized_commits: Vec<Bytes>,
802 serialized_vote_blocks: Vec<Bytes>,
803 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
804 let mut commits = Vec::new();
806 for serialized in &serialized_commits {
807 let commit: Commit =
808 bcs::from_bytes(serialized).map_err(ConsensusError::MalformedCommit)?;
809 let digest = TrustedCommit::compute_digest(serialized);
810 if commits.is_empty() {
811 if commit.index() != commit_range.start() {
813 return Err(ConsensusError::UnexpectedStartCommit {
814 peer,
815 start: commit_range.start(),
816 commit: Box::new(commit),
817 });
818 }
819 } else {
820 let (last_commit_digest, last_commit): &(CommitDigest, Commit) =
822 commits.last().unwrap();
823 if commit.index() != last_commit.index() + 1
824 || &commit.previous_digest() != last_commit_digest
825 {
826 return Err(ConsensusError::UnexpectedCommitSequence {
827 peer,
828 prev_commit: Box::new(last_commit.clone()),
829 curr_commit: Box::new(commit),
830 });
831 }
832 }
833 if commit.index() > commit_range.end() {
835 break;
836 }
837 commits.push((digest, commit));
838 }
839 let Some((end_commit_digest, end_commit)) = commits.last() else {
840 return Err(ConsensusError::NoCommitReceived { peer });
841 };
842
843 let end_commit_ref = CommitRef::new(end_commit.index(), *end_commit_digest);
845 let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
846 let mut vote_blocks = Vec::new();
847 for serialized in serialized_vote_blocks {
848 let block: SignedBlock =
849 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
850 let (block, reject_transaction_votes) =
853 self.block_verifier.verify_and_vote(block, serialized)?;
854 if self.context.protocol_config.transaction_voting_enabled() {
855 self.transaction_vote_tracker
856 .add_voted_blocks(vec![(block.clone(), reject_transaction_votes)]);
857 }
858 for vote in block.commit_votes() {
859 if *vote == end_commit_ref {
860 stake_aggregator.add(block.author(), &self.context.committee);
861 }
862 }
863 vote_blocks.push(block);
864 }
865
866 if !stake_aggregator.reached_threshold(&self.context.committee) {
868 return Err(ConsensusError::NotEnoughCommitVotes {
869 stake: stake_aggregator.stake(),
870 peer,
871 commit: Box::new(end_commit.clone()),
872 });
873 }
874
875 let trusted_commits = commits
876 .into_iter()
877 .zip_debug_eq(serialized_commits)
878 .map(|((_d, c), s)| TrustedCommit::new_trusted(c, s))
879 .collect();
880 Ok((trusted_commits, vote_blocks))
881 }
882}
883
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use bytes::Bytes;
    use consensus_config::{AuthorityIndex, NetworkKeyPair, Parameters};
    use consensus_types::block::{BlockRef, Round};
    use mysten_common::ZipDebugEqIteratorExt;
    use parking_lot::RwLock;

    use crate::{
        CommitConsumerMonitor, CommitDigest, CommitRef,
        block::{TestBlock, VerifiedBlock},
        block_verifier::NoopBlockVerifier,
        commit::CommitRange,
        commit_syncer::CommitSyncer,
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::MockCoreThreadDispatcher,
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, CommitSyncerClient, ObserverNetworkClient, ValidatorNetworkClient},
        peers_pool::{PeerService, PeersPool},
        round_tracker::RoundTracker,
        storage::mem_store::MemStore,
        transaction_vote_tracker::TransactionVoteTracker,
    };

    /// Network client stub; none of these tests await a network call.
    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait::async_trait]
    impl ValidatorNetworkClient for FakeNetworkClient {
        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _fetch_after_rounds: Vec<Round>,
            _fetch_missing_ancestors: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }

        #[cfg(test)]
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }
    }

    #[async_trait::async_trait]
    impl ObserverNetworkClient for FakeNetworkClient {
        async fn stream_blocks(
            &self,
            _peer: crate::network::PeerId,
            _highest_round_per_authority: Vec<u64>,
            _timeout: Duration,
        ) -> ConsensusResult<crate::network::ObserverBlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: crate::network::PeerId,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: crate::network::PeerId,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }
    }

    type TestCommitSyncer = CommitSyncer<FakeNetworkClient, FakeNetworkClient>;

    /// Builds a commit syncer over an empty in-memory store with no-op verification and a
    /// stub network client. Returns the syncer together with the shared monitors and the
    /// (initially empty) peers pool that tests manipulate.
    fn build_commit_syncer(
        context: &Arc<Context>,
    ) -> (
        TestCommitSyncer,
        Arc<CommitVoteMonitor>,
        Arc<CommitConsumerMonitor>,
        Arc<PeersPool>,
    ) {
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_thread_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(CommitSyncerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let commit_consumer_monitor = Arc::new(CommitConsumerMonitor::new(0, 0));
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let peers_pool = Arc::new(PeersPool::new(context.clone()));

        let commit_syncer = CommitSyncer::new(
            context.clone(),
            core_thread_dispatcher,
            commit_vote_monitor.clone(),
            commit_consumer_monitor.clone(),
            block_verifier,
            transaction_vote_tracker,
            round_tracker,
            network_client,
            dag_state,
            peers_pool.clone(),
        );
        (
            commit_syncer,
            commit_vote_monitor,
            commit_consumer_monitor,
            peers_pool,
        )
    }

    /// Registers `authority` as a peer offering both validator and observer services.
    fn register_full_service_validator(peers_pool: &PeersPool, authority: AuthorityIndex) {
        peers_pool
            .register_validator(
                authority,
                vec![PeerService::Validator, PeerService::Observer],
            )
            .unwrap();
    }

    /// Feeds blocks from authorities 0, 1 and 2 at `round`, each voting for `voted_commit`,
    /// into the commit vote monitor.
    fn observe_commit_votes(
        commit_vote_monitor: &CommitVoteMonitor,
        round: Round,
        voted_commit: CommitRef,
    ) {
        for author in 0..3 {
            let block = VerifiedBlock::new_for_test(
                TestBlock::new(round, author)
                    .set_commit_votes(vec![voted_commit])
                    .build(),
            );
            commit_vote_monitor.observe_block(&block);
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn commit_syncer_observer_node_basic() {
        let (mut context, _) = Context::new_for_test(4);
        // The is_validator() assertion below relies on this index to make the node an observer.
        context.own_index = AuthorityIndex::MAX;
        context.parameters = Parameters {
            commit_sync_batch_size: 5,
            commit_sync_batches_ahead: 10,
            commit_sync_parallel_fetches: 5,
            max_blocks_per_fetch: 5,
            ..context.parameters
        };
        let context = Arc::new(context);

        let (mut commit_syncer, commit_vote_monitor, _, peers_pool) =
            build_commit_syncer(&context);
        register_full_service_validator(&peers_pool, AuthorityIndex::new_for_test(0));

        assert!(!context.is_validator(), "Should be an observer node");

        observe_commit_votes(&commit_vote_monitor, 10, CommitRef::new(5, CommitDigest::MIN));

        commit_syncer.try_schedule_once();
        assert_eq!(commit_syncer.pending_fetches().len(), 1);
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(5));

        commit_syncer.try_start_fetches();
        assert_eq!(
            commit_syncer.inflight_fetches.len(),
            1,
            "Should start fetch from single peer"
        );
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn commit_syncer_observer_with_multiple_peers() {
        let (mut context, _) = Context::new_for_test(4);
        context.own_index = AuthorityIndex::MAX;
        context.parameters = Parameters {
            commit_sync_batch_size: 5,
            commit_sync_batches_ahead: 10,
            commit_sync_parallel_fetches: 8,
            max_blocks_per_fetch: 5,
            ..context.parameters
        };
        let context = Arc::new(context);

        let (mut commit_syncer, commit_vote_monitor, _, peers_pool) =
            build_commit_syncer(&context);
        for index in 0..3 {
            register_full_service_validator(&peers_pool, AuthorityIndex::new_for_test(index));
        }
        let observer_peer = NetworkKeyPair::generate(&mut rand::thread_rng()).public();
        peers_pool.register_observer(observer_peer);

        observe_commit_votes(&commit_vote_monitor, 100, CommitRef::new(50, CommitDigest::MIN));

        commit_syncer.try_schedule_once();
        let pending_fetches = commit_syncer.pending_fetches().len();
        assert!(pending_fetches > 0, "Should schedule fetches");

        commit_syncer.try_start_fetches();
        let inflight = commit_syncer.inflight_fetches.len();
        let known_peers = peers_pool.get_known_peers().len();
        assert_eq!(known_peers, 4, "Should have 3 validators + 1 observer peer");

        let max_parallel = context
            .parameters
            .commit_sync_parallel_fetches
            .min(pending_fetches)
            .min(context.parameters.commit_sync_batches_ahead);
        assert!(
            inflight <= max_parallel,
            "Observer should respect configured parallelism limit: {} <= {}",
            inflight,
            max_parallel
        );

        // Observers are not limited to two thirds of the known peers like validators are.
        if pending_fetches >= 3 {
            assert!(
                max_parallel > 2,
                "Observer should be able to use more parallelism than validator's 2/3 limit"
            );
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn commit_syncer_start_and_pause_scheduling() {
        let (context, _) = Context::new_for_test(4);
        let context = Context {
            own_index: AuthorityIndex::new_for_test(3),
            parameters: Parameters {
                commit_sync_batch_size: 5,
                commit_sync_batches_ahead: 5,
                commit_sync_parallel_fetches: 5,
                max_blocks_per_fetch: 5,
                ..context.parameters
            },
            ..context
        };
        let context = Arc::new(context);

        let (mut commit_syncer, commit_vote_monitor, commit_consumer_monitor, _) =
            build_commit_syncer(&context);

        // Nothing is scheduled or synced before any votes are observed.
        assert!(commit_syncer.pending_fetches().is_empty());
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert!(commit_syncer.highest_scheduled_index().is_none());
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // A quorum index of 10 yields two full batches: 1..=5 and 6..=10.
        observe_commit_votes(&commit_vote_monitor, 15, CommitRef::new(10, CommitDigest::MIN));
        commit_syncer.try_schedule_once();
        assert_eq!(commit_syncer.pending_fetches().len(), 2);
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(10));
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // With nothing handled, scheduling pauses at the unhandled threshold (25).
        observe_commit_votes(&commit_vote_monitor, 40, CommitRef::new(35, CommitDigest::MIN));
        commit_syncer.try_schedule_once();
        assert_eq!(commit_syncer.unhandled_commits_threshold(), 25);
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(25));
        assert_eq!(commit_syncer.pending_fetches().len(), 5);

        // Once the consumer catches up, scheduling resumes up to the quorum index.
        commit_consumer_monitor.set_highest_handled_commit(25);
        commit_syncer.try_schedule_once();
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(35));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 7);

        for (range, start) in pending_fetches.iter().zip_debug_eq((1..35).step_by(5)) {
            assert_eq!(range.start(), start);
            assert_eq!(range.end(), start + 4);
        }
    }
}