consensus_core/
commit_consumer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use mysten_metrics::monitored_mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
7use tokio::sync::watch;
8use tracing::debug;
9
10use crate::{CommitIndex, CommittedSubDag, block::CertifiedBlocksOutput};
11
12/// Arguments from commit consumer to this consensus instance.
13/// This includes both parameters and components for communications.
14#[derive(Clone)]
15pub struct CommitConsumerArgs {
16    /// The consumer requests consensus to replay from commit replay_after_commit_index + 1.
17    /// Set to 0 to replay from the start (as commit sequence starts at index = 1).
18    pub(crate) replay_after_commit_index: CommitIndex,
19    /// Index of the last commit that the consumer has processed.  This is useful during
20    /// crash recovery when other components can wait for the consumer to finish processing
21    /// up to this index.
22    pub(crate) consumer_last_processed_commit_index: CommitIndex,
23
24    /// A channel to output the committed sub dags.
25    pub(crate) commit_sender: UnboundedSender<CommittedSubDag>,
26    /// A channel to output blocks for processing, separated from consensus commits.
27    /// In each block output, transactions that are not rejected are considered certified.
28    pub(crate) block_sender: UnboundedSender<CertifiedBlocksOutput>,
29    // Allows the commit consumer to report its progress.
30    monitor: Arc<CommitConsumerMonitor>,
31}
32
33impl CommitConsumerArgs {
34    pub fn new(
35        replay_after_commit_index: CommitIndex,
36        consumer_last_processed_commit_index: CommitIndex,
37    ) -> (
38        Self,
39        UnboundedReceiver<CommittedSubDag>,
40        UnboundedReceiver<CertifiedBlocksOutput>,
41    ) {
42        let (commit_sender, commit_receiver) = unbounded_channel("consensus_commit_output");
43        let (block_sender, block_receiver) = unbounded_channel("consensus_block_output");
44
45        let monitor = Arc::new(CommitConsumerMonitor::new(
46            replay_after_commit_index,
47            consumer_last_processed_commit_index,
48        ));
49        (
50            Self {
51                replay_after_commit_index,
52                consumer_last_processed_commit_index,
53                commit_sender,
54                block_sender,
55                monitor,
56            },
57            commit_receiver,
58            block_receiver,
59        )
60    }
61
62    pub fn monitor(&self) -> Arc<CommitConsumerMonitor> {
63        self.monitor.clone()
64    }
65}
66
67/// Helps monitor the progress of the consensus commit handler (consumer).
68///
69/// This component currently has two use usages:
70/// 1. Checking the highest commit index processed by the consensus commit handler.
71///    Consensus components can decide to wait for more commits to be processed before proceeding with
72///    their work.
73/// 2. Waiting for consensus commit handler to finish processing replayed commits.
74///    Current usage is actually outside of consensus.
75pub struct CommitConsumerMonitor {
76    // highest commit that has been handled by the consumer.
77    highest_handled_commit: watch::Sender<u32>,
78
79    // At node startup, the last consensus commit processed by the commit consumer from the previous run.
80    // This can be 0 if starting a new epoch.
81    consumer_last_processed_commit_index: CommitIndex,
82}
83
84impl CommitConsumerMonitor {
85    pub(crate) fn new(
86        replay_after_commit_index: CommitIndex,
87        consumer_last_processed_commit_index: CommitIndex,
88    ) -> Self {
89        Self {
90            highest_handled_commit: watch::Sender::new(replay_after_commit_index),
91            consumer_last_processed_commit_index,
92        }
93    }
94
95    /// Gets the highest commit index processed by the consensus commit handler.
96    pub fn highest_handled_commit(&self) -> CommitIndex {
97        *self.highest_handled_commit.borrow()
98    }
99
100    /// Updates the highest commit index processed by the consensus commit handler.
101    pub fn set_highest_handled_commit(&self, highest_handled_commit: CommitIndex) {
102        debug!("Highest handled commit set to {}", highest_handled_commit);
103        self.highest_handled_commit
104            .send_replace(highest_handled_commit);
105    }
106
107    /// Waits for consensus to replay commits until the consumer last processed commit index.
108    pub async fn replay_to_consumer_last_processed_commit_complete(&self) {
109        let mut rx = self.highest_handled_commit.subscribe();
110        loop {
111            let highest_handled = *rx.borrow_and_update();
112            if highest_handled >= self.consumer_last_processed_commit_index {
113                return;
114            }
115            rx.changed().await.unwrap();
116        }
117    }
118}
119
#[cfg(test)]
mod test {
    use super::CommitConsumerMonitor;

    /// The monitor starts at the replay index and reflects later updates.
    #[test]
    fn test_commit_consumer_monitor() {
        let (replay_after, last_processed) = (0, 10);
        let monitor = CommitConsumerMonitor::new(replay_after, last_processed);
        assert_eq!(monitor.highest_handled_commit(), replay_after);

        let updated = 100;
        monitor.set_highest_handled_commit(updated);
        assert_eq!(monitor.highest_handled_commit(), updated);
    }
}