1use std::{sync::Arc, time::Duration};
5
6use parking_lot::RwLock;
7use tokio::time::Instant;
8use tracing::info;
9
10use crate::{
11 CommitConsumerArgs, CommittedSubDag,
12 block::{BlockAPI, VerifiedBlock},
13 commit::{CommitAPI, load_committed_subdag_from_store},
14 commit_finalizer::{CommitFinalizer, CommitFinalizerHandle},
15 context::Context,
16 dag_state::DagState,
17 error::ConsensusResult,
18 linearizer::Linearizer,
19 storage::Store,
20 transaction_vote_tracker::TransactionVoteTracker,
21};
22
/// Turns committed leaders into committed sub-DAGs and forwards them to the
/// commit finalizer.
///
/// When constructed via [`CommitObserver::new`], it also replays the commits
/// found in the store that come after the consumer's
/// `replay_after_commit_index`.
pub(crate) struct CommitObserver {
    /// Shared node context; used in this file for metrics and the clock.
    context: Arc<Context>,
    /// Shared DAG state; receives recovered commits and scoring sub-DAGs.
    dag_state: Arc<RwLock<DagState>>,
    /// Store obtained from `dag_state`; read while recovering commits.
    store: Arc<dyn Store>,
    /// Asked to recover (and vote on) blocks during and after commit recovery.
    transaction_vote_tracker: TransactionVoteTracker,
    /// Converts committed leaders into `CommittedSubDag`s.
    commit_interpreter: Linearizer,
    /// Handle used to send committed sub-DAGs to the `CommitFinalizer`.
    commit_finalizer_handle: CommitFinalizerHandle,
}
46
47impl CommitObserver {
48 pub(crate) async fn new(
49 context: Arc<Context>,
50 commit_consumer: CommitConsumerArgs,
51 dag_state: Arc<RwLock<DagState>>,
52 transaction_vote_tracker: TransactionVoteTracker,
53 ) -> Self {
54 let store = dag_state.read().store();
55 let commit_interpreter = Linearizer::new(context.clone(), dag_state.clone());
56 let commit_finalizer_handle = CommitFinalizer::start(
57 context.clone(),
58 dag_state.clone(),
59 transaction_vote_tracker.clone(),
60 commit_consumer.commit_sender.clone(),
61 );
62
63 let mut observer = Self {
64 context,
65 dag_state,
66 store,
67 transaction_vote_tracker,
68 commit_interpreter,
69 commit_finalizer_handle,
70 };
71 observer.recover_and_send_commits(&commit_consumer).await;
72
73 tokio::runtime::Handle::current()
77 .spawn_blocking({
78 let transaction_vote_tracker = observer.transaction_vote_tracker.clone();
79 let gc_round = observer.dag_state.read().gc_round();
80 move || {
81 transaction_vote_tracker.recover_blocks_after_round(gc_round);
82 }
83 })
84 .await
85 .expect("Spawn blocking should not fail");
86
87 observer
88 }
89
90 pub(crate) fn handle_commit(
95 &mut self,
96 committed_leaders: Vec<VerifiedBlock>,
97 local: bool,
98 ) -> ConsensusResult<Vec<CommittedSubDag>> {
99 let _s = self
100 .context
101 .metrics
102 .node_metrics
103 .scope_processing_time
104 .with_label_values(&["CommitObserver::handle_commit"])
105 .start_timer();
106
107 let mut committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
108 self.report_metrics(&committed_sub_dags);
109
110 for subdag in committed_sub_dags.iter_mut() {
112 subdag.decided_with_local_blocks = local;
113 }
114
115 for commit in committed_sub_dags.iter() {
116 tracing::debug!(
117 "Sending commit {} leader {} to finalization and execution.",
118 commit.commit_ref,
119 commit.leader
120 );
121 tracing::trace!("Committed subdag: {:#?}", commit);
122 self.commit_finalizer_handle.send(commit.clone())?;
124 }
125
126 self.dag_state
127 .write()
128 .add_scoring_subdags(committed_sub_dags.clone());
129
130 Ok(committed_sub_dags)
131 }
132
    /// Replays stored commits to the commit finalizer.
    ///
    /// Every commit with an index in
    /// `(commit_consumer.replay_after_commit_index, last stored commit index]`
    /// is read from the store in batches, loaded as a `CommittedSubDag` and
    /// sent through `commit_finalizer_handle`. Returns early when the store
    /// holds no commit, or when the last stored commit is exactly the replay
    /// starting point.
    ///
    /// # Panics
    ///
    /// Panics when the store cannot be read, when replay is requested after a
    /// non-zero index although there is no commit history, when the replay
    /// index is beyond the last stored commit, when scanned commits have gaps,
    /// when an unfinalized commit is followed by a commit that violates the
    /// unfinalized invariants checked below, or when sending to the commit
    /// finalizer fails.
    async fn recover_and_send_commits(&mut self, commit_consumer: &CommitConsumerArgs) {
        let now = Instant::now();

        let replay_after_commit_index = commit_consumer.replay_after_commit_index;

        let last_commit = self
            .store
            .read_last_commit()
            .expect("Reading the last commit should not fail");
        let Some(last_commit) = &last_commit else {
            // Without any stored commit there is nothing to replay, so the
            // consumer must not be asking to resume from a later index.
            assert_eq!(
                replay_after_commit_index, 0,
                "Commit replay should start at the beginning if there is no commit history"
            );
            info!("Nothing to recover for commit observer - starting new epoch");
            return;
        };

        let last_commit_index = last_commit.index();
        if last_commit_index == replay_after_commit_index {
            info!(
                "Nothing to recover for commit observer - replay is requested immediately after last commit index {last_commit_index}"
            );
            return;
        }
        // The consumer cannot resume after a commit that was never stored.
        assert!(last_commit_index > replay_after_commit_index);

        info!(
            "Recovering commit observer in the range [{}..={last_commit_index}]",
            replay_after_commit_index + 1,
        );

        // Commits are scanned in batches so that only a bounded number of them
        // is read from the store at once. Tests use a tiny batch size.
        const COMMIT_RECOVERY_BATCH_SIZE: u32 = if cfg!(test) { 3 } else { 250 };

        // Index of the most recent commit handed to the commit finalizer.
        let mut last_sent_commit_index = replay_after_commit_index;

        // NOTE(review): presumably panics if the DAG state still buffers
        // commits to write - confirm against `DagState`.
        self.dag_state.read().ensure_commits_to_write_is_empty();

        // Set once the first commit with
        // `recovered_rejected_transactions == false` is encountered.
        let mut seen_unfinalized_commit = false;
        for start_index in (replay_after_commit_index + 1..=last_commit_index)
            .step_by(COMMIT_RECOVERY_BATCH_SIZE as usize)
        {
            // Inclusive end of this batch, clamped to the last stored commit.
            let end_index = start_index
                .saturating_add(COMMIT_RECOVERY_BATCH_SIZE - 1)
                .min(last_commit_index);

            let unsent_commits = self
                .store
                .scan_commits((start_index..=end_index).into())
                .expect("Scanning commits should not fail");
            // The store must return every commit of the requested range.
            assert_eq!(
                unsent_commits.len() as u32,
                end_index.checked_sub(start_index).unwrap() + 1,
                "Gap in scanned commits: start index: {start_index}, end index: {end_index}, commits: {:?}",
                unsent_commits,
            );

            // Hand the scanned commits to the DAG state before they are sent.
            // NOTE(review): presumably needed so that the DAG state holds them
            // when the commit finalizer processes them - confirm.
            self.dag_state
                .write()
                .recover_commits_to_write(unsent_commits.clone());

            info!(
                "Recovering {} unsent commits in range [{start_index}..={end_index}]",
                unsent_commits.len()
            );

            for commit in unsent_commits.into_iter() {
                // Commit indices must be contiguous.
                last_sent_commit_index += 1;
                assert_eq!(commit.index(), last_sent_commit_index);

                let committed_sub_dag =
                    load_committed_subdag_from_store(self.store.as_ref(), commit);

                if !committed_sub_dag.recovered_rejected_transactions && !seen_unfinalized_commit {
                    info!(
                        "Starting to recover unfinalized commit from {}",
                        committed_sub_dag.commit_ref
                    );
                    seen_unfinalized_commit = true;
                }

                if seen_unfinalized_commit {
                    // From the first unfinalized commit onwards, every commit
                    // must be unfinalized as well ...
                    assert!(!committed_sub_dag.recovered_rejected_transactions);
                    // ... and must not be marked as decided with local blocks.
                    assert!(!committed_sub_dag.decided_with_local_blocks);
                    // The blocks of unfinalized commits are recovered and
                    // voted on by the vote tracker.
                    self.transaction_vote_tracker
                        .recover_and_vote_on_blocks(committed_sub_dag.blocks.clone());
                }

                self.commit_finalizer_handle
                    .send(committed_sub_dag)
                    .unwrap();

                self.context
                    .metrics
                    .node_metrics
                    .commit_observer_last_recovered_commit_index
                    .set(last_sent_commit_index as i64);

                // Let other tasks make progress between recovered commits.
                tokio::task::yield_now().await;
            }
        }

        assert_eq!(
            last_sent_commit_index, last_commit_index,
            "We should have sent all commits up to the last commit {}",
            last_commit_index
        );

        info!(
            "Commit observer recovery [{}..={}] completed, took {:?}",
            replay_after_commit_index + 1,
            last_commit_index,
            now.elapsed()
        );
    }
263
264 fn report_metrics(&self, committed: &[CommittedSubDag]) {
265 let metrics = &self.context.metrics.node_metrics;
266 let utc_now = self.context.clock.timestamp_utc_ms();
267
268 for commit in committed {
269 info!(
270 "Consensus commit {} with leader {} has {} blocks",
271 commit.commit_ref,
272 commit.leader,
273 commit.blocks.len()
274 );
275
276 metrics
277 .last_committed_leader_round
278 .set(commit.leader.round as i64);
279 metrics
280 .last_commit_index
281 .set(commit.commit_ref.index as i64);
282 metrics
283 .blocks_per_commit_count
284 .observe(commit.blocks.len() as f64);
285
286 for block in &commit.blocks {
287 let latency_ms = utc_now
288 .checked_sub(block.timestamp_ms())
289 .unwrap_or_default();
290 metrics
291 .block_commit_latency
292 .observe(Duration::from_millis(latency_ms).as_secs_f64());
293 }
294 }
295
296 self.context
297 .metrics
298 .node_metrics
299 .sub_dags_per_commit_count
300 .observe(committed.len() as f64);
301 }
302}
303
304#[cfg(test)]
305mod tests {
306 use consensus_types::block::BlockRef;
307 use mysten_metrics::monitored_mpsc::UnboundedReceiver;
308 use parking_lot::RwLock;
309 use rstest::rstest;
310 use tokio::time::timeout;
311
312 use super::*;
313 use crate::{
314 CommitIndex, block_verifier::NoopBlockVerifier, context::Context, dag_state::DagState,
315 linearizer::median_timestamp_by_stake, storage::mem_store::MemStore,
316 test_dag_builder::DagBuilder,
317 };
318
319 #[rstest]
320 #[tokio::test]
321 async fn test_handle_commit() {
322 telemetry_subscribers::init_for_testing();
323 let num_authorities = 4;
324 let (context, _keys) = Context::new_for_test(num_authorities);
325 let context = Arc::new(context);
326
327 let mem_store = Arc::new(MemStore::new());
328 let dag_state = Arc::new(RwLock::new(DagState::new(
329 context.clone(),
330 mem_store.clone(),
331 )));
332 let last_processed_commit_index = 0;
333 let (commit_consumer, mut commit_receiver) =
334 CommitConsumerArgs::new(0, last_processed_commit_index);
335 let transaction_vote_tracker = TransactionVoteTracker::new(
336 context.clone(),
337 Arc::new(NoopBlockVerifier {}),
338 dag_state.clone(),
339 );
340
341 let mut observer = CommitObserver::new(
342 context.clone(),
343 commit_consumer,
344 dag_state.clone(),
345 transaction_vote_tracker.clone(),
346 )
347 .await;
348
349 let num_rounds = 10;
351 let mut builder = DagBuilder::new(context.clone());
352 builder
353 .layers(1..=num_rounds)
354 .build()
355 .persist_layers(dag_state.clone());
356 transaction_vote_tracker.add_voted_blocks(
357 builder
358 .all_blocks()
359 .iter()
360 .map(|b| (b.clone(), vec![]))
361 .collect(),
362 );
363
364 let leaders = builder
365 .leader_blocks(1..=num_rounds)
366 .into_iter()
367 .map(Option::unwrap)
368 .collect::<Vec<_>>();
369
370 let commits = observer.handle_commit(leaders.clone(), true).unwrap();
371
372 let mut expected_stored_refs: Vec<BlockRef> = vec![];
374 for (idx, subdag) in commits.iter().enumerate() {
375 tracing::info!("{subdag:?}");
376 assert_eq!(subdag.leader, leaders[idx].reference());
377
378 let expected_ts = {
379 let block_refs = leaders[idx]
380 .ancestors()
381 .iter()
382 .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
383 .cloned()
384 .collect::<Vec<_>>();
385 let blocks = dag_state
386 .read()
387 .get_blocks(&block_refs)
388 .into_iter()
389 .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
390 median_timestamp_by_stake(&context, blocks).unwrap()
391 };
392
393 let expected_ts = if idx == 0 {
394 expected_ts
395 } else {
396 expected_ts.max(commits[idx - 1].timestamp_ms)
397 };
398
399 assert_eq!(expected_ts, subdag.timestamp_ms);
400
401 if idx == 0 {
402 assert_eq!(subdag.blocks.len(), 1);
405 } else {
406 assert_eq!(subdag.blocks.len(), num_authorities);
409 }
410 for block in subdag.blocks.iter() {
411 expected_stored_refs.push(block.reference());
412 assert!(block.round() <= leaders[idx].round());
413 }
414 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
415 }
416
417 let mut processed_subdag_index = 0;
419 while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
420 assert_eq!(subdag, commits[processed_subdag_index]);
421 processed_subdag_index = subdag.commit_ref.index as usize;
422 if processed_subdag_index == leaders.len() {
423 break;
424 }
425 }
426 assert_eq!(processed_subdag_index, leaders.len());
427
428 verify_channel_empty(&mut commit_receiver).await;
429
430 let last_commit = mem_store.read_last_commit().unwrap().unwrap();
432 assert_eq!(
433 last_commit.index(),
434 commits.last().unwrap().commit_ref.index
435 );
436 let all_stored_commits = mem_store
437 .scan_commits((0..=CommitIndex::MAX).into())
438 .unwrap();
439 assert_eq!(all_stored_commits.len(), leaders.len());
440 let blocks_existence = mem_store.contains_blocks(&expected_stored_refs).unwrap();
441 assert!(blocks_existence.iter().all(|exists| *exists));
442 }
443
    /// Commits ten leaders in two steps, then re-creates the observer over the
    /// same store with several `(replay_after_commit_index,
    /// consumer_last_processed_commit_index)` pairs and checks which commits
    /// are replayed to the consumer.
    #[tokio::test]
    async fn test_recover_and_send_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let transaction_vote_tracker = TransactionVoteTracker::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
        );
        let last_processed_commit_index = 0;
        let (commit_consumer, mut commit_receiver) =
            CommitConsumerArgs::new(0, last_processed_commit_index);

        let mut observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_vote_tracker.clone(),
        )
        .await;

        // Build and persist a DAG of ten rounds and register all of its blocks
        // as voted.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());
        transaction_vote_tracker.add_voted_blocks(
            builder
                .all_blocks()
                .iter()
                .map(|b| (b.clone(), vec![]))
                .collect(),
        );

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Step 1: commit only the first two leaders.
        let expected_last_processed_index: usize = 2;
        let mut commits = observer
            .handle_commit(leaders[..expected_last_processed_index].to_vec(), true)
            .unwrap();

        // The consumer must receive those two commits, in order.
        let mut processed_subdag_index = 0;
        while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut commit_receiver).await;

        // The last stored commit is the second one.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Step 2: commit the remaining leaders and keep all commits around for
        // the comparisons in the recovery scenarios below.
        commits.append(
            &mut observer
                .handle_commit(leaders[expected_last_processed_index..].to_vec(), true)
                .unwrap(),
        );

        // The remaining commits must be sent to the channel as well, marked as
        // decided with local blocks.
        let expected_last_sent_index = num_rounds as usize;
        while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
            tracing::info!("{subdag} was sent but not processed by consumer");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert!(subdag.decided_with_local_blocks);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut commit_receiver).await;

        // All ten commits are now in the store.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(last_commit.index(), expected_last_sent_index as CommitIndex);

        // Scenario 1: replay after commit 2 with the consumer's last processed
        // index at 10. Commits 3..=10 are expected, flagged as recovered.
        {
            let replay_after_commit_index = 2;
            let consumer_last_processed_commit_index = 10;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver) = CommitConsumerArgs::new(
                replay_after_commit_index,
                consumer_last_processed_commit_index,
            );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_vote_tracker.clone(),
            )
            .await;

            let mut processed_subdag_index = replay_after_commit_index;
            while let Ok(Some(mut subdag)) =
                timeout(Duration::from_secs(1), commit_receiver.recv()).await
            {
                tracing::info!("Received {subdag} on recovery");
                assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
                assert!(subdag.recovered_rejected_transactions);

                // Reset the recovery flag so that the sub-DAG can be compared
                // with the one produced before recovery.
                subdag.recovered_rejected_transactions = false;
                assert_eq!(subdag, commits[processed_subdag_index as usize]);

                assert!(subdag.decided_with_local_blocks);
                processed_subdag_index = subdag.commit_ref.index;
                if processed_subdag_index == consumer_last_processed_commit_index {
                    break;
                }
            }
            assert_eq!(processed_subdag_index, consumer_last_processed_commit_index);

            verify_channel_empty(&mut commit_receiver).await;
        }

        // Scenario 2: replay after commit 10, which is the last stored commit.
        // Nothing should be sent.
        {
            let replay_after_commit_index = 10;
            let consumer_last_processed_commit_index = 10;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver) = CommitConsumerArgs::new(
                replay_after_commit_index,
                consumer_last_processed_commit_index,
            );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_vote_tracker.clone(),
            )
            .await;

            verify_channel_empty(&mut commit_receiver).await;
        }

        // Scenario 3: replay after commit 2 with the consumer's last processed
        // index at 4. Commits 3..=10 are still expected to be replayed.
        {
            let replay_after_commit_index = 2;
            let consumer_last_processed_commit_index = 4;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver) = CommitConsumerArgs::new(
                replay_after_commit_index,
                consumer_last_processed_commit_index,
            );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_vote_tracker.clone(),
            )
            .await;

            let mut processed_subdag_index = replay_after_commit_index;
            while let Ok(Some(subdag)) =
                timeout(Duration::from_secs(1), commit_receiver.recv()).await
            {
                tracing::info!("Received {subdag} on recovery");
                assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
                assert!(subdag.decided_with_local_blocks);
                processed_subdag_index = subdag.commit_ref.index;
                if processed_subdag_index == expected_last_sent_index as CommitIndex {
                    break;
                }
            }
            assert_eq!(
                processed_subdag_index,
                expected_last_sent_index as CommitIndex
            );

            verify_channel_empty(&mut commit_receiver).await;
        }

        // Scenario 4: replay after commit 2 with the consumer's last processed
        // index at 20, i.e. beyond the ten stored commits. Commits 3..=10 are
        // expected, flagged as recovered.
        {
            let replay_after_commit_index = 2;
            let consumer_last_processed_commit_index = 20;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver) = CommitConsumerArgs::new(
                replay_after_commit_index,
                consumer_last_processed_commit_index,
            );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_vote_tracker.clone(),
            )
            .await;

            let mut processed_subdag_index = replay_after_commit_index;
            while let Ok(Some(mut subdag)) =
                timeout(Duration::from_secs(1), commit_receiver.recv()).await
            {
                tracing::info!("Received {subdag} on recovery");
                assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
                assert!(subdag.recovered_rejected_transactions);

                // Reset the recovery flag so that the sub-DAG can be compared
                // with the one produced before recovery.
                subdag.recovered_rejected_transactions = false;
                assert_eq!(subdag, commits[processed_subdag_index as usize]);

                assert!(subdag.decided_with_local_blocks);
                processed_subdag_index = subdag.commit_ref.index;
                if processed_subdag_index == expected_last_sent_index as CommitIndex {
                    break;
                }
            }
            assert_eq!(
                processed_subdag_index,
                expected_last_sent_index as CommitIndex
            );
            assert_eq!(10, expected_last_sent_index);

            verify_channel_empty(&mut commit_receiver).await;
        }
    }
712
713 async fn verify_channel_empty(receiver: &mut UnboundedReceiver<CommittedSubDag>) {
715 if let Ok(Some(_)) = timeout(Duration::from_secs(1), receiver.recv()).await {
716 panic!("Expected the consensus output channel to be empty, but found more subdags.")
717 }
718 }
719}