1use std::{sync::Arc, time::Duration};
5
6use parking_lot::RwLock;
7use tokio::time::Instant;
8use tracing::info;
9
10use crate::{
11 CommitConsumerArgs, CommittedSubDag,
12 block::{BlockAPI, VerifiedBlock},
13 commit::{CommitAPI, load_committed_subdag_from_store},
14 commit_finalizer::{CommitFinalizer, CommitFinalizerHandle},
15 context::Context,
16 dag_state::DagState,
17 error::ConsensusResult,
18 leader_schedule::LeaderSchedule,
19 linearizer::Linearizer,
20 storage::Store,
21 transaction_certifier::TransactionCertifier,
22};
23
/// Observes leaders committed by consensus, linearizes them into
/// [`CommittedSubDag`]s and forwards each sub-dag to the commit finalizer
/// for finalization and delivery to the commit consumer.
pub(crate) struct CommitObserver {
    context: Arc<Context>,
    /// Shared DAG state; also the source of the `store` handle below.
    dag_state: Arc<RwLock<DagState>>,
    /// Consensus storage, obtained from `dag_state` at construction time.
    store: Arc<dyn Store>,
    /// Used during recovery to re-vote on blocks of unfinalized commits.
    transaction_certifier: TransactionCertifier,
    /// Supplies reputation scores when the leader schedule is updated.
    leader_schedule: Arc<LeaderSchedule>,
    /// Linearizes committed leaders into sub-dags.
    commit_interpreter: Linearizer,
    /// Channel handle to the `CommitFinalizer` task started in `new()`.
    commit_finalizer_handle: CommitFinalizerHandle,
}
48
49impl CommitObserver {
    /// Creates the observer, starts the commit finalizer task, replays any
    /// commits that were persisted but not yet sent to the consumer, and
    /// recovers block state in the transaction certifier before returning.
    pub(crate) async fn new(
        context: Arc<Context>,
        commit_consumer: CommitConsumerArgs,
        dag_state: Arc<RwLock<DagState>>,
        transaction_certifier: TransactionCertifier,
        leader_schedule: Arc<LeaderSchedule>,
    ) -> Self {
        // The store handle is shared with (and obtained from) the dag state.
        let store = dag_state.read().store();
        let commit_interpreter = Linearizer::new(context.clone(), dag_state.clone());
        // Starts the finalizer task; sub-dags sent on the returned handle are
        // finalized and then forwarded to the consumer's commit_sender.
        let commit_finalizer_handle = CommitFinalizer::start(
            context.clone(),
            dag_state.clone(),
            transaction_certifier.clone(),
            commit_consumer.commit_sender.clone(),
        );

        let mut observer = Self {
            context,
            dag_state,
            store,
            transaction_certifier,
            leader_schedule,
            commit_interpreter,
            commit_finalizer_handle,
        };
        // Replay persisted-but-unsent commits first; this may also re-vote on
        // blocks of unfinalized commits via the transaction certifier.
        observer.recover_and_send_commits(&commit_consumer).await;

        // Then recover remaining blocks above the GC round in the certifier.
        // Runs on a blocking thread (presumably it can block for a while —
        // TODO confirm), and is awaited so the observer is fully recovered
        // before construction completes.
        tokio::runtime::Handle::current()
            .spawn_blocking({
                let transaction_certifier = observer.transaction_certifier.clone();
                let gc_round = observer.dag_state.read().gc_round();
                move || {
                    transaction_certifier.recover_blocks_after_round(gc_round);
                }
            })
            .await
            .expect("Spawn blocking should not fail");

        observer
    }
93
94 pub(crate) fn handle_commit(
99 &mut self,
100 committed_leaders: Vec<VerifiedBlock>,
101 local: bool,
102 ) -> ConsensusResult<Vec<CommittedSubDag>> {
103 let _s = self
104 .context
105 .metrics
106 .node_metrics
107 .scope_processing_time
108 .with_label_values(&["CommitObserver::handle_commit"])
109 .start_timer();
110
111 let mut committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
112 self.report_metrics(&committed_sub_dags);
113
114 for subdag in committed_sub_dags.iter_mut() {
116 subdag.decided_with_local_blocks = local;
117 }
118
119 let schedule_updated = self
121 .leader_schedule
122 .leader_schedule_updated(&self.dag_state);
123 if schedule_updated {
124 let reputation_scores_desc = self
125 .leader_schedule
126 .leader_swap_table
127 .read()
128 .reputation_scores_desc
129 .clone();
130 committed_sub_dags[0].reputation_scores_desc = reputation_scores_desc;
131 }
132
133 for commit in committed_sub_dags.iter() {
134 tracing::debug!(
135 "Sending commit {} leader {} to finalization and execution.",
136 commit.commit_ref,
137 commit.leader
138 );
139 tracing::trace!("Committed subdag: {:#?}", commit);
140 self.commit_finalizer_handle.send(commit.clone())?;
142 }
143
144 self.dag_state
145 .write()
146 .add_scoring_subdags(committed_sub_dags.clone());
147
148 Ok(committed_sub_dags)
149 }
150
    /// Replays commits with indices in `(replay_after_commit_index, last_commit_index]`
    /// from the store to the commit finalizer, in batches. For commits that were
    /// not finalized before the restart, also re-votes on their blocks via the
    /// transaction certifier.
    async fn recover_and_send_commits(&mut self, commit_consumer: &CommitConsumerArgs) {
        let now = Instant::now();

        let replay_after_commit_index = commit_consumer.replay_after_commit_index;

        let last_commit = self
            .store
            .read_last_commit()
            .expect("Reading the last commit should not fail");
        // No commit history: nothing to replay, and the consumer must not be
        // expecting replay from a non-zero index.
        let Some(last_commit) = &last_commit else {
            assert_eq!(
                replay_after_commit_index, 0,
                "Commit replay should start at the beginning if there is no commit history"
            );
            info!("Nothing to recover for commit observer - starting new epoch");
            return;
        };

        let last_commit_index = last_commit.index();
        if last_commit_index == replay_after_commit_index {
            info!(
                "Nothing to recover for commit observer - replay is requested immediately after last commit index {last_commit_index}"
            );
            return;
        }
        assert!(last_commit_index > replay_after_commit_index);

        info!(
            "Recovering commit observer in the range [{}..={last_commit_index}]",
            replay_after_commit_index + 1,
        );

        // Small batch size in tests so batching itself gets exercised.
        const COMMIT_RECOVERY_BATCH_SIZE: u32 = if cfg!(test) { 3 } else { 250 };

        let mut last_sent_commit_index = replay_after_commit_index;

        // Recovery must run before any new commits are buffered for writing.
        self.dag_state.read().ensure_commits_to_write_is_empty();

        let mut seen_unfinalized_commit = false;
        for start_index in (replay_after_commit_index + 1..=last_commit_index)
            .step_by(COMMIT_RECOVERY_BATCH_SIZE as usize)
        {
            let end_index = start_index
                .saturating_add(COMMIT_RECOVERY_BATCH_SIZE - 1)
                .min(last_commit_index);

            let unsent_commits = self
                .store
                .scan_commits((start_index..=end_index).into())
                .expect("Scanning commits should not fail");
            // The scan must return every index in the requested range.
            assert_eq!(
                unsent_commits.len() as u32,
                end_index.checked_sub(start_index).unwrap() + 1,
                "Gap in scanned commits: start index: {start_index}, end index: {end_index}, commits: {:?}",
                unsent_commits,
            );

            self.dag_state
                .write()
                .recover_commits_to_write(unsent_commits.clone());

            info!(
                "Recovering {} unsent commits in range [{start_index}..={end_index}]",
                unsent_commits.len()
            );

            for commit in unsent_commits.into_iter() {
                // Commits must replay contiguously and in order.
                last_sent_commit_index += 1;
                assert_eq!(commit.index(), last_sent_commit_index);

                // Only the last recovered commit carries reputation scores.
                let reputation_scores = if commit.index() == last_commit_index {
                    self.leader_schedule
                        .leader_swap_table
                        .read()
                        .reputation_scores_desc
                        .clone()
                } else {
                    vec![]
                };

                let committed_sub_dag = load_committed_subdag_from_store(
                    &self.context,
                    self.store.as_ref(),
                    commit,
                    reputation_scores,
                );

                if !committed_sub_dag.recovered_rejected_transactions && !seen_unfinalized_commit {
                    info!(
                        "Starting to recover unfinalized commit from {}",
                        committed_sub_dag.commit_ref
                    );
                    seen_unfinalized_commit = true;
                }

                if seen_unfinalized_commit {
                    // Finalized commits form a prefix: once an unfinalized
                    // commit appears, all later ones must be unfinalized too.
                    assert!(!committed_sub_dag.recovered_rejected_transactions);
                    assert!(!committed_sub_dag.decided_with_local_blocks);
                    // Re-vote on blocks of unfinalized commits so finalization
                    // can make progress after restart.
                    self.transaction_certifier
                        .recover_and_vote_on_blocks(committed_sub_dag.blocks.clone());
                }

                self.commit_finalizer_handle
                    .send(committed_sub_dag)
                    .unwrap();

                self.context
                    .metrics
                    .node_metrics
                    .commit_observer_last_recovered_commit_index
                    .set(last_sent_commit_index as i64);

                // Yield between commits so other tasks can run during long replays.
                tokio::task::yield_now().await;
            }
        }

        assert_eq!(
            last_sent_commit_index, last_commit_index,
            "We should have sent all commits up to the last commit {}",
            last_commit_index
        );

        info!(
            "Commit observer recovery [{}..={}] completed, took {:?}",
            replay_after_commit_index + 1,
            last_commit_index,
            now.elapsed()
        );
    }
298
299 fn report_metrics(&self, committed: &[CommittedSubDag]) {
300 let metrics = &self.context.metrics.node_metrics;
301 let utc_now = self.context.clock.timestamp_utc_ms();
302
303 for commit in committed {
304 info!(
305 "Consensus commit {} with leader {} has {} blocks",
306 commit.commit_ref,
307 commit.leader,
308 commit.blocks.len()
309 );
310
311 metrics
312 .last_committed_leader_round
313 .set(commit.leader.round as i64);
314 metrics
315 .last_commit_index
316 .set(commit.commit_ref.index as i64);
317 metrics
318 .blocks_per_commit_count
319 .observe(commit.blocks.len() as f64);
320
321 for block in &commit.blocks {
322 let latency_ms = utc_now
323 .checked_sub(block.timestamp_ms())
324 .unwrap_or_default();
325 metrics
326 .block_commit_latency
327 .observe(Duration::from_millis(latency_ms).as_secs_f64());
328 }
329 }
330
331 self.context
332 .metrics
333 .node_metrics
334 .sub_dags_per_commit_count
335 .observe(committed.len() as f64);
336 }
337}
338
339#[cfg(test)]
340mod tests {
341 use consensus_config::AuthorityIndex;
342 use consensus_types::block::BlockRef;
343 use mysten_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
344 use parking_lot::RwLock;
345 use rstest::rstest;
346 use tokio::time::timeout;
347
348 use super::*;
349 use crate::{
350 CommitIndex, block_verifier::NoopBlockVerifier, context::Context, dag_state::DagState,
351 linearizer::median_timestamp_by_stake, storage::mem_store::MemStore,
352 test_dag_builder::DagBuilder,
353 };
354
    /// End-to-end check of `handle_commit`: builds a 10-round DAG, commits the
    /// leaders in two batches, and verifies sub-dag contents, reputation
    /// scores after a schedule update, timestamps, channel output and storage.
    #[rstest]
    #[tokio::test]
    async fn test_handle_commit() {
        use crate::leader_schedule::LeaderSwapTable;

        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let (context, _keys) = Context::new_for_test(num_authorities);
        let context = Arc::new(context);

        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (commit_consumer, mut commit_receiver) =
            CommitConsumerArgs::new(0, last_processed_commit_index);
        let (blocks_sender, _blocks_receiver) = unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );
        // Schedule rolls over after 5 commits, so the 6th sub-dag should
        // carry the new reputation scores.
        const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 5;
        let leader_schedule = Arc::new(
            LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
                .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
        );

        let mut observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_certifier.clone(),
            leader_schedule.clone(),
        )
        .await;

        // Build and persist a fully-connected DAG of 10 rounds.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());
        transaction_certifier.add_voted_blocks(
            builder
                .all_blocks()
                .iter()
                .map(|b| (b.clone(), vec![]))
                .collect(),
        );

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit the first 5 leaders, triggering a leader schedule update...
        let mut commits = observer
            .handle_commit(leaders[0..5].to_vec(), true)
            .unwrap();

        leader_schedule.update_leader_schedule_v2(&dag_state);

        // ...then commit the remaining leaders.
        commits.extend(observer.handle_commit(leaders[5..].to_vec(), true).unwrap());

        let mut expected_stored_refs: Vec<BlockRef> = vec![];
        for (idx, subdag) in commits.iter().enumerate() {
            tracing::info!("{subdag:?}");
            assert_eq!(subdag.leader, leaders[idx].reference());

            // Only the first sub-dag after the schedule update (idx 5)
            // carries reputation scores.
            if idx == 5 {
                let scores = vec![
                    (AuthorityIndex::new_for_test(1), 9),
                    (AuthorityIndex::new_for_test(3), 9),
                    (AuthorityIndex::new_for_test(0), 9),
                    (AuthorityIndex::new_for_test(2), 9),
                ];
                assert_eq!(subdag.reputation_scores_desc, scores);
            } else {
                assert!(subdag.reputation_scores_desc.is_empty());
            }

            // Expected commit timestamp: stake-weighted median of the leader's
            // ancestors from the previous round...
            let expected_ts = {
                let block_refs = leaders[idx]
                    .ancestors()
                    .iter()
                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
                    .cloned()
                    .collect::<Vec<_>>();
                let blocks = dag_state
                    .read()
                    .get_blocks(&block_refs)
                    .into_iter()
                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
                median_timestamp_by_stake(&context, blocks).unwrap()
            };

            // ...made monotonically non-decreasing across commits.
            let expected_ts = if idx == 0 {
                expected_ts
            } else {
                expected_ts.max(commits[idx - 1].timestamp_ms)
            };

            assert_eq!(expected_ts, subdag.timestamp_ms);

            // First commit contains only the leader block; later commits
            // include one block per authority.
            if idx == 0 {
                assert_eq!(subdag.blocks.len(), 1);
            } else {
                assert_eq!(subdag.blocks.len(), num_authorities);
            }
            for block in subdag.blocks.iter() {
                expected_stored_refs.push(block.reference());
                assert!(block.round() <= leaders[idx].round());
            }
            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
        }

        // All sub-dags should arrive on the consumer channel, in order.
        let mut processed_subdag_index = 0;
        while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
            assert_eq!(subdag, commits[processed_subdag_index]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == leaders.len() {
                break;
            }
        }
        assert_eq!(processed_subdag_index, leaders.len());

        verify_channel_empty(&mut commit_receiver).await;

        // Commits and their blocks should be persisted in the store.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            commits.last().unwrap().commit_ref.index
        );
        let all_stored_commits = mem_store
            .scan_commits((0..=CommitIndex::MAX).into())
            .unwrap();
        assert_eq!(all_stored_commits.len(), leaders.len());
        let blocks_existence = mem_store.contains_blocks(&expected_stored_refs).unwrap();
        assert!(blocks_existence.iter().all(|exists| *exists));
    }
