consensus_core/
commit_consumer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use mysten_metrics::monitored_mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
7use tokio::sync::watch;
8use tracing::debug;
9
10use crate::{CommitIndex, CommittedSubDag};
11
12/// Arguments from commit consumer to this consensus instance.
13/// This includes both parameters and components for communications.
14#[derive(Clone)]
15pub struct CommitConsumerArgs {
16    /// The consumer requests consensus to replay from commit replay_after_commit_index + 1.
17    /// Set to 0 to replay from the start (as commit sequence starts at index = 1).
18    pub(crate) replay_after_commit_index: CommitIndex,
19    /// Index of the last commit that the consumer has processed.  This is useful during
20    /// crash recovery when other components can wait for the consumer to finish processing
21    /// up to this index.
22    pub(crate) consumer_last_processed_commit_index: CommitIndex,
23
24    /// A channel to output the committed sub dags.
25    pub(crate) commit_sender: UnboundedSender<CommittedSubDag>,
26    // Allows the commit consumer to report its progress.
27    monitor: Arc<CommitConsumerMonitor>,
28}
29
30impl CommitConsumerArgs {
31    pub fn new(
32        replay_after_commit_index: CommitIndex,
33        consumer_last_processed_commit_index: CommitIndex,
34    ) -> (Self, UnboundedReceiver<CommittedSubDag>) {
35        let (commit_sender, commit_receiver) = unbounded_channel("consensus_commit_output");
36
37        let monitor = Arc::new(CommitConsumerMonitor::new(
38            replay_after_commit_index,
39            consumer_last_processed_commit_index,
40        ));
41        (
42            Self {
43                replay_after_commit_index,
44                consumer_last_processed_commit_index,
45                commit_sender,
46                monitor,
47            },
48            commit_receiver,
49        )
50    }
51
52    pub fn monitor(&self) -> Arc<CommitConsumerMonitor> {
53        self.monitor.clone()
54    }
55}
56
57/// Helps monitor the progress of the consensus commit handler (consumer).
58///
59/// This component currently has two use usages:
60/// 1. Checking the highest commit index processed by the consensus commit handler.
61///    Consensus components can decide to wait for more commits to be processed before proceeding with
62///    their work.
63/// 2. Waiting for consensus commit handler to finish processing replayed commits.
64///    Current usage is actually outside of consensus.
65pub struct CommitConsumerMonitor {
66    // highest commit that has been handled by the consumer.
67    highest_handled_commit: watch::Sender<u32>,
68
69    // At node startup, the last consensus commit processed by the commit consumer from the previous run.
70    // This can be 0 if starting a new epoch.
71    consumer_last_processed_commit_index: CommitIndex,
72}
73
74impl CommitConsumerMonitor {
75    pub(crate) fn new(
76        replay_after_commit_index: CommitIndex,
77        consumer_last_processed_commit_index: CommitIndex,
78    ) -> Self {
79        Self {
80            highest_handled_commit: watch::Sender::new(replay_after_commit_index),
81            consumer_last_processed_commit_index,
82        }
83    }
84
85    /// Gets the highest commit index processed by the consensus commit handler.
86    pub fn highest_handled_commit(&self) -> CommitIndex {
87        *self.highest_handled_commit.borrow()
88    }
89
90    /// Updates the highest commit index processed by the consensus commit handler.
91    pub fn set_highest_handled_commit(&self, highest_handled_commit: CommitIndex) {
92        debug!("Highest handled commit set to {}", highest_handled_commit);
93        self.highest_handled_commit
94            .send_replace(highest_handled_commit);
95    }
96
97    /// Waits for consensus to replay commits until the consumer last processed commit index.
98    pub async fn replay_to_consumer_last_processed_commit_complete(&self) {
99        let mut rx = self.highest_handled_commit.subscribe();
100        loop {
101            let highest_handled = *rx.borrow_and_update();
102            if highest_handled >= self.consumer_last_processed_commit_index {
103                return;
104            }
105            rx.changed().await.unwrap();
106        }
107    }
108}
109
#[cfg(test)]
mod test {
    use crate::CommitConsumerMonitor;

    /// The monitor reports its initial value and then whatever was last set.
    #[test]
    fn test_commit_consumer_monitor() {
        let monitor = CommitConsumerMonitor::new(0, 10);
        assert_eq!(monitor.highest_handled_commit(), 0);

        monitor.set_highest_handled_commit(100);
        assert_eq!(monitor.highest_handled_commit(), 100);
    }

    /// The initial value comes from the replay index, not from the consumer's last
    /// processed index and not from a hard-coded zero.
    #[test]
    fn test_commit_consumer_monitor_starts_at_replay_index() {
        let monitor = CommitConsumerMonitor::new(7, 10);
        assert_eq!(monitor.highest_handled_commit(), 7);
    }
}