consensus_core/
commit_observer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{sync::Arc, time::Duration};
5
6use parking_lot::RwLock;
7use tokio::time::Instant;
8use tracing::info;
9
10use crate::{
11    CommitConsumerArgs, CommittedSubDag,
12    block::{BlockAPI, VerifiedBlock},
13    commit::{CommitAPI, load_committed_subdag_from_store},
14    commit_finalizer::{CommitFinalizer, CommitFinalizerHandle},
15    context::Context,
16    dag_state::DagState,
17    error::ConsensusResult,
18    leader_schedule::LeaderSchedule,
19    linearizer::Linearizer,
20    storage::Store,
21    transaction_certifier::TransactionCertifier,
22};
23
/// Role of CommitObserver
/// - Called by core when try_commit() returns newly committed leaders.
/// - The newly committed leaders are sent to commit observer and then commit observer
///   gets subdags for each leader via the commit interpreter (linearizer)
/// - The committed subdags are sent as consensus output via an unbounded tokio channel.
///
/// There is no flow control on sending output. Consensus backpressure is applied earlier
/// at consensus input level, and on commit sync.
///
/// Commit is persisted in store before the CommittedSubDag is sent to the commit handler.
/// When Sui recovers, it blocks until the commits it knows about are recovered. So consensus
/// must be able to quickly recover the commits it has sent to Sui.
pub(crate) struct CommitObserver {
    /// Shared consensus context; used here for metrics and the clock.
    context: Arc<Context>,
    /// Shared in-memory DAG state; also buffers commits pending write during recovery.
    dag_state: Arc<RwLock<DagState>>,
    /// Persistent storage for blocks, commits and other consensus data.
    store: Arc<dyn Store>,
    /// Used to recover and vote on blocks of unfinalized commits during recovery.
    transaction_certifier: TransactionCertifier,
    /// Source of reputation scores attached to commits when the leader schedule updates.
    leader_schedule: Arc<LeaderSchedule>,
    /// Component to deterministically collect subdags for committed leaders.
    commit_interpreter: Linearizer,
    /// Handle for forwarding committed subdags to the CommitFinalizer task.
    commit_finalizer_handle: CommitFinalizerHandle,
}
48
49impl CommitObserver {
    /// Creates a CommitObserver, starts the CommitFinalizer task, and recovers state
    /// from storage before returning.
    ///
    /// Recovery happens in two steps, in this order:
    /// 1. Replays persisted commits after `commit_consumer.replay_after_commit_index`
    ///    to the CommitFinalizer (`recover_and_send_commits`).
    /// 2. Recovers blocks above the GC round into the transaction certifier.
    pub(crate) async fn new(
        context: Arc<Context>,
        commit_consumer: CommitConsumerArgs,
        dag_state: Arc<RwLock<DagState>>,
        transaction_certifier: TransactionCertifier,
        leader_schedule: Arc<LeaderSchedule>,
    ) -> Self {
        let store = dag_state.read().store();
        let commit_interpreter = Linearizer::new(context.clone(), dag_state.clone());
        let commit_finalizer_handle = CommitFinalizer::start(
            context.clone(),
            dag_state.clone(),
            transaction_certifier.clone(),
            commit_consumer.commit_sender.clone(),
        );

        let mut observer = Self {
            context,
            dag_state,
            store,
            transaction_certifier,
            leader_schedule,
            commit_interpreter,
            commit_finalizer_handle,
        };
        observer.recover_and_send_commits(&commit_consumer).await;

        // Recover blocks needed for future commits (and block proposals).
        // Some blocks might have been recovered as committed blocks in recover_and_send_commits().
        // They will just be ignored.
        // Run on the blocking pool: block recovery reads from the store.
        tokio::runtime::Handle::current()
            .spawn_blocking({
                let transaction_certifier = observer.transaction_certifier.clone();
                let gc_round = observer.dag_state.read().gc_round();
                move || {
                    transaction_certifier.recover_blocks_after_round(gc_round);
                }
            })
            .await
            .expect("Spawn blocking should not fail");

        observer
    }
93
94    /// Creates and returns a list of committed subdags containing committed blocks, from a sequence
95    /// of selected leader blocks, and whether they come from local committer or commit sync remotely.
96    ///
97    /// Also, buffers the commits to DagState and forwards committed subdags to commit finalizer.
98    pub(crate) fn handle_commit(
99        &mut self,
100        committed_leaders: Vec<VerifiedBlock>,
101        local: bool,
102    ) -> ConsensusResult<Vec<CommittedSubDag>> {
103        let _s = self
104            .context
105            .metrics
106            .node_metrics
107            .scope_processing_time
108            .with_label_values(&["CommitObserver::handle_commit"])
109            .start_timer();
110
111        let mut committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
112        self.report_metrics(&committed_sub_dags);
113
114        // Set if the commit is produced from local DAG, or received through commit sync.
115        for subdag in committed_sub_dags.iter_mut() {
116            subdag.decided_with_local_blocks = local;
117        }
118
119        // Send scores as part of the first sub dag, if the leader schedule has been updated.
120        let schedule_updated = self
121            .leader_schedule
122            .leader_schedule_updated(&self.dag_state);
123        if schedule_updated {
124            let reputation_scores_desc = self
125                .leader_schedule
126                .leader_swap_table
127                .read()
128                .reputation_scores_desc
129                .clone();
130            committed_sub_dags[0].reputation_scores_desc = reputation_scores_desc;
131        }
132
133        for commit in committed_sub_dags.iter() {
134            tracing::debug!(
135                "Sending commit {} leader {} to finalization and execution.",
136                commit.commit_ref,
137                commit.leader
138            );
139            tracing::trace!("Committed subdag: {:#?}", commit);
140            // Failures in sender.send() are assumed to be permanent
141            self.commit_finalizer_handle.send(commit.clone())?;
142        }
143
144        self.dag_state
145            .write()
146            .add_scoring_subdags(committed_sub_dags.clone());
147
148        Ok(committed_sub_dags)
149    }
150
    /// Replays persisted commits in `(replay_after_commit_index, last_commit_index]` to the
    /// CommitFinalizer in batches, so downstream consumers can catch up after a restart.
    ///
    /// Commits with a stored rejected-transactions entry are considered finalized; once the
    /// first commit without such an entry is seen, all subsequent commits must be unfinalized
    /// and their blocks are recovered and voted on so CommitFinalizer can process them.
    ///
    /// Panics if the stored commit history is inconsistent with the requested replay index
    /// (missing history, gaps in scanned commits, or non-contiguous commit indices).
    async fn recover_and_send_commits(&mut self, commit_consumer: &CommitConsumerArgs) {
        let now = Instant::now();

        let replay_after_commit_index = commit_consumer.replay_after_commit_index;

        let last_commit = self
            .store
            .read_last_commit()
            .expect("Reading the last commit should not fail");
        let Some(last_commit) = &last_commit else {
            assert_eq!(
                replay_after_commit_index, 0,
                "Commit replay should start at the beginning if there is no commit history"
            );
            info!("Nothing to recover for commit observer - starting new epoch");
            return;
        };

        let last_commit_index = last_commit.index();
        if last_commit_index == replay_after_commit_index {
            info!(
                "Nothing to recover for commit observer - replay is requested immediately after last commit index {last_commit_index}"
            );
            return;
        }
        assert!(last_commit_index > replay_after_commit_index);

        info!(
            "Recovering commit observer in the range [{}..={last_commit_index}]",
            replay_after_commit_index + 1,
        );

        // To avoid scanning too many commits at once and load in memory,
        // we limit the batch size to 250 and iterate over.
        const COMMIT_RECOVERY_BATCH_SIZE: u32 = if cfg!(test) { 3 } else { 250 };

        let mut last_sent_commit_index = replay_after_commit_index;

        // Make sure that there is no pending commits to be written to the store.
        self.dag_state.read().ensure_commits_to_write_is_empty();

        let mut seen_unfinalized_commit = false;
        for start_index in (replay_after_commit_index + 1..=last_commit_index)
            .step_by(COMMIT_RECOVERY_BATCH_SIZE as usize)
        {
            let end_index = start_index
                .saturating_add(COMMIT_RECOVERY_BATCH_SIZE - 1)
                .min(last_commit_index);

            let unsent_commits = self
                .store
                .scan_commits((start_index..=end_index).into())
                .expect("Scanning commits should not fail");
            assert_eq!(
                unsent_commits.len() as u32,
                end_index.checked_sub(start_index).unwrap() + 1,
                "Gap in scanned commits: start index: {start_index}, end index: {end_index}, commits: {:?}",
                unsent_commits,
            );

            // Buffered unsent commits in DAG state which is required to contain them when they are flushed
            // by CommitFinalizer.
            self.dag_state
                .write()
                .recover_commits_to_write(unsent_commits.clone());

            info!(
                "Recovering {} unsent commits in range [{start_index}..={end_index}]",
                unsent_commits.len()
            );

            // Resend all the committed subdags to the consensus output channel
            // for all the commits above the last processed index.
            for commit in unsent_commits.into_iter() {
                // Commit index must be continuous.
                last_sent_commit_index += 1;
                assert_eq!(commit.index(), last_sent_commit_index);

                // On recovery leader schedule will be updated with the current scores
                // and the scores will be passed along with the last commit of this recovered batch sent to
                // Sui so that the current scores are available for submission.
                let reputation_scores = if commit.index() == last_commit_index {
                    self.leader_schedule
                        .leader_swap_table
                        .read()
                        .reputation_scores_desc
                        .clone()
                } else {
                    vec![]
                };

                let committed_sub_dag = load_committed_subdag_from_store(
                    &self.context,
                    self.store.as_ref(),
                    commit,
                    reputation_scores,
                );

                if !committed_sub_dag.recovered_rejected_transactions && !seen_unfinalized_commit {
                    info!(
                        "Starting to recover unfinalized commit from {}",
                        committed_sub_dag.commit_ref
                    );
                    // When the commit has no associated storage entry for rejected transactions,
                    // not even an empty set, the commit is unfinalized.
                    seen_unfinalized_commit = true;
                }

                if seen_unfinalized_commit {
                    // After observing the first unfinalized commit, the rest of recovered commits should all be unfinalized.
                    assert!(!committed_sub_dag.recovered_rejected_transactions);
                    // All unfinalized commit cannot be assumed to be decided with local blocks, because they
                    // might have been received through commit sync.
                    assert!(!committed_sub_dag.decided_with_local_blocks);
                    // All unfinalized commits need to be processed by the CommitFinalizer, making it necessary to
                    // recover and vote on the blocks in this commit.
                    self.transaction_certifier
                        .recover_and_vote_on_blocks(committed_sub_dag.blocks.clone());
                }

                self.commit_finalizer_handle
                    .send(committed_sub_dag)
                    .unwrap();

                self.context
                    .metrics
                    .node_metrics
                    .commit_observer_last_recovered_commit_index
                    .set(last_sent_commit_index as i64);

                // Yield between commits so a long replay does not starve other tasks.
                tokio::task::yield_now().await;
            }
        }

        assert_eq!(
            last_sent_commit_index, last_commit_index,
            "We should have sent all commits up to the last commit {}",
            last_commit_index
        );

        info!(
            "Commit observer recovery [{}..={}] completed, took {:?}",
            replay_after_commit_index + 1,
            last_commit_index,
            now.elapsed()
        );
    }
298
299    fn report_metrics(&self, committed: &[CommittedSubDag]) {
300        let metrics = &self.context.metrics.node_metrics;
301        let utc_now = self.context.clock.timestamp_utc_ms();
302
303        for commit in committed {
304            info!(
305                "Consensus commit {} with leader {} has {} blocks",
306                commit.commit_ref,
307                commit.leader,
308                commit.blocks.len()
309            );
310
311            metrics
312                .last_committed_leader_round
313                .set(commit.leader.round as i64);
314            metrics
315                .last_commit_index
316                .set(commit.commit_ref.index as i64);
317            metrics
318                .blocks_per_commit_count
319                .observe(commit.blocks.len() as f64);
320
321            for block in &commit.blocks {
322                let latency_ms = utc_now
323                    .checked_sub(block.timestamp_ms())
324                    .unwrap_or_default();
325                metrics
326                    .block_commit_latency
327                    .observe(Duration::from_millis(latency_ms).as_secs_f64());
328            }
329        }
330
331        self.context
332            .metrics
333            .node_metrics
334            .sub_dags_per_commit_count
335            .observe(committed.len() as f64);
336    }
337}
338
339#[cfg(test)]
340mod tests {
341    use consensus_config::AuthorityIndex;
342    use consensus_types::block::BlockRef;
343    use mysten_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
344    use parking_lot::RwLock;
345    use rstest::rstest;
346    use tokio::time::timeout;
347
348    use super::*;
349    use crate::{
350        CommitIndex, block_verifier::NoopBlockVerifier, context::Context, dag_state::DagState,
351        linearizer::median_timestamp_by_stake, storage::mem_store::MemStore,
352        test_dag_builder::DagBuilder,
353    };
354
    // End-to-end check of handle_commit: commits 10 leaders in two batches of 5, with a
    // leader schedule update in between, then verifies the returned subdags, the subdags
    // received over the output channel, and the commits/blocks persisted to storage.
    #[rstest]
    #[tokio::test]
    async fn test_handle_commit() {
        use crate::leader_schedule::LeaderSwapTable;

        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let (context, _keys) = Context::new_for_test(num_authorities);
        let context = Arc::new(context);

        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (commit_consumer, mut commit_receiver) =
            CommitConsumerArgs::new(0, last_processed_commit_index);
        let (blocks_sender, _blocks_receiver) = unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );
        // Schedule updates every 5 commits, so the second batch starts on a new schedule.
        const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 5;
        let leader_schedule = Arc::new(
            LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
                .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
        );

        let mut observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_certifier.clone(),
            leader_schedule.clone(),
        )
        .await;

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());
        transaction_certifier.add_voted_blocks(
            builder
                .all_blocks()
                .iter()
                .map(|b| (b.clone(), vec![]))
                .collect(),
        );

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit first 5 leaders.
        let mut commits = observer
            .handle_commit(leaders[0..5].to_vec(), true)
            .unwrap();

        // Trigger a leader schedule update.
        leader_schedule.update_leader_schedule_v2(&dag_state);

        // Commit the next 5 leaders.
        commits.extend(observer.handle_commit(leaders[5..].to_vec(), true).unwrap());

        // Check commits are returned by CommitObserver::handle_commit is accurate
        let mut expected_stored_refs: Vec<BlockRef> = vec![];
        for (idx, subdag) in commits.iter().enumerate() {
            tracing::info!("{subdag:?}");
            assert_eq!(subdag.leader, leaders[idx].reference());

            // 5th subdag should contain the updated scores.
            if idx == 5 {
                let scores = vec![
                    (AuthorityIndex::new_for_test(1), 9),
                    (AuthorityIndex::new_for_test(3), 9),
                    (AuthorityIndex::new_for_test(0), 9),
                    (AuthorityIndex::new_for_test(2), 9),
                ];
                assert_eq!(subdag.reputation_scores_desc, scores);
            } else {
                assert!(subdag.reputation_scores_desc.is_empty());
            }

            // Expected commit timestamp: the stake-weighted median timestamp of the
            // leader's direct (previous-round) ancestors.
            let expected_ts = {
                let block_refs = leaders[idx]
                    .ancestors()
                    .iter()
                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
                    .cloned()
                    .collect::<Vec<_>>();
                let blocks = dag_state
                    .read()
                    .get_blocks(&block_refs)
                    .into_iter()
                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
                median_timestamp_by_stake(&context, blocks).unwrap()
            };

            // Commit timestamps are monotonically non-decreasing across subdags.
            let expected_ts = if idx == 0 {
                expected_ts
            } else {
                expected_ts.max(commits[idx - 1].timestamp_ms)
            };

            assert_eq!(expected_ts, subdag.timestamp_ms);

            if idx == 0 {
                // First subdag includes the leader block plus all ancestor blocks
                // of the leader minus the genesis round blocks
                assert_eq!(subdag.blocks.len(), 1);
            } else {
                // Every subdag after will be missing the leader block from the previous
                // committed subdag
                assert_eq!(subdag.blocks.len(), num_authorities);
            }
            for block in subdag.blocks.iter() {
                expected_stored_refs.push(block.reference());
                assert!(block.round() <= leaders[idx].round());
            }
            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
        }

        // Check commits sent over consensus output channel is accurate
        let mut processed_subdag_index = 0;
        while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
            assert_eq!(subdag, commits[processed_subdag_index]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == leaders.len() {
                break;
            }
        }
        assert_eq!(processed_subdag_index, leaders.len());

        verify_channel_empty(&mut commit_receiver).await;

        // Check commits have been persisted to storage
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            commits.last().unwrap().commit_ref.index
        );
        let all_stored_commits = mem_store
            .scan_commits((0..=CommitIndex::MAX).into())
            .unwrap();
        assert_eq!(all_stored_commits.len(), leaders.len());
        let blocks_existence = mem_store.contains_blocks(&expected_stored_refs).unwrap();
        assert!(blocks_existence.iter().all(|exists| *exists));
    }
