consensus_core/commit_observer.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{sync::Arc, time::Duration};

use parking_lot::RwLock;
use tokio::time::Instant;
use tracing::info;

use crate::{
    CommitConsumerArgs, CommittedSubDag,
    block::{BlockAPI, VerifiedBlock},
    commit::{CommitAPI, load_committed_subdag_from_store},
    commit_finalizer::{CommitFinalizer, CommitFinalizerHandle},
    context::Context,
    dag_state::DagState,
    error::ConsensusResult,
    leader_schedule::LeaderSchedule,
    linearizer::Linearizer,
    storage::Store,
    transaction_certifier::TransactionCertifier,
};
/// Role of CommitObserver:
/// - Called by core when try_commit() returns newly committed leaders.
/// - The newly committed leaders are sent to the commit observer, which collects the subdag
///   for each leader via the commit interpreter (linearizer).
/// - The committed subdags are sent as consensus output via an unbounded tokio channel.
///
/// There is no flow control on sending output. Consensus backpressure is applied earlier,
/// at the consensus input level and on commit sync.
///
/// Commits are persisted in the store before the CommittedSubDag is sent to the commit handler.
/// When Sui recovers, it blocks until the commits it knows about are recovered, so consensus
/// must be able to quickly recover the commits it has sent to Sui.
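///
/// In short, the flow is: core commit -> CommitObserver::handle_commit() -> Linearizer (subdag
/// collection) -> CommitFinalizer -> commit consumer channel.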
pub(crate) struct CommitObserver {
    context: Arc<Context>,
    dag_state: Arc<RwLock<DagState>>,
    /// Persistent storage for blocks, commits and other consensus data.
    store: Arc<dyn Store>,
    transaction_certifier: TransactionCertifier,
    leader_schedule: Arc<LeaderSchedule>,
    /// Component to deterministically collect subdags for committed leaders.
    commit_interpreter: Linearizer,
    /// Handle to an unbounded channel to send output commits.
    commit_finalizer_handle: CommitFinalizerHandle,
}

impl CommitObserver {
    pub(crate) async fn new(
        context: Arc<Context>,
        commit_consumer: CommitConsumerArgs,
        dag_state: Arc<RwLock<DagState>>,
        transaction_certifier: TransactionCertifier,
        leader_schedule: Arc<LeaderSchedule>,
    ) -> Self {
        let store = dag_state.read().store();
        let commit_interpreter = Linearizer::new(context.clone(), dag_state.clone());
        let commit_finalizer_handle = CommitFinalizer::start(
            context.clone(),
            dag_state.clone(),
            transaction_certifier.clone(),
            commit_consumer.commit_sender.clone(),
        );

        let mut observer = Self {
            context,
            dag_state,
            store,
            transaction_certifier,
            leader_schedule,
            commit_interpreter,
            commit_finalizer_handle,
        };
        observer.recover_and_send_commits(&commit_consumer).await;

        // Recover blocks needed for future commits (and block proposals).
        // Some blocks might have been recovered as committed blocks in recover_and_send_commits().
        // They will just be ignored.
        tokio::runtime::Handle::current()
            .spawn_blocking({
                let transaction_certifier = observer.transaction_certifier.clone();
                let gc_round = observer.dag_state.read().gc_round();
                move || {
                    transaction_certifier.recover_blocks_after_round(gc_round);
                }
            })
            .await
            .expect("Spawn blocking should not fail");

        observer
    }

    /// Creates and returns a list of committed subdags containing committed blocks, given a sequence
    /// of selected leader blocks and whether they were decided by the local committer or received
    /// remotely through commit sync.
    ///
    /// Also buffers the commits to DagState and forwards the committed subdags to the commit finalizer.
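    ///
    /// A rough usage sketch (illustrative only, not a doctest): the caller passes the newly
    /// committed leader blocks and a flag indicating their origin, e.g.
    /// `let subdags = observer.handle_commit(committed_leaders, /* local */ true)?;`,
    /// and uses `local == false` when the leaders were obtained through commit sync.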
    pub(crate) fn handle_commit(
        &mut self,
        committed_leaders: Vec<VerifiedBlock>,
        local: bool,
    ) -> ConsensusResult<Vec<CommittedSubDag>> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["CommitObserver::handle_commit"])
            .start_timer();

        let mut committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
        self.report_metrics(&committed_sub_dags);

        // Record whether the commit was produced from the local DAG or received through commit sync.
        for subdag in committed_sub_dags.iter_mut() {
            subdag.decided_with_local_blocks = local;
        }

        // Send scores as part of the first sub dag, if the leader schedule has been updated.
        let schedule_updated = self
            .leader_schedule
            .leader_schedule_updated(&self.dag_state);
        if schedule_updated {
            let reputation_scores_desc = self
                .leader_schedule
                .leader_swap_table
                .read()
                .reputation_scores_desc
                .clone();
            committed_sub_dags[0].reputation_scores_desc = reputation_scores_desc;
        }

        for commit in committed_sub_dags.iter() {
            tracing::debug!(
                "Sending commit {} leader {} to finalization and execution.",
                commit.commit_ref,
                commit.leader
            );
            tracing::trace!("Committed subdag: {:#?}", commit);
            // Failures in sender.send() are assumed to be permanent
            self.commit_finalizer_handle.send(commit.clone())?;
        }

        self.dag_state
            .write()
            .add_scoring_subdags(committed_sub_dags.clone());

        Ok(committed_sub_dags)
    }

    async fn recover_and_send_commits(&mut self, commit_consumer: &CommitConsumerArgs) {
        let now = Instant::now();

        let replay_after_commit_index = commit_consumer.replay_after_commit_index;

        let last_commit = self
            .store
            .read_last_commit()
            .expect("Reading the last commit should not fail");
        let Some(last_commit) = &last_commit else {
            assert_eq!(
                replay_after_commit_index, 0,
                "Commit replay should start at the beginning if there is no commit history"
            );
            info!("Nothing to recover for commit observer - starting new epoch");
            return;
        };

        let last_commit_index = last_commit.index();
        if last_commit_index == replay_after_commit_index {
            info!(
                "Nothing to recover for commit observer - replay is requested immediately after last commit index {last_commit_index}"
            );
            return;
        }
        assert!(last_commit_index > replay_after_commit_index);

        info!(
            "Recovering commit observer in the range [{}..={last_commit_index}]",
            replay_after_commit_index + 1,
        );

        // To avoid scanning too many commits at once and loading them all into memory,
        // limit the batch size to 250 commits and iterate over the batches.
        const COMMIT_RECOVERY_BATCH_SIZE: u32 = if cfg!(test) { 3 } else { 250 };
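        // For example (hypothetical numbers): with replay_after_commit_index = 2,
        // last_commit_index = 10 and a batch size of 3, the recovery loop below scans
        // the ranges [3..=5], [6..=8] and [9..=10].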

        let mut last_sent_commit_index = replay_after_commit_index;

        // Make sure there are no pending commits waiting to be written to the store.
        self.dag_state.read().ensure_commits_to_write_is_empty();

        let mut seen_unfinalized_commit = false;
        for start_index in (replay_after_commit_index + 1..=last_commit_index)
            .step_by(COMMIT_RECOVERY_BATCH_SIZE as usize)
        {
            let end_index = start_index
                .saturating_add(COMMIT_RECOVERY_BATCH_SIZE - 1)
                .min(last_commit_index);

            let unsent_commits = self
                .store
                .scan_commits((start_index..=end_index).into())
                .expect("Scanning commits should not fail");
            assert_eq!(
                unsent_commits.len() as u32,
                end_index.checked_sub(start_index).unwrap() + 1,
                "Gap in scanned commits: start index: {start_index}, end index: {end_index}, commits: {:?}",
                unsent_commits,
            );

            // Buffer the unsent commits in DagState, which must contain them when they are
            // flushed by CommitFinalizer.
            self.dag_state
                .write()
                .recover_commits_to_write(unsent_commits.clone());

            info!(
                "Recovering {} unsent commits in range [{start_index}..={end_index}]",
                unsent_commits.len()
            );

            // Resend all the committed subdags to the consensus output channel
            // for all the commits above the last processed index.
            for commit in unsent_commits.into_iter() {
                // Commit index must be continuous.
                last_sent_commit_index += 1;
                assert_eq!(commit.index(), last_sent_commit_index);

                // On recovery, the leader schedule is updated with the current scores, and the
                // scores are passed along with the last recovered commit sent to Sui so that the
                // current scores are available for submission.
                let reputation_scores = if commit.index() == last_commit_index {
                    self.leader_schedule
                        .leader_swap_table
                        .read()
                        .reputation_scores_desc
                        .clone()
                } else {
                    vec![]
                };

                let committed_sub_dag = load_committed_subdag_from_store(
                    self.store.as_ref(),
                    commit,
                    reputation_scores,
                );

                if !committed_sub_dag.recovered_rejected_transactions && !seen_unfinalized_commit {
                    info!(
                        "Starting to recover unfinalized commit from {}",
                        committed_sub_dag.commit_ref
                    );
                    // When the commit has no associated storage entry for rejected transactions,
                    // not even an empty set, the commit is unfinalized.
                    seen_unfinalized_commit = true;
                }

                if seen_unfinalized_commit {
                    // After observing the first unfinalized commit, the rest of the recovered
                    // commits should all be unfinalized.
                    assert!(!committed_sub_dag.recovered_rejected_transactions);
                    // Unfinalized commits cannot be assumed to be decided with local blocks,
                    // because they might have been received through commit sync.
                    assert!(!committed_sub_dag.decided_with_local_blocks);
                    // All unfinalized commits need to be processed by the CommitFinalizer, making
                    // it necessary to recover and vote on the blocks in this commit.
                    self.transaction_certifier
                        .recover_and_vote_on_blocks(committed_sub_dag.blocks.clone());
                }

                self.commit_finalizer_handle
                    .send(committed_sub_dag)
                    .unwrap();

                self.context
                    .metrics
                    .node_metrics
                    .commit_observer_last_recovered_commit_index
                    .set(last_sent_commit_index as i64);

                tokio::task::yield_now().await;
            }
        }

        assert_eq!(
            last_sent_commit_index, last_commit_index,
            "We should have sent all commits up to the last commit {}",
            last_commit_index
        );

        info!(
            "Commit observer recovery [{}..={}] completed, took {:?}",
            replay_after_commit_index + 1,
            last_commit_index,
            now.elapsed()
        );
    }

    fn report_metrics(&self, committed: &[CommittedSubDag]) {
        let metrics = &self.context.metrics.node_metrics;
        let utc_now = self.context.clock.timestamp_utc_ms();

        for commit in committed {
            info!(
                "Consensus commit {} with leader {} has {} blocks",
                commit.commit_ref,
                commit.leader,
                commit.blocks.len()
            );

            metrics
                .last_committed_leader_round
                .set(commit.leader.round as i64);
            metrics
                .last_commit_index
                .set(commit.commit_ref.index as i64);
            metrics
                .blocks_per_commit_count
                .observe(commit.blocks.len() as f64);

            for block in &commit.blocks {
                let latency_ms = utc_now
                    .checked_sub(block.timestamp_ms())
                    .unwrap_or_default();
                metrics
                    .block_commit_latency
                    .observe(Duration::from_millis(latency_ms).as_secs_f64());
            }
        }

        self.context
            .metrics
            .node_metrics
            .sub_dags_per_commit_count
            .observe(committed.len() as f64);
    }
}

#[cfg(test)]
mod tests {
    use consensus_config::AuthorityIndex;
    use consensus_types::block::BlockRef;
    use mysten_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
    use parking_lot::RwLock;
    use rstest::rstest;
    use tokio::time::timeout;

    use super::*;
    use crate::{
        CommitIndex, block_verifier::NoopBlockVerifier, context::Context, dag_state::DagState,
        linearizer::median_timestamp_by_stake, storage::mem_store::MemStore,
        test_dag_builder::DagBuilder,
    };

    #[rstest]
    #[tokio::test]
    async fn test_handle_commit() {
        use crate::leader_schedule::LeaderSwapTable;

        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let (context, _keys) = Context::new_for_test(num_authorities);
        let context = Arc::new(context);

        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (commit_consumer, mut commit_receiver, _transaction_receiver) =
            CommitConsumerArgs::new(0, last_processed_commit_index);
        let (blocks_sender, _blocks_receiver) = unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );
        const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 5;
        let leader_schedule = Arc::new(
            LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
                .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
        );

        let mut observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_certifier.clone(),
            leader_schedule.clone(),
        )
        .await;

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());
        transaction_certifier.add_voted_blocks(
            builder
                .all_blocks()
                .iter()
                .map(|b| (b.clone(), vec![]))
                .collect(),
        );

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit first 5 leaders.
        let mut commits = observer
            .handle_commit(leaders[0..5].to_vec(), true)
            .unwrap();

        // Trigger a leader schedule update.
        leader_schedule.update_leader_schedule_v2(&dag_state);

        // Commit the next 5 leaders.
        commits.extend(observer.handle_commit(leaders[5..].to_vec(), true).unwrap());

        // Check that the commits returned by CommitObserver::handle_commit are accurate.
        let mut expected_stored_refs: Vec<BlockRef> = vec![];
        for (idx, subdag) in commits.iter().enumerate() {
            tracing::info!("{subdag:?}");
            assert_eq!(subdag.leader, leaders[idx].reference());

            // The subdag at index 5 (the first commit after the schedule update) should contain the updated scores.
            if idx == 5 {
                let scores = vec![
                    (AuthorityIndex::new_for_test(1), 9),
                    (AuthorityIndex::new_for_test(3), 9),
                    (AuthorityIndex::new_for_test(0), 9),
                    (AuthorityIndex::new_for_test(2), 9),
                ];
                assert_eq!(subdag.reputation_scores_desc, scores);
            } else {
                assert!(subdag.reputation_scores_desc.is_empty());
            }

            let expected_ts = {
                let block_refs = leaders[idx]
                    .ancestors()
                    .iter()
                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
                    .cloned()
                    .collect::<Vec<_>>();
                let blocks = dag_state
                    .read()
                    .get_blocks(&block_refs)
                    .into_iter()
                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
                median_timestamp_by_stake(&context, blocks).unwrap()
            };

            let expected_ts = if idx == 0 {
                expected_ts
            } else {
                expected_ts.max(commits[idx - 1].timestamp_ms)
            };

            assert_eq!(expected_ts, subdag.timestamp_ms);

            if idx == 0 {
                // The first subdag includes the leader block plus all ancestor blocks of the
                // leader, minus the genesis round blocks. Since the first leader's ancestors are
                // all genesis blocks, only the leader itself remains.
                assert_eq!(subdag.blocks.len(), 1);
            } else {
                // Every subdag after will be missing the leader block from the previous
                // committed subdag
                assert_eq!(subdag.blocks.len(), num_authorities);
            }
            for block in subdag.blocks.iter() {
                expected_stored_refs.push(block.reference());
                assert!(block.round() <= leaders[idx].round());
            }
            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
        }

        // Check that the commits sent over the consensus output channel are accurate.
        let mut processed_subdag_index = 0;
        while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
            assert_eq!(subdag, commits[processed_subdag_index]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == leaders.len() {
                break;
            }
        }
        assert_eq!(processed_subdag_index, leaders.len());

        verify_channel_empty(&mut commit_receiver).await;

        // Check commits have been persisted to storage
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            commits.last().unwrap().commit_ref.index
        );
        let all_stored_commits = mem_store
            .scan_commits((0..=CommitIndex::MAX).into())
            .unwrap();
        assert_eq!(all_stored_commits.len(), leaders.len());
        let blocks_existence = mem_store.contains_blocks(&expected_stored_refs).unwrap();
        assert!(blocks_existence.iter().all(|exists| *exists));
    }

    #[tokio::test]
    async fn test_recover_and_send_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let (blocks_sender, _blocks_receiver) = unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );
        let last_processed_commit_index = 0;
        let (commit_consumer, mut commit_receiver, _transaction_receiver) =
            CommitConsumerArgs::new(0, last_processed_commit_index);
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_certifier.clone(),
            leader_schedule.clone(),
        )
        .await;

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());
        transaction_certifier.add_voted_blocks(
            builder
                .all_blocks()
                .iter()
                .map(|b| (b.clone(), vec![]))
                .collect(),
        );

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit first batch of leaders (2) and "receive" the subdags as the
        // consumer of the consensus output channel.
        let expected_last_processed_index: usize = 2;
        let mut commits = observer
            .handle_commit(leaders[..expected_last_processed_index].to_vec(), true)
            .unwrap();

        // Check that the commits sent over the consensus output channel are accurate.
        let mut processed_subdag_index = 0;
        while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut commit_receiver).await;

        // Check last stored commit is correct
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Handle the next batch of leaders (10 - 2 = 8). These will be sent by consensus but not
        // "processed" by the consumer of the consensus output channel, simulating a consumer-side
        // failure where the commits were not persisted.
        commits.append(
            &mut observer
                .handle_commit(leaders[expected_last_processed_index..].to_vec(), true)
                .unwrap(),
        );

        let expected_last_sent_index = num_rounds as usize;
        while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
            tracing::info!("{subdag} was sent but not processed by consumer");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert!(subdag.decided_with_local_blocks);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut commit_receiver).await;

        // Check last stored commit is correct. We should persist the last commit
        // that was sent over the channel regardless of how the consumer handled
        // the commit on their end.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(last_commit.index(), expected_last_sent_index as CommitIndex);

        // Replay commits after index 2, with the consumer's last processed index at 10, which is the last persisted commit.
        {
            let replay_after_commit_index = 2;
            let consumer_last_processed_commit_index = 10;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver, _transaction_receiver) =
                CommitConsumerArgs::new(
                    replay_after_commit_index,
                    consumer_last_processed_commit_index,
                );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_certifier.clone(),
                leader_schedule.clone(),
            )
            .await;

            let mut processed_subdag_index = replay_after_commit_index;
            while let Ok(Some(mut subdag)) =
                timeout(Duration::from_secs(1), commit_receiver.recv()).await
            {
                tracing::info!("Received {subdag} on recovery");
                assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
                assert!(subdag.recovered_rejected_transactions);

                // Allow comparison with committed subdag before recovery.
                subdag.recovered_rejected_transactions = false;
                assert_eq!(subdag, commits[processed_subdag_index as usize]);

                assert!(subdag.decided_with_local_blocks);
                assert_eq!(subdag.reputation_scores_desc, vec![]);
                processed_subdag_index = subdag.commit_ref.index;
                if processed_subdag_index == consumer_last_processed_commit_index {
                    break;
                }
            }
            assert_eq!(processed_subdag_index, consumer_last_processed_commit_index);

            verify_channel_empty(&mut commit_receiver).await;
        }

        // Replay commits after index 10, which is the last persisted commit.
        {
            let replay_after_commit_index = 10;
            let consumer_last_processed_commit_index = 10;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            // Re-create commit observer starting after index 10 which represents the
            // last processed index from the consumer over consensus output channel
            let (commit_consumer, mut commit_receiver, _transaction_receiver) =
                CommitConsumerArgs::new(
                    replay_after_commit_index,
                    consumer_last_processed_commit_index,
                );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_certifier.clone(),
                leader_schedule.clone(),
            )
            .await;

            // No commits should be resubmitted, as the consensus store's last commit index
            // is equal to the consumer's replay-after index.
            verify_channel_empty(&mut commit_receiver).await;
        }

        // Replay commits after index 2, with the consumer's last processed index at 4, which is less than the last persisted commit.
        {
            let replay_after_commit_index = 2;
            let consumer_last_processed_commit_index = 4;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver, _transaction_receiver) =
                CommitConsumerArgs::new(
                    replay_after_commit_index,
                    consumer_last_processed_commit_index,
                );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_certifier.clone(),
                leader_schedule.clone(),
            )
            .await;

            // Checks that commits up to expected_last_sent_index are recovered as finalized,
            // since they have been recorded in the store.
            let mut processed_subdag_index = replay_after_commit_index;
            while let Ok(Some(subdag)) =
                timeout(Duration::from_secs(1), commit_receiver.recv()).await
            {
                tracing::info!("Received {subdag} on recovery");
                assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
                assert!(subdag.decided_with_local_blocks);
                assert_eq!(subdag.reputation_scores_desc, vec![]);
                processed_subdag_index = subdag.commit_ref.index;
                if processed_subdag_index == expected_last_sent_index as CommitIndex {
                    break;
                }
            }
            assert_eq!(
                processed_subdag_index,
                expected_last_sent_index as CommitIndex
            );

            verify_channel_empty(&mut commit_receiver).await;
        }

        // Replay commits after index 2, with the consumer's last processed index at 20,
        // which is greater than the last persisted commit.
        // This allows removing from the store a suffix of commits that have not been part of certified checkpoints.
        {
            let replay_after_commit_index = 2;
            let consumer_last_processed_commit_index = 20;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver, _transaction_receiver) =
                CommitConsumerArgs::new(
                    replay_after_commit_index,
                    consumer_last_processed_commit_index,
                );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_certifier.clone(),
                leader_schedule.clone(),
            )
            .await;

            // Check that the commits sent over the consensus output channel are accurate,
            // starting after the replay index of 2 and finishing at the last sent index of 10.
            let mut processed_subdag_index = replay_after_commit_index;
            while let Ok(Some(mut subdag)) =
                timeout(Duration::from_secs(1), commit_receiver.recv()).await
            {
                tracing::info!("Received {subdag} on recovery");
                assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
                assert!(subdag.recovered_rejected_transactions);

                // Allow comparison with committed subdag before recovery.
                subdag.recovered_rejected_transactions = false;
                assert_eq!(subdag, commits[processed_subdag_index as usize]);

                assert!(subdag.decided_with_local_blocks);
                assert_eq!(subdag.reputation_scores_desc, vec![]);
                processed_subdag_index = subdag.commit_ref.index;
                if processed_subdag_index == expected_last_sent_index as CommitIndex {
                    break;
                }
            }
            assert_eq!(
                processed_subdag_index,
                expected_last_sent_index as CommitIndex
            );
            assert_eq!(10, expected_last_sent_index);

            verify_channel_empty(&mut commit_receiver).await;
        }
    }

    /// After receiving all expected subdags, ensure channel is empty
    async fn verify_channel_empty(receiver: &mut UnboundedReceiver<CommittedSubDag>) {
        if let Ok(Some(_)) = timeout(Duration::from_secs(1), receiver.recv()).await {
            panic!("Expected the consensus output channel to be empty, but found more subdags.")
        }
    }
806}