consensus_core/
commit_observer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{sync::Arc, time::Duration};
5
6use parking_lot::RwLock;
7use tokio::time::Instant;
8use tracing::info;
9
10use crate::{
11    CommitConsumerArgs, CommittedSubDag,
12    block::{BlockAPI, VerifiedBlock},
13    commit::{CommitAPI, load_committed_subdag_from_store},
14    commit_finalizer::{CommitFinalizer, CommitFinalizerHandle},
15    context::Context,
16    dag_state::DagState,
17    error::ConsensusResult,
18    linearizer::Linearizer,
19    storage::Store,
20    transaction_vote_tracker::TransactionVoteTracker,
21};
22
/// Role of CommitObserver
/// - Called by core when try_commit() returns newly committed leaders.
/// - The newly committed leaders are sent to commit observer and then commit observer
///   gets subdags for each leader via the commit interpreter (linearizer)
/// - The committed subdags are sent as consensus output via an unbounded tokio channel.
///   Within this type they are handed to the `CommitFinalizer`, which is started with
///   the consumer's commit sender.
///
/// There is no flow control on sending output. Consensus backpressure is applied earlier
/// at consensus input level, and on commit sync.
///
/// Commit is persisted in store before the CommittedSubDag is sent to the commit handler.
/// When Sui recovers, it blocks until the commits it knows about are recovered. So consensus
/// must be able to quickly recover the commits it has sent to Sui.
pub(crate) struct CommitObserver {
    /// Shared node context. Used here to access metrics and the clock.
    context: Arc<Context>,
    /// Shared DAG state. Read for the store handle and the GC round, and written when
    /// buffering recovered commits and recording scoring subdags.
    dag_state: Arc<RwLock<DagState>>,
    /// Persistent storage for blocks, commits and other consensus data.
    store: Arc<dyn Store>,
    /// Used during recovery to recover blocks after the GC round, and to recover and vote
    /// on the blocks of unfinalized commits.
    transaction_vote_tracker: TransactionVoteTracker,
    /// Component to deterministically collect subdags for committed leaders.
    commit_interpreter: Linearizer,
    /// Handle for sending committed subdags to the `CommitFinalizer` started in `new()`.
    commit_finalizer_handle: CommitFinalizerHandle,
}
46
47impl CommitObserver {
48    pub(crate) async fn new(
49        context: Arc<Context>,
50        commit_consumer: CommitConsumerArgs,
51        dag_state: Arc<RwLock<DagState>>,
52        transaction_vote_tracker: TransactionVoteTracker,
53    ) -> Self {
54        let store = dag_state.read().store();
55        let commit_interpreter = Linearizer::new(context.clone(), dag_state.clone());
56        let commit_finalizer_handle = CommitFinalizer::start(
57            context.clone(),
58            dag_state.clone(),
59            transaction_vote_tracker.clone(),
60            commit_consumer.commit_sender.clone(),
61        );
62
63        let mut observer = Self {
64            context,
65            dag_state,
66            store,
67            transaction_vote_tracker,
68            commit_interpreter,
69            commit_finalizer_handle,
70        };
71        observer.recover_and_send_commits(&commit_consumer).await;
72
73        // Recover blocks needed for future commits (and block proposals).
74        // Some blocks might have been recovered as committed blocks in recover_and_send_commits().
75        // They will just be ignored.
76        tokio::runtime::Handle::current()
77            .spawn_blocking({
78                let transaction_vote_tracker = observer.transaction_vote_tracker.clone();
79                let gc_round = observer.dag_state.read().gc_round();
80                move || {
81                    transaction_vote_tracker.recover_blocks_after_round(gc_round);
82                }
83            })
84            .await
85            .expect("Spawn blocking should not fail");
86
87        observer
88    }
89
90    /// Creates and returns a list of committed subdags containing committed blocks, from a sequence
91    /// of selected leader blocks, and whether they come from local committer or commit sync remotely.
92    ///
93    /// Also, buffers the commits to DagState and forwards committed subdags to commit finalizer.
94    pub(crate) fn handle_commit(
95        &mut self,
96        committed_leaders: Vec<VerifiedBlock>,
97        local: bool,
98    ) -> ConsensusResult<Vec<CommittedSubDag>> {
99        let _s = self
100            .context
101            .metrics
102            .node_metrics
103            .scope_processing_time
104            .with_label_values(&["CommitObserver::handle_commit"])
105            .start_timer();
106
107        let mut committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
108        self.report_metrics(&committed_sub_dags);
109
110        // Set if the commit is produced from local DAG, or received through commit sync.
111        for subdag in committed_sub_dags.iter_mut() {
112            subdag.decided_with_local_blocks = local;
113        }
114
115        for commit in committed_sub_dags.iter() {
116            tracing::debug!(
117                "Sending commit {} leader {} to finalization and execution.",
118                commit.commit_ref,
119                commit.leader
120            );
121            tracing::trace!("Committed subdag: {:#?}", commit);
122            // Failures in sender.send() are assumed to be permanent
123            self.commit_finalizer_handle.send(commit.clone())?;
124        }
125
126        self.dag_state
127            .write()
128            .add_scoring_subdags(committed_sub_dags.clone());
129
130        Ok(committed_sub_dags)
131    }
132
133    async fn recover_and_send_commits(&mut self, commit_consumer: &CommitConsumerArgs) {
134        let now = Instant::now();
135
136        let replay_after_commit_index = commit_consumer.replay_after_commit_index;
137
138        let last_commit = self
139            .store
140            .read_last_commit()
141            .expect("Reading the last commit should not fail");
142        let Some(last_commit) = &last_commit else {
143            assert_eq!(
144                replay_after_commit_index, 0,
145                "Commit replay should start at the beginning if there is no commit history"
146            );
147            info!("Nothing to recover for commit observer - starting new epoch");
148            return;
149        };
150
151        let last_commit_index = last_commit.index();
152        if last_commit_index == replay_after_commit_index {
153            info!(
154                "Nothing to recover for commit observer - replay is requested immediately after last commit index {last_commit_index}"
155            );
156            return;
157        }
158        assert!(last_commit_index > replay_after_commit_index);
159
160        info!(
161            "Recovering commit observer in the range [{}..={last_commit_index}]",
162            replay_after_commit_index + 1,
163        );
164
165        // To avoid scanning too many commits at once and load in memory,
166        // we limit the batch size to 250 and iterate over.
167        const COMMIT_RECOVERY_BATCH_SIZE: u32 = if cfg!(test) { 3 } else { 250 };
168
169        let mut last_sent_commit_index = replay_after_commit_index;
170
171        // Make sure that there is no pending commits to be written to the store.
172        self.dag_state.read().ensure_commits_to_write_is_empty();
173
174        let mut seen_unfinalized_commit = false;
175        for start_index in (replay_after_commit_index + 1..=last_commit_index)
176            .step_by(COMMIT_RECOVERY_BATCH_SIZE as usize)
177        {
178            let end_index = start_index
179                .saturating_add(COMMIT_RECOVERY_BATCH_SIZE - 1)
180                .min(last_commit_index);
181
182            let unsent_commits = self
183                .store
184                .scan_commits((start_index..=end_index).into())
185                .expect("Scanning commits should not fail");
186            assert_eq!(
187                unsent_commits.len() as u32,
188                end_index.checked_sub(start_index).unwrap() + 1,
189                "Gap in scanned commits: start index: {start_index}, end index: {end_index}, commits: {:?}",
190                unsent_commits,
191            );
192
193            // Buffered unsent commits in DAG state which is required to contain them when they are flushed
194            // by CommitFinalizer.
195            self.dag_state
196                .write()
197                .recover_commits_to_write(unsent_commits.clone());
198
199            info!(
200                "Recovering {} unsent commits in range [{start_index}..={end_index}]",
201                unsent_commits.len()
202            );
203
204            // Resend all the committed subdags to the consensus output channel
205            // for all the commits above the last processed index.
206            for commit in unsent_commits.into_iter() {
207                // Commit index must be continuous.
208                last_sent_commit_index += 1;
209                assert_eq!(commit.index(), last_sent_commit_index);
210
211                let committed_sub_dag =
212                    load_committed_subdag_from_store(self.store.as_ref(), commit);
213
214                if !committed_sub_dag.recovered_rejected_transactions && !seen_unfinalized_commit {
215                    info!(
216                        "Starting to recover unfinalized commit from {}",
217                        committed_sub_dag.commit_ref
218                    );
219                    // When the commit has no associated storage entry for rejected transactions,
220                    // not even an empty set, the commit is unfinalized.
221                    seen_unfinalized_commit = true;
222                }
223
224                if seen_unfinalized_commit {
225                    // After observing the first unfinalized commit, the rest of recovered commits should all be unfinalized.
226                    assert!(!committed_sub_dag.recovered_rejected_transactions);
227                    // All unfinalized commit cannot be assumed to be decided with local blocks, because they
228                    // might have been received through commit sync.
229                    assert!(!committed_sub_dag.decided_with_local_blocks);
230                    // All unfinalized commits need to be processed by the CommitFinalizer, making it necessary to
231                    // recover and vote on the blocks in this commit.
232                    self.transaction_vote_tracker
233                        .recover_and_vote_on_blocks(committed_sub_dag.blocks.clone());
234                }
235
236                self.commit_finalizer_handle
237                    .send(committed_sub_dag)
238                    .unwrap();
239
240                self.context
241                    .metrics
242                    .node_metrics
243                    .commit_observer_last_recovered_commit_index
244                    .set(last_sent_commit_index as i64);
245
246                tokio::task::yield_now().await;
247            }
248        }
249
250        assert_eq!(
251            last_sent_commit_index, last_commit_index,
252            "We should have sent all commits up to the last commit {}",
253            last_commit_index
254        );
255
256        info!(
257            "Commit observer recovery [{}..={}] completed, took {:?}",
258            replay_after_commit_index + 1,
259            last_commit_index,
260            now.elapsed()
261        );
262    }
263
264    fn report_metrics(&self, committed: &[CommittedSubDag]) {
265        let metrics = &self.context.metrics.node_metrics;
266        let utc_now = self.context.clock.timestamp_utc_ms();
267
268        for commit in committed {
269            info!(
270                "Consensus commit {} with leader {} has {} blocks",
271                commit.commit_ref,
272                commit.leader,
273                commit.blocks.len()
274            );
275
276            metrics
277                .last_committed_leader_round
278                .set(commit.leader.round as i64);
279            metrics
280                .last_commit_index
281                .set(commit.commit_ref.index as i64);
282            metrics
283                .blocks_per_commit_count
284                .observe(commit.blocks.len() as f64);
285
286            for block in &commit.blocks {
287                let latency_ms = utc_now
288                    .checked_sub(block.timestamp_ms())
289                    .unwrap_or_default();
290                metrics
291                    .block_commit_latency
292                    .observe(Duration::from_millis(latency_ms).as_secs_f64());
293            }
294        }
295
296        self.context
297            .metrics
298            .node_metrics
299            .sub_dags_per_commit_count
300            .observe(committed.len() as f64);
301    }
302}
303
304#[cfg(test)]
305mod tests {
306    use consensus_types::block::BlockRef;
307    use mysten_metrics::monitored_mpsc::UnboundedReceiver;
308    use parking_lot::RwLock;
309    use rstest::rstest;
310    use tokio::time::timeout;
311
312    use super::*;
313    use crate::{
314        CommitIndex, block_verifier::NoopBlockVerifier, context::Context, dag_state::DagState,
315        linearizer::median_timestamp_by_stake, storage::mem_store::MemStore,
316        test_dag_builder::DagBuilder,
317    };
318
    /// Commits the leaders of rounds 1..=10 through `CommitObserver::handle_commit` and
    /// checks the returned subdags, the subdags received on the consumer channel, and the
    /// commits and blocks persisted in the store.
    #[rstest]
    #[tokio::test]
    async fn test_handle_commit() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let (context, _keys) = Context::new_for_test(num_authorities);
        let context = Arc::new(context);

        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (commit_consumer, mut commit_receiver) =
            CommitConsumerArgs::new(0, last_processed_commit_index);
        let transaction_vote_tracker = TransactionVoteTracker::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
        );

        // The store is empty at this point, so there is nothing to recover on construction.
        let mut observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_vote_tracker.clone(),
        )
        .await;

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());
        // Register every block as voted, with an empty list attached to each block.
        transaction_vote_tracker.add_voted_blocks(
            builder
                .all_blocks()
                .iter()
                .map(|b| (b.clone(), vec![]))
                .collect(),
        );

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        let commits = observer.handle_commit(leaders.clone(), true).unwrap();

        // Check that the commits returned by CommitObserver::handle_commit are accurate
        let mut expected_stored_refs: Vec<BlockRef> = vec![];
        for (idx, subdag) in commits.iter().enumerate() {
            tracing::info!("{subdag:?}");
            assert_eq!(subdag.leader, leaders[idx].reference());

            // Expected timestamp: stake-weighted median of the timestamps of the leader's
            // ancestors from the previous round.
            let expected_ts = {
                let block_refs = leaders[idx]
                    .ancestors()
                    .iter()
                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
                    .cloned()
                    .collect::<Vec<_>>();
                let blocks = dag_state
                    .read()
                    .get_blocks(&block_refs)
                    .into_iter()
                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
                median_timestamp_by_stake(&context, blocks).unwrap()
            };

            // Commit timestamps never go backwards relative to the previous commit.
            let expected_ts = if idx == 0 {
                expected_ts
            } else {
                expected_ts.max(commits[idx - 1].timestamp_ms)
            };

            assert_eq!(expected_ts, subdag.timestamp_ms);

            if idx == 0 {
                // First subdag includes the leader block plus all ancestor blocks
                // of the leader minus the genesis round blocks, i.e. only the leader itself.
                assert_eq!(subdag.blocks.len(), 1);
            } else {
                // Every subdag after will be missing the leader block from the previous
                // committed subdag
                assert_eq!(subdag.blocks.len(), num_authorities);
            }
            for block in subdag.blocks.iter() {
                expected_stored_refs.push(block.reference());
                assert!(block.round() <= leaders[idx].round());
            }
            // Commit indices start at 1 and are consecutive.
            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
        }

        // Check that the commits sent over the consensus output channel are accurate
        let mut processed_subdag_index = 0;
        while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
            assert_eq!(subdag, commits[processed_subdag_index]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == leaders.len() {
                break;
            }
        }
        assert_eq!(processed_subdag_index, leaders.len());

        verify_channel_empty(&mut commit_receiver).await;

        // Check commits have been persisted to storage
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            commits.last().unwrap().commit_ref.index
        );
        let all_stored_commits = mem_store
            .scan_commits((0..=CommitIndex::MAX).into())
            .unwrap();
        assert_eq!(all_stored_commits.len(), leaders.len());
        let blocks_existence = mem_store.contains_blocks(&expected_stored_refs).unwrap();
        assert!(blocks_existence.iter().all(|exists| *exists));
    }