511
    // Exercises commit recovery: commits 10 leaders, lets the consumer process only some,
    // then re-creates the CommitObserver under several (replay_after, consumer_last_processed)
    // combinations and verifies which commits are replayed over the output channel.
    #[tokio::test]
    async fn test_recover_and_send_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let (blocks_sender, _blocks_receiver) = unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );
        let last_processed_commit_index = 0;
        let (commit_consumer, mut commit_receiver) =
            CommitConsumerArgs::new(0, last_processed_commit_index);
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_certifier.clone(),
            leader_schedule.clone(),
        )
        .await;

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());
        transaction_certifier.add_voted_blocks(
            builder
                .all_blocks()
                .iter()
                .map(|b| (b.clone(), vec![]))
                .collect(),
        );

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit first batch of leaders (2) and "receive" the subdags as the
        // consumer of the consensus output channel.
        let expected_last_processed_index: usize = 2;
        let mut commits = observer
            .handle_commit(leaders[..expected_last_processed_index].to_vec(), true)
            .unwrap();

        // Check commits sent over consensus output channel is accurate
        let mut processed_subdag_index = 0;
        while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut commit_receiver).await;

        // Check last stored commit is correct
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Handle next batch of leaders (10 - 2 = 8), these will be sent by consensus but not
        // "processed" by consensus output channel. Simulating something happened on
        // the consumer side where the commits were not persisted.
        commits.append(
            &mut observer
                .handle_commit(leaders[expected_last_processed_index..].to_vec(), true)
                .unwrap(),
        );

        let expected_last_sent_index = num_rounds as usize;
        while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
            tracing::info!("{subdag} was sent but not processed by consumer");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert!(subdag.decided_with_local_blocks);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut commit_receiver).await;

        // Check last stored commit is correct. We should persist the last commit
        // that was sent over the channel regardless of how the consumer handled
        // the commit on their end.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(last_commit.index(), expected_last_sent_index as CommitIndex);

        // Replay commits after index 2. And use last processed index by consumer at 10, which is the last persisted commit.
        {
            let replay_after_commit_index = 2;
            let consumer_last_processed_commit_index = 10;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver) = CommitConsumerArgs::new(
                replay_after_commit_index,
                consumer_last_processed_commit_index,
            );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_certifier.clone(),
                leader_schedule.clone(),
            )
            .await;

            // Commits 3..=10 should be replayed, all recovered as finalized.
            let mut processed_subdag_index = replay_after_commit_index;
            while let Ok(Some(mut subdag)) =
                timeout(Duration::from_secs(1), commit_receiver.recv()).await
            {
                tracing::info!("Received {subdag} on recovery");
                assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
                assert!(subdag.recovered_rejected_transactions);

                // Allow comparison with committed subdag before recovery.
                subdag.recovered_rejected_transactions = false;
                assert_eq!(subdag, commits[processed_subdag_index as usize]);

                assert!(subdag.decided_with_local_blocks);
                assert_eq!(subdag.reputation_scores_desc, vec![]);
                processed_subdag_index = subdag.commit_ref.index;
                if processed_subdag_index == consumer_last_processed_commit_index {
                    break;
                }
            }
            assert_eq!(processed_subdag_index, consumer_last_processed_commit_index);

            verify_channel_empty(&mut commit_receiver).await;
        }

        // Replay commits from index 10, which is the last persisted commit.
        {
            let replay_after_commit_index = 10;
            let consumer_last_processed_commit_index = 10;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            // Re-create commit observer starting after index 10 which represents the
            // last processed index from the consumer over consensus output channel
            let (commit_consumer, mut commit_receiver) = CommitConsumerArgs::new(
                replay_after_commit_index,
                consumer_last_processed_commit_index,
            );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_certifier.clone(),
                leader_schedule.clone(),
            )
            .await;

            // No commits should be resubmitted as consensus store's last commit index
            // is equal to replay after index by consumer
            verify_channel_empty(&mut commit_receiver).await;
        }

        // Replay commits after index 2. And use last processed index by consumer at 4, less than the last persisted commit.
        {
            let replay_after_commit_index = 2;
            let consumer_last_processed_commit_index = 4;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver) = CommitConsumerArgs::new(
                replay_after_commit_index,
                consumer_last_processed_commit_index,
            );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_certifier.clone(),
                leader_schedule.clone(),
            )
            .await;

            // Checks that commits up to expected_last_sent_index are recovered as finalized.
            // The fact that they are finalized and have been recorded in the store.
            let mut processed_subdag_index = replay_after_commit_index;
            while let Ok(Some(subdag)) =
                timeout(Duration::from_secs(1), commit_receiver.recv()).await
            {
                tracing::info!("Received {subdag} on recovery");
                assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
                assert!(subdag.decided_with_local_blocks);
                assert_eq!(subdag.reputation_scores_desc, vec![]);
                processed_subdag_index = subdag.commit_ref.index;
                if processed_subdag_index == expected_last_sent_index as CommitIndex {
                    break;
                }
            }
            assert_eq!(
                processed_subdag_index,
                expected_last_sent_index as CommitIndex
            );

            verify_channel_empty(&mut commit_receiver).await;
        }

        // Replay commits after index 2. And use last processed index by consumer at 20,
        // which is greater than the last persisted commit.
        // This allows removing from the store a suffix of commits which have not been part of certified checkpoints.
        {
            let replay_after_commit_index = 2;
            let consumer_last_processed_commit_index = 20;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver) = CommitConsumerArgs::new(
                replay_after_commit_index,
                consumer_last_processed_commit_index,
            );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_certifier.clone(),
                leader_schedule.clone(),
            )
            .await;

            // Check commits sent over consensus output channel is accurate starting
            // from last processed index of 2 and finishing at last sent index of 10.
            let mut processed_subdag_index = replay_after_commit_index;
            while let Ok(Some(mut subdag)) =
                timeout(Duration::from_secs(1), commit_receiver.recv()).await
            {
                tracing::info!("Received {subdag} on recovery");
                assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
                assert!(subdag.recovered_rejected_transactions);

                // Allow comparison with committed subdag before recovery.
                subdag.recovered_rejected_transactions = false;
                assert_eq!(subdag, commits[processed_subdag_index as usize]);

                assert!(subdag.decided_with_local_blocks);
                assert_eq!(subdag.reputation_scores_desc, vec![]);
                processed_subdag_index = subdag.commit_ref.index;
                if processed_subdag_index == expected_last_sent_index as CommitIndex {
                    break;
                }
            }
            assert_eq!(
                processed_subdag_index,
                expected_last_sent_index as CommitIndex
            );
            assert_eq!(10, expected_last_sent_index);

            verify_channel_empty(&mut commit_receiver).await;
        }
    }
796
797    /// After receiving all expected subdags, ensure channel is empty
798    async fn verify_channel_empty(receiver: &mut UnboundedReceiver<CommittedSubDag>) {
799        if let Ok(Some(_)) = timeout(Duration::from_secs(1), receiver.recv()).await {
800            panic!("Expected the consensus output channel to be empty, but found more subdags.")
801        }
802    }
803}