consensus_core/
commit_consumer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use mysten_metrics::monitored_mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
7use tokio::sync::watch;
8use tracing::debug;
9
10use crate::{CommitIndex, CommittedSubDag, block::CertifiedBlocksOutput};
11
12/// Arguments from commit consumer to this consensus instance.
13/// This includes both parameters and components for communications.
14#[derive(Clone)]
15pub struct CommitConsumerArgs {
16    /// The consumer requests consensus to replay from commit replay_after_commit_index + 1.
17    /// Set to 0 to replay from the start (as commit sequence starts at index = 1).
18    pub(crate) replay_after_commit_index: CommitIndex,
19    /// Index of the last commit that the consumer has processed.  This is useful during
20    /// crash recovery when other components can wait for the consumer to finish processing
21    /// up to this index.
22    pub(crate) consumer_last_processed_commit_index: CommitIndex,
23
24    /// A channel to output the committed sub dags.
25    pub(crate) commit_sender: UnboundedSender<CommittedSubDag>,
26    /// A channel to output blocks for processing, separated from consensus commits.
27    /// In each block output, transactions that are not rejected are considered certified.
28    pub(crate) block_sender: UnboundedSender<CertifiedBlocksOutput>,
29    // Allows the commit consumer to report its progress.
30    monitor: Arc<CommitConsumerMonitor>,
31}
32
33impl CommitConsumerArgs {
34    pub fn new(
35        replay_after_commit_index: CommitIndex,
36        consumer_last_processed_commit_index: CommitIndex,
37    ) -> (Self, UnboundedReceiver<CommittedSubDag>) {
38        let (commit_sender, commit_receiver) = unbounded_channel("consensus_commit_output");
39        let (block_sender, _block_receiver) = unbounded_channel("consensus_block_output");
40
41        let monitor = Arc::new(CommitConsumerMonitor::new(
42            replay_after_commit_index,
43            consumer_last_processed_commit_index,
44        ));
45        (
46            Self {
47                replay_after_commit_index,
48                consumer_last_processed_commit_index,
49                commit_sender,
50                block_sender,
51                monitor,
52            },
53            commit_receiver,
54        )
55    }
56
57    pub fn monitor(&self) -> Arc<CommitConsumerMonitor> {
58        self.monitor.clone()
59    }
60}
61
62/// Helps monitor the progress of the consensus commit handler (consumer).
63///
64/// This component currently has two use usages:
65/// 1. Checking the highest commit index processed by the consensus commit handler.
66///    Consensus components can decide to wait for more commits to be processed before proceeding with
67///    their work.
68/// 2. Waiting for consensus commit handler to finish processing replayed commits.
69///    Current usage is actually outside of consensus.
70pub struct CommitConsumerMonitor {
71    // highest commit that has been handled by the consumer.
72    highest_handled_commit: watch::Sender<u32>,
73
74    // At node startup, the last consensus commit processed by the commit consumer from the previous run.
75    // This can be 0 if starting a new epoch.
76    consumer_last_processed_commit_index: CommitIndex,
77}
78
79impl CommitConsumerMonitor {
80    pub(crate) fn new(
81        replay_after_commit_index: CommitIndex,
82        consumer_last_processed_commit_index: CommitIndex,
83    ) -> Self {
84        Self {
85            highest_handled_commit: watch::Sender::new(replay_after_commit_index),
86            consumer_last_processed_commit_index,
87        }
88    }
89
90    /// Gets the highest commit index processed by the consensus commit handler.
91    pub fn highest_handled_commit(&self) -> CommitIndex {
92        *self.highest_handled_commit.borrow()
93    }
94
95    /// Updates the highest commit index processed by the consensus commit handler.
96    pub fn set_highest_handled_commit(&self, highest_handled_commit: CommitIndex) {
97        debug!("Highest handled commit set to {}", highest_handled_commit);
98        self.highest_handled_commit
99            .send_replace(highest_handled_commit);
100    }
101
102    /// Waits for consensus to replay commits until the consumer last processed commit index.
103    pub async fn replay_to_consumer_last_processed_commit_complete(&self) {
104        let mut rx = self.highest_handled_commit.subscribe();
105        loop {
106            let highest_handled = *rx.borrow_and_update();
107            if highest_handled >= self.consumer_last_processed_commit_index {
108                return;
109            }
110            rx.changed().await.unwrap();
111        }
112    }
113}
114
#[cfg(test)]
mod test {
    use crate::CommitConsumerMonitor;

    /// The monitor starts at the replay index and reflects a later update.
    #[test]
    fn test_commit_consumer_monitor() {
        let (replay_after, last_processed) = (0, 10);
        let progress = CommitConsumerMonitor::new(replay_after, last_processed);
        assert_eq!(progress.highest_handled_commit(), replay_after);

        let updated = 100;
        progress.set_highest_handled_commit(updated);
        assert_eq!(progress.highest_handled_commit(), updated);
    }
}