443
    /// Commits 10 leaders in two batches, then re-creates the `CommitObserver` against the
    /// same store with different replay / last-processed indices and checks which commits
    /// are replayed on the consumer channel in each scenario.
    #[tokio::test]
    async fn test_recover_and_send_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let transaction_vote_tracker = TransactionVoteTracker::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
        );
        let last_processed_commit_index = 0;
        let (commit_consumer, mut commit_receiver) =
            CommitConsumerArgs::new(0, last_processed_commit_index);

        let mut observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_vote_tracker.clone(),
        )
        .await;

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());
        // Register every block as voted, with an empty list attached to each block.
        transaction_vote_tracker.add_voted_blocks(
            builder
                .all_blocks()
                .iter()
                .map(|b| (b.clone(), vec![]))
                .collect(),
        );

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit first batch of leaders (2) and "receive" the subdags as the
        // consumer of the consensus output channel.
        let expected_last_processed_index: usize = 2;
        let mut commits = observer
            .handle_commit(leaders[..expected_last_processed_index].to_vec(), true)
            .unwrap();

        // Check that the commits sent over the consensus output channel are accurate
        let mut processed_subdag_index = 0;
        while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut commit_receiver).await;

        // Check last stored commit is correct
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Handle next batch of leaders (10 - 2 = 8), these will be sent by consensus but not
        // "processed" by consensus output channel. Simulating something happened on
        // the consumer side where the commits were not persisted.
        commits.append(
            &mut observer
                .handle_commit(leaders[expected_last_processed_index..].to_vec(), true)
                .unwrap(),
        );

        let expected_last_sent_index = num_rounds as usize;
        while let Ok(Some(subdag)) = timeout(Duration::from_secs(1), commit_receiver.recv()).await {
            tracing::info!("{subdag} was sent but not processed by consumer");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert!(subdag.decided_with_local_blocks);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut commit_receiver).await;

        // Check last stored commit is correct. We should persist the last commit
        // that was sent over the channel regardless of how the consumer handled
        // the commit on their end.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(last_commit.index(), expected_last_sent_index as CommitIndex);

        // Scenario 1: replay commits after index 2, with the consumer's last processed index
        // at 10, which is the last persisted commit.
        {
            let replay_after_commit_index = 2;
            let consumer_last_processed_commit_index = 10;
            // A fresh DagState over the same store simulates a restart.
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver) = CommitConsumerArgs::new(
                replay_after_commit_index,
                consumer_last_processed_commit_index,
            );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_vote_tracker.clone(),
            )
            .await;

            // Commits 3..=10 are expected to be replayed in order.
            let mut processed_subdag_index = replay_after_commit_index;
            while let Ok(Some(mut subdag)) =
                timeout(Duration::from_secs(1), commit_receiver.recv()).await
            {
                tracing::info!("Received {subdag} on recovery");
                assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
                assert!(subdag.recovered_rejected_transactions);

                // Allow comparison with committed subdag before recovery.
                subdag.recovered_rejected_transactions = false;
                assert_eq!(subdag, commits[processed_subdag_index as usize]);

                assert!(subdag.decided_with_local_blocks);
                processed_subdag_index = subdag.commit_ref.index;
                if processed_subdag_index == consumer_last_processed_commit_index {
                    break;
                }
            }
            assert_eq!(processed_subdag_index, consumer_last_processed_commit_index);

            verify_channel_empty(&mut commit_receiver).await;
        }

        // Scenario 2: replay commits after index 10, which is the last persisted commit.
        {
            let replay_after_commit_index = 10;
            let consumer_last_processed_commit_index = 10;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            // Re-create commit observer starting after index 10 which represents the
            // last processed index from the consumer over consensus output channel
            let (commit_consumer, mut commit_receiver) = CommitConsumerArgs::new(
                replay_after_commit_index,
                consumer_last_processed_commit_index,
            );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_vote_tracker.clone(),
            )
            .await;

            // No commits should be resubmitted as consensus store's last commit index
            // is equal to replay after index by consumer
            verify_channel_empty(&mut commit_receiver).await;
        }

        // Scenario 3: replay commits after index 2, with the consumer's last processed index
        // at 4, which is less than the last persisted commit.
        {
            let replay_after_commit_index = 2;
            let consumer_last_processed_commit_index = 4;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver) = CommitConsumerArgs::new(
                replay_after_commit_index,
                consumer_last_processed_commit_index,
            );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_vote_tracker.clone(),
            )
            .await;

            // Checks that all commits up to expected_last_sent_index are replayed in order and
            // are still marked as decided with local blocks. Presumably they are recovered as
            // finalized because their finalization has already been recorded in the store.
            let mut processed_subdag_index = replay_after_commit_index;
            while let Ok(Some(subdag)) =
                timeout(Duration::from_secs(1), commit_receiver.recv()).await
            {
                tracing::info!("Received {subdag} on recovery");
                assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
                assert!(subdag.decided_with_local_blocks);
                processed_subdag_index = subdag.commit_ref.index;
                if processed_subdag_index == expected_last_sent_index as CommitIndex {
                    break;
                }
            }
            assert_eq!(
                processed_subdag_index,
                expected_last_sent_index as CommitIndex
            );

            verify_channel_empty(&mut commit_receiver).await;
        }

        // Scenario 4: replay commits after index 2, with the consumer's last processed index
        // at 20, which is greater than the last persisted commit.
        // This allows removing from the store a suffix of commits which have not been part of certified checkpoints.
        {
            let replay_after_commit_index = 2;
            let consumer_last_processed_commit_index = 20;
            let dag_state = Arc::new(RwLock::new(DagState::new(
                context.clone(),
                mem_store.clone(),
            )));
            let (commit_consumer, mut commit_receiver) = CommitConsumerArgs::new(
                replay_after_commit_index,
                consumer_last_processed_commit_index,
            );
            let _observer = CommitObserver::new(
                context.clone(),
                commit_consumer,
                dag_state.clone(),
                transaction_vote_tracker.clone(),
            )
            .await;

            // Check that the commits sent over the consensus output channel are accurate,
            // starting after the replay index of 2 and finishing at the last sent index of 10.
            let mut processed_subdag_index = replay_after_commit_index;
            while let Ok(Some(mut subdag)) =
                timeout(Duration::from_secs(1), commit_receiver.recv()).await
            {
                tracing::info!("Received {subdag} on recovery");
                assert_eq!(subdag.commit_ref.index, processed_subdag_index + 1);
                assert!(subdag.recovered_rejected_transactions);

                // Allow comparison with committed subdag before recovery.
                subdag.recovered_rejected_transactions = false;
                assert_eq!(subdag, commits[processed_subdag_index as usize]);

                assert!(subdag.decided_with_local_blocks);
                processed_subdag_index = subdag.commit_ref.index;
                if processed_subdag_index == expected_last_sent_index as CommitIndex {
                    break;
                }
            }
            assert_eq!(
                processed_subdag_index,
                expected_last_sent_index as CommitIndex
            );
            // Only the 10 persisted commits can be replayed, even though the consumer claims 20.
            assert_eq!(10, expected_last_sent_index);

            verify_channel_empty(&mut commit_receiver).await;
        }
    }
712
713    /// After receiving all expected subdags, ensure channel is empty
714    async fn verify_channel_empty(receiver: &mut UnboundedReceiver<CommittedSubDag>) {
715        if let Ok(Some(_)) = timeout(Duration::from_secs(1), receiver.recv()).await {
716            panic!("Expected the consensus output channel to be empty, but found more subdags.")
717        }
718    }
719}