1use std::{sync::Arc, time::Duration};
5
6use parking_lot::RwLock;
7use tokio::time::Instant;
8use tracing::info;
9
10use crate::{
11 CommitConsumerArgs, CommittedSubDag,
12 block::{BlockAPI, VerifiedBlock},
13 commit::{CommitAPI, load_committed_subdag_from_store},
14 commit_finalizer::{CommitFinalizer, CommitFinalizerHandle},
15 context::Context,
16 dag_state::DagState,
17 error::ConsensusResult,
18 leader_schedule::LeaderSchedule,
19 linearizer::Linearizer,
20 storage::Store,
21 transaction_certifier::TransactionCertifier,
22};
23
24pub(crate) struct CommitObserver {
37 context: Arc<Context>,
38 dag_state: Arc<RwLock<DagState>>,
39 store: Arc<dyn Store>,
41 transaction_certifier: TransactionCertifier,
42 leader_schedule: Arc<LeaderSchedule>,
43 commit_interpreter: Linearizer,
45 commit_finalizer_handle: CommitFinalizerHandle,
47}
48
49impl CommitObserver {
50 pub(crate) async fn new(
51 context: Arc<Context>,
52 commit_consumer: CommitConsumerArgs,
53 dag_state: Arc<RwLock<DagState>>,
54 transaction_certifier: TransactionCertifier,
55 leader_schedule: Arc<LeaderSchedule>,
56 ) -> Self {
57 let store = dag_state.read().store();
58 let commit_interpreter = Linearizer::new(context.clone(), dag_state.clone());
59 let commit_finalizer_handle = CommitFinalizer::start(
60 context.clone(),
61 dag_state.clone(),
62 transaction_certifier.clone(),
63 commit_consumer.commit_sender.clone(),
64 );
65
66 let mut observer = Self {
67 context,
68 dag_state,
69 store,
70 transaction_certifier,
71 leader_schedule,
72 commit_interpreter,
73 commit_finalizer_handle,
74 };
75 observer.recover_and_send_commits(&commit_consumer).await;
76
77 tokio::runtime::Handle::current()
81 .spawn_blocking({
82 let transaction_certifier = observer.transaction_certifier.clone();
83 let gc_round = observer.dag_state.read().gc_round();
84 move || {
85 transaction_certifier.recover_blocks_after_round(gc_round);
86 }
87 })
88 .await
89 .expect("Spawn blocking should not fail");
90
91 observer
92 }
93
94 pub(crate) fn handle_commit(
99 &mut self,
100 committed_leaders: Vec<VerifiedBlock>,
101 local: bool,
102 ) -> ConsensusResult<Vec<CommittedSubDag>> {
103 let _s = self
104 .context
105 .metrics
106 .node_metrics
107 .scope_processing_time
108 .with_label_values(&["CommitObserver::handle_commit"])
109 .start_timer();
110
111 let mut committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
112 self.report_metrics(&committed_sub_dags);
113
114 for subdag in committed_sub_dags.iter_mut() {
116 subdag.decided_with_local_blocks = local;
117 }
118
119 let schedule_updated = self
121 .leader_schedule
122 .leader_schedule_updated(&self.dag_state);
123 if schedule_updated {
124 let reputation_scores_desc = self
125 .leader_schedule
126 .leader_swap_table
127 .read()
128 .reputation_scores_desc
129 .clone();
130 committed_sub_dags[0].reputation_scores_desc = reputation_scores_desc;
131 }
132
133 for commit in committed_sub_dags.iter() {
134 tracing::debug!(
135 "Sending commit {} leader {} to finalization and execution.",
136 commit.commit_ref,
137 commit.leader
138 );
139 tracing::trace!("Committed subdag: {:#?}", commit);
140 self.commit_finalizer_handle.send(commit.clone())?;
142 }
143
144 self.dag_state
145 .write()
146 .add_scoring_subdags(committed_sub_dags.clone());
147
148 Ok(committed_sub_dags)
149 }
150
151 async fn recover_and_send_commits(&mut self, commit_consumer: &CommitConsumerArgs) {
152 let now = Instant::now();
153
154 let replay_after_commit_index = commit_consumer.replay_after_commit_index;
155
156 let last_commit = self
157 .store
158 .read_last_commit()
159 .expect("Reading the last commit should not fail");
160 let Some(last_commit) = &last_commit else {
161 assert_eq!(
162 replay_after_commit_index, 0,
163 "Commit replay should start at the beginning if there is no commit history"
164 );
165 info!("Nothing to recover for commit observer - starting new epoch");
166 return;
167 };
168
169 let last_commit_index = last_commit.index();
170 if last_commit_index == replay_after_commit_index {
171 info!(
172 "Nothing to recover for commit observer - replay is requested immediately after last commit index {last_commit_index}"
173 );
174 return;
175 }
176 assert!(last_commit_index > replay_after_commit_index);
177
178 info!(
179 "Recovering commit observer in the range [{}..={last_commit_index}]",
180 replay_after_commit_index + 1,
181 );
182
183 const COMMIT_RECOVERY_BATCH_SIZE: u32 = if cfg!(test) { 3 } else { 250 };
186
187 let mut last_sent_commit_index = replay_after_commit_index;
188
189 self.dag_state.read().ensure_commits_to_write_is_empty();
191
192 let mut seen_unfinalized_commit = false;
193 for start_index in (replay_after_commit_index + 1..=last_commit_index)
194 .step_by(COMMIT_RECOVERY_BATCH_SIZE as usize)
195 {
196 let end_index = start_index
197 .saturating_add(COMMIT_RECOVERY_BATCH_SIZE - 1)
198 .min(last_commit_index);
199
200 let unsent_commits = self
201 .store
202 .scan_commits((start_index..=end_index).into())
203 .expect("Scanning commits should not fail");
204 assert_eq!(
205 unsent_commits.len() as u32,
206 end_index.checked_sub(start_index).unwrap() + 1,
207 "Gap in scanned commits: start index: {start_index}, end index: {end_index}, commits: {:?}",
208 unsent_commits,
209 );
210
211 self.dag_state
214 .write()
215 .recover_commits_to_write(unsent_commits.clone());
216
217 info!(
218 "Recovering {} unsent commits in range [{start_index}..={end_index}]",
219 unsent_commits.len()
220 );
221
222 for commit in unsent_commits.into_iter() {
225 last_sent_commit_index += 1;
227 assert_eq!(commit.index(), last_sent_commit_index);
228
229 let reputation_scores = if commit.index() == last_commit_index {
233 self.leader_schedule
234 .leader_swap_table
235 .read()
236 .reputation_scores_desc
237 .clone()
238 } else {
239 vec![]
240 };
241
242 let committed_sub_dag = load_committed_subdag_from_store(
243 self.store.as_ref(),
244 commit,
245 reputation_scores,
246 );
247
248 if !committed_sub_dag.recovered_rejected_transactions && !seen_unfinalized_commit {
249 info!(
250 "Starting to recover unfinalized commit from {}",
251 committed_sub_dag.commit_ref
252 );
253 seen_unfinalized_commit = true;
256 }
257
258 if seen_unfinalized_commit {
259 assert!(!committed_sub_dag.recovered_rejected_transactions);
261 assert!(!committed_sub_dag.decided_with_local_blocks);
264 self.transaction_certifier
267 .recover_and_vote_on_blocks(committed_sub_dag.blocks.clone());
268 }
269
270 self.commit_finalizer_handle
271 .send(committed_sub_dag)
272 .unwrap();
273
274 self.context
275 .metrics
276 .node_metrics
277 .commit_observer_last_recovered_commit_index
278 .set(last_sent_commit_index as i64);
279
280 tokio::task::yield_now().await;
281 }
282 }
283
284 assert_eq!(
285 last_sent_commit_index, last_commit_index,
286 "We should have sent all commits up to the last commit {}",
287 last_commit_index
288 );
289
290 info!(
291 "Commit observer recovery [{}..={}] completed, took {:?}",
292 replay_after_commit_index + 1,
293 last_commit_index,
294 now.elapsed()
295 );
296 }
297
298 fn report_metrics(&self, committed: &[CommittedSubDag]) {
299 let metrics = &self.context.metrics.node_metrics;
300 let utc_now = self.context.clock.timestamp_utc_ms();
301
302 for commit in committed {
303 info!(
304 "Consensus commit {} with leader {} has {} blocks",
305 commit.commit_ref,
306 commit.leader,
307 commit.blocks.len()
308 );
309
310 metrics
311 .last_committed_leader_round
312 .set(commit.leader.round as i64);
313 metrics
314 .last_commit_index
315 .set(commit.commit_ref.index as i64);
316 metrics
317 .blocks_per_commit_count
318 .observe(commit.blocks.len() as f64);
319
320 for block in &commit.blocks {
321 let latency_ms = utc_now
322 .checked_sub(block.timestamp_ms())
323 .unwrap_or_default();
324 metrics
325 .block_commit_latency
326 .observe(Duration::from_millis(latency_ms).as_secs_f64());
327 }
328 }
329
330 self.context
331 .metrics
332 .node_metrics
333 .sub_dags_per_commit_count
334 .observe(committed.len() as f64);
335 }
336}
337
338#[cfg(test)]
339mod tests {
340 use consensus_config::AuthorityIndex;
341 use consensus_types::block::BlockRef;
342 use mysten_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
343 use parking_lot::RwLock;
344 use rstest::rstest;
345 use tokio::time::timeout;
346
347 use super::*;
348 use crate::{
349 CommitIndex, block_verifier::NoopBlockVerifier, context::Context, dag_state::DagState,
350 linearizer::median_timestamp_by_stake, storage::mem_store::MemStore,
351 test_dag_builder::DagBuilder,
352 };
353
354 #[rstest]
355 #[tokio::test]
356 async fn test_handle_commit() {
357 use crate::leader_schedule::LeaderSwapTable;
358
359 telemetry_subscribers::init_for_testing();
360 let num_authorities = 4;
361 let (context, _keys) = Context::new_for_test(num_authorities);
362 let context = Arc::new(context);
363
364 let mem_store = Arc::new(MemStore::new());
365 let dag_state = Arc::new(RwLock::new(DagState::new(
366 context.clone(),
367 mem_store.clone(),
368 )));
369 let last_processed_commit_index = 0;
370 let (commit_consumer, mut commit_receiver, _transaction_receiver) =
371 CommitConsumerArgs::new(0, last_processed_commit_index);
372 let (blocks_sender, _blocks_receiver) = unbounded_channel("consensus_block_output");
373 let transaction_certifier = TransactionCertifier::new(
374 context.clone(),
375 Arc::new(NoopBlockVerifier {}),
376 dag_state.clone(),
377 blocks_sender,
378 );
379 const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 5;
380 let leader_schedule = Arc::new(
381 LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
382 .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
383 );
384
385 let mut observer = CommitObserver::new(
386 context.clone(),
387 commit_consumer,
388 dag_state.clone(),
389 transaction_certifier.clone(),
390 leader_schedule.clone(),
391 )
392 .await;
393
394 let num_rounds = 10;
396 let mut builder = DagBuilder::new(context.clone());
397 builder
398 .layers(1..=num_rounds)
399 .build()
400 .persist_layers(dag_state.clone());
401 transaction_certifier.add_voted_blocks(
402 builder
403 .all_blocks()
404 .iter()
405 .map(|b| (b.clone(), vec![]))
406 .collect(),
407 );
408
409 let leaders = builder
410 .leader_blocks(1..=num_rounds)
411 .into_iter()
412 .map(Option::unwrap)
413 .collect::<Vec<_>>();
414
415 let mut commits = observer
417 .handle_commit(leaders[0..5].to_vec(), true)
418 .unwrap();
419
420 leader_schedule.update_leader_schedule_v2(&dag_state);
422
423 commits.extend(observer.handle_commit(leaders[5..].to_vec(), true).unwrap());
425
426 let mut expected_stored_refs: Vec<BlockRef> = vec![];
428 for (idx, subdag) in commits.iter().enumerate() {
429 tracing::info!("{subdag:?}");
430 assert_eq!(subdag.leader, leaders[idx].reference());
431
432 if idx == 5 {
434 let scores = vec![
435 (AuthorityIndex::new_for_test(1), 9),
436 (AuthorityIndex::new_for_test(3), 9),
437 (AuthorityIndex::new_for_test(0), 9),
438 (AuthorityIndex::new_for_test(2), 9),
439 ];
440 assert_eq!(subdag.reputation_scores_desc, scores);
441 } else {
442 assert!(subdag.reputation_scores_desc.is_empty());
443 }
444
445 let expected_ts = {
446 let block_refs = leaders[idx]
447 .ancestors()
448 .iter()
449 .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
450 .cloned()
451 .collect::<Vec<_>>();
452 let blocks = dag_state
453 .read()
454 .get_blocks(&block_refs)
455 .into_iter()
456 .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
457 median_timestamp_by_stake(&context, blocks).unwrap()
458 };
459
460 let expected_ts = if idx == 0 {
461 expected_ts
462 } else {
463 expected_ts.max(commits[idx - 1].timestamp_ms)
464 };
465
466 assert_eq!(expected_ts, subdag.timestamp_ms);
467
468 if idx == 0 {
469 assert_eq!(subdag.blocks.len(), 1);
472 } else {
473 assert_eq!(subdag.blocks.len(), num_authorities);
476 }
477 for block in subdag.blocks.iter() {
478 expected_stored_refs.push(block.reference());
479 assert!(block.round() <= leaders[idx].round());
480 }
481 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
482 }
483
484 let mut processed_subdag_index = 0;
486 while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
487 assert_eq!(subdag, commits[processed_subdag_index]);
488 processed_subdag_index = subdag.commit_ref.index as usize;
489 if processed_subdag_index == leaders.len() {
490 break;
491 }
492 }
493 assert_eq!(processed_subdag_index, leaders.len());
494
495 verify_channel_empty(&mut commit_receiver).await;
496
497 let last_commit = mem_store.read_last_commit().unwrap().unwrap();
499 assert_eq!(
500 last_commit.index(),
501 commits.last().unwrap().commit_ref.index
502 );
503 let all_stored_commits = mem_store
504 .scan_commits((0..=CommitIndex::MAX).into())
505 .unwrap();
506 assert_eq!(all_stored_commits.len(), leaders.len());
507 let blocks_existence = mem_store.contains_blocks(&expected_stored_refs).unwrap();
508 assert!(blocks_existence.iter().all(|exists| *exists));
509 }
510
511 #[tokio::test]
512 async fn test_recover_and_send_commits() {
513 telemetry_subscribers::init_for_testing();
514 let num_authorities = 4;
515 let context = Arc::new(Context::new_for_test(num_authorities).0);
516 let mem_store = Arc::new(MemStore::new());
517 let dag_state = Arc::new(RwLock::new(DagState::new(
518 context.clone(),
519 mem_store.clone(),
520 )));
521 let (blocks_sender, _blocks_receiver) = unbounded_channel("consensus_block_output");
522 let transaction_certifier = TransactionCertifier::new(
523 context.clone(),
524 Arc::new(NoopBlockVerifier {}),
525 dag_state.clone(),
526 blocks_sender,
527 );
528 let last_processed_commit_index = 0;
529 let (commit_consumer, mut commit_receiver, _transaction_receiver) =
530 CommitConsumerArgs::new(0, last_processed_commit_index);
531 let leader_schedule = Arc::new(LeaderSchedule::from_store(
532 context.clone(),
533 dag_state.clone(),
534 ));
535
536 let mut observer = CommitObserver::new(
537 context.clone(),
538 commit_consumer,
539 dag_state.clone(),
540 transaction_certifier.clone(),
541 leader_schedule.clone(),
542 )
543 .await;
544
545 let num_rounds = 10;
547 let mut builder = DagBuilder::new(context.clone());
548 builder
549 .layers(1..=num_rounds)
550 .build()
551 .persist_layers(dag_state.clone());
552 transaction_certifier.add_voted_blocks(
553 builder
554 .all_blocks()
555 .iter()
556 .map(|b| (b.clone(), vec![]))
557 .collect(),
558 );
559
560 let leaders = builder
561 .leader_blocks(1..=num_rounds)
562 .into_iter()
563 .map(Option::unwrap)
564 .collect::<Vec<_>>();
565
566 let expected_last_processed_index: usize = 2;
569 let mut commits = observer
570 .handle_commit(leaders[..expected_last_processed_index].to_vec(), true)
571 .unwrap();
572
573 let mut processed_subdag_index = 0;
575 while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
576 tracing::info!("Processed {subdag}");
577 assert_eq!(subdag, commits[processed_subdag_index]);
578 assert_eq!(subdag.reputation_scores_desc, vec![]);
579 processed_subdag_index = subdag.commit_ref.index as usize;
580 if processed_subdag_index == expected_last_processed_index {
581 break;
582 }
583 }
584 assert_eq!(processed_subdag_index, expected_last_processed_index);
585
586 verify_channel_empty(&mut commit_receiver).await;
587
588 let last_commit = mem_store.read_last_commit().unwrap().unwrap();
590 assert_eq!(
591 last_commit.index(),
592 expected_last_processed_index as CommitIndex
593 );
594
595 commits.append(
599 &mut observer
600 .handle_commit(leaders[expected_last_processed_index..].to_vec(), true)
601 .unwrap(),
602 );
603
604 let expected_last_sent_index = num_rounds as usize;
605 while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
606 tracing::info!("{subdag} was sent but not processed by consumer");
607 assert_eq!(subdag, commits[processed_subdag_index]);
608 assert!(subdag.decided_with_local_blocks);
609 assert_eq!(subdag.reputation_scores_desc, vec![]);
610 processed_subdag_index = subdag.commit_ref.index as usize;
611 if processed_subdag_index == expected_last_sent_index {
612 break;
613 }
614 }
615 assert_eq!(processed_subdag_index, expected_last_sent_index);
616
617 verify_channel_empty(&mut commit_receiver).await;
618
619 let last_commit = mem_store.read_last_commit().unwrap().unwrap();
623 assert_eq!(last_commit.index(), expected_last_sent_index as CommitIndex);
624
625 {
627 let replay_after_commit_index = 2;
628 let consumer_last_processed_commit_index = 10;
629 let dag_state = Arc::new(RwLock::new(DagState::new(
630 context.clone(),
631 mem_store.clone(),
632 )));
633 let (commit_consumer, mut commit_receiver, _transaction_receiver) =
634 CommitConsumerArgs::new(
635 replay_after_commit_index,
636 consumer_last_processed_commit_index,
637 );
638 let _observer = CommitObserver::new(
639 context.clone(),
640 commit_consumer,
641 dag_state.clone(),
642 transaction_certifier.clone(),
643 leader_schedule.clone(),
644 )
645 .await;
646
647 let mut processed_subdag_index = replay_after_commit_index;
648 while let Ok(Some(mut subdag)) =
649 timeout(Duration::from_secs(1), commit_receiver.recv()).await
650 {
651 tracing::info!("Received {subdag} on recovery");
652 assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
653 assert!(subdag.recovered_rejected_transactions);
654
655 subdag.recovered_rejected_transactions = false;
657 assert_eq!(subdag, commits[processed_subdag_index as usize]);
658
659 assert!(subdag.decided_with_local_blocks);
660 assert_eq!(subdag.reputation_scores_desc, vec![]);
661 processed_subdag_index = subdag.commit_ref.index;
662 if processed_subdag_index == consumer_last_processed_commit_index {
663 break;
664 }
665 }
666 assert_eq!(processed_subdag_index, consumer_last_processed_commit_index);
667
668 verify_channel_empty(&mut commit_receiver).await;
669 }
670
671 {
673 let replay_after_commit_index = 10;
674 let consumer_last_processed_commit_index = 10;
675 let dag_state = Arc::new(RwLock::new(DagState::new(
676 context.clone(),
677 mem_store.clone(),
678 )));
679 let (commit_consumer, mut commit_receiver, _transaction_receiver) =
682 CommitConsumerArgs::new(
683 replay_after_commit_index,
684 consumer_last_processed_commit_index,
685 );
686 let _observer = CommitObserver::new(
687 context.clone(),
688 commit_consumer,
689 dag_state.clone(),
690 transaction_certifier.clone(),
691 leader_schedule.clone(),
692 )
693 .await;
694
695 verify_channel_empty(&mut commit_receiver).await;
698 }
699
700 {
702 let replay_after_commit_index = 2;
703 let consumer_last_processed_commit_index = 4;
704 let dag_state = Arc::new(RwLock::new(DagState::new(
705 context.clone(),
706 mem_store.clone(),
707 )));
708 let (commit_consumer, mut commit_receiver, _transaction_receiver) =
709 CommitConsumerArgs::new(
710 replay_after_commit_index,
711 consumer_last_processed_commit_index,
712 );
713 let _observer = CommitObserver::new(
714 context.clone(),
715 commit_consumer,
716 dag_state.clone(),
717 transaction_certifier.clone(),
718 leader_schedule.clone(),
719 )
720 .await;
721
722 let mut processed_subdag_index = replay_after_commit_index;
725 while let Ok(Some(subdag)) =
726 timeout(Duration::from_secs(1), commit_receiver.recv()).await
727 {
728 tracing::info!("Received {subdag} on recovery");
729 assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
730 assert!(subdag.decided_with_local_blocks);
731 assert_eq!(subdag.reputation_scores_desc, vec![]);
732 processed_subdag_index = subdag.commit_ref.index;
733 if processed_subdag_index == expected_last_sent_index as CommitIndex {
734 break;
735 }
736 }
737 assert_eq!(
738 processed_subdag_index,
739 expected_last_sent_index as CommitIndex
740 );
741
742 verify_channel_empty(&mut commit_receiver).await;
743 }
744
745 {
749 let replay_after_commit_index = 2;
750 let consumer_last_processed_commit_index = 20;
751 let dag_state = Arc::new(RwLock::new(DagState::new(
752 context.clone(),
753 mem_store.clone(),
754 )));
755 let (commit_consumer, mut commit_receiver, _transaction_receiver) =
756 CommitConsumerArgs::new(
757 replay_after_commit_index,
758 consumer_last_processed_commit_index,
759 );
760 let _observer = CommitObserver::new(
761 context.clone(),
762 commit_consumer,
763 dag_state.clone(),
764 transaction_certifier.clone(),
765 leader_schedule.clone(),
766 )
767 .await;
768
769 let mut processed_subdag_index = replay_after_commit_index;
772 while let Ok(Some(mut subdag)) =
773 timeout(Duration::from_secs(1), commit_receiver.recv()).await
774 {
775 tracing::info!("Received {subdag} on recovery");
776 assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
777 assert!(subdag.recovered_rejected_transactions);
778
779 subdag.recovered_rejected_transactions = false;
781 assert_eq!(subdag, commits[processed_subdag_index as usize]);
782
783 assert!(subdag.decided_with_local_blocks);
784 assert_eq!(subdag.reputation_scores_desc, vec![]);
785 processed_subdag_index = subdag.commit_ref.index;
786 if processed_subdag_index == expected_last_sent_index as CommitIndex {
787 break;
788 }
789 }
790 assert_eq!(
791 processed_subdag_index,
792 expected_last_sent_index as CommitIndex
793 );
794 assert_eq!(10, expected_last_sent_index);
795
796 verify_channel_empty(&mut commit_receiver).await;
797 }
798 }
799
800 async fn verify_channel_empty(receiver: &mut UnboundedReceiver<CommittedSubDag>) {
802 if let Ok(Some(_)) = timeout(Duration::from_secs(1), receiver.recv()).await {
803 panic!("Expected the consensus output channel to be empty, but found more subdags.")
804 }
805 }
806}