511
    /// Exercises commit recovery: commits 10 leaders, then rebuilds the
    /// observer with various `replay_after_commit_index` / consumer-processed
    /// index combinations and checks which commits get replayed.
    #[tokio::test]
    async fn test_recover_and_send_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let (blocks_sender, _blocks_receiver) = unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );
        let last_processed_commit_index = 0;
        let (commit_consumer, mut commit_receiver) =
            CommitConsumerArgs::new(0, last_processed_commit_index);
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_certifier.clone(),
            leader_schedule.clone(),
        )
        .await;

        // Build and persist a fully-connected DAG of 10 rounds.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());
        transaction_certifier.add_voted_blocks(
            builder
                .all_blocks()
                .iter()
                .map(|b| (b.clone(), vec![]))
                .collect(),
        );

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit the first 2 leaders and drain them from the consumer channel.
        let expected_last_processed_index: usize = 2;
        let mut commits = observer
            .handle_commit(leaders[..expected_last_processed_index].to_vec(), true)
            .unwrap();

        let mut processed_subdag_index = 0;
        while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut commit_receiver).await;

        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Commit the remaining leaders (3..=10) and drain them as well.
        commits.append(
            &mut observer
                .handle_commit(leaders[expected_last_processed_index..].to_vec(), true)
                .unwrap(),
        );

        let expected_last_sent_index = num_rounds as usize;
        while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
            tracing::info!("{subdag} was sent but not processed by consumer");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert!(subdag.decided_with_local_blocks);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut commit_receiver).await;

        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(last_commit.index(), expected_last_sent_index as CommitIndex);

        // Scenario 1: replay from index 2 with the consumer having processed
        // all 10 commits — commits 3..=10 are replayed, flagged as recovered.
        {
            let replay_after_commit_index = 2;
            let consumer_last_processed_commit_index = 10;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver) = CommitConsumerArgs::new(
                replay_after_commit_index,
                consumer_last_processed_commit_index,
            );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_certifier.clone(),
                leader_schedule.clone(),
            )
            .await;

            let mut processed_subdag_index = replay_after_commit_index;
            while let Ok(Some(mut subdag)) =
                timeout(Duration::from_secs(1), commit_receiver.recv()).await
            {
                tracing::info!("Received {subdag} on recovery");
                assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
                assert!(subdag.recovered_rejected_transactions);

                // Clear the recovery-only flag before comparing with the
                // originally produced sub-dag.
                subdag.recovered_rejected_transactions = false;
                assert_eq!(subdag, commits[processed_subdag_index as usize]);

                assert!(subdag.decided_with_local_blocks);
                assert_eq!(subdag.reputation_scores_desc, vec![]);
                processed_subdag_index = subdag.commit_ref.index;
                if processed_subdag_index == consumer_last_processed_commit_index {
                    break;
                }
            }
            assert_eq!(processed_subdag_index, consumer_last_processed_commit_index);

            verify_channel_empty(&mut commit_receiver).await;
        }

        // Scenario 2: replay requested from the last commit (10) — nothing
        // should be replayed.
        {
            let replay_after_commit_index = 10;
            let consumer_last_processed_commit_index = 10;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver) = CommitConsumerArgs::new(
                replay_after_commit_index,
                consumer_last_processed_commit_index,
            );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_certifier.clone(),
                leader_schedule.clone(),
            )
            .await;

            verify_channel_empty(&mut commit_receiver).await;
        }

        // Scenario 3: replay from 2 with the consumer having processed only
        // up to 4 — all of 3..=10 are still replayed.
        {
            let replay_after_commit_index = 2;
            let consumer_last_processed_commit_index = 4;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver) = CommitConsumerArgs::new(
                replay_after_commit_index,
                consumer_last_processed_commit_index,
            );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_certifier.clone(),
                leader_schedule.clone(),
            )
            .await;

            let mut processed_subdag_index = replay_after_commit_index;
            while let Ok(Some(subdag)) =
                timeout(Duration::from_secs(1), commit_receiver.recv()).await
            {
                tracing::info!("Received {subdag} on recovery");
                assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
                assert!(subdag.decided_with_local_blocks);
                assert_eq!(subdag.reputation_scores_desc, vec![]);
                processed_subdag_index = subdag.commit_ref.index;
                if processed_subdag_index == expected_last_sent_index as CommitIndex {
                    break;
                }
            }
            assert_eq!(
                processed_subdag_index,
                expected_last_sent_index as CommitIndex
            );

            verify_channel_empty(&mut commit_receiver).await;
        }

        // Scenario 4: consumer claims to have processed index 20, beyond the
        // last commit (10) — replay still stops at the last commit.
        {
            let replay_after_commit_index = 2;
            let consumer_last_processed_commit_index = 20;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver) = CommitConsumerArgs::new(
                replay_after_commit_index,
                consumer_last_processed_commit_index,
            );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_certifier.clone(),
                leader_schedule.clone(),
            )
            .await;

            let mut processed_subdag_index = replay_after_commit_index;
            while let Ok(Some(mut subdag)) =
                timeout(Duration::from_secs(1), commit_receiver.recv()).await
            {
                tracing::info!("Received {subdag} on recovery");
                assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
                assert!(subdag.recovered_rejected_transactions);

                // Clear the recovery-only flag before comparing with the
                // originally produced sub-dag.
                subdag.recovered_rejected_transactions = false;
                assert_eq!(subdag, commits[processed_subdag_index as usize]);

                assert!(subdag.decided_with_local_blocks);
                assert_eq!(subdag.reputation_scores_desc, vec![]);
                processed_subdag_index = subdag.commit_ref.index;
                if processed_subdag_index == expected_last_sent_index as CommitIndex {
                    break;
                }
            }
            assert_eq!(
                processed_subdag_index,
                expected_last_sent_index as CommitIndex
            );
            assert_eq!(10, expected_last_sent_index);

            verify_channel_empty(&mut commit_receiver).await;
        }
    }
796
797 async fn verify_channel_empty(receiver: &mut UnboundedReceiver<CommittedSubDag>) {
799 if let Ok(Some(_)) = timeout(Duration::from_secs(1), receiver.recv()).await {
800 panic!("Expected the consensus output channel to be empty, but found more subdags.")
801 }
802 }
803}