consensus_core/
commit_consumer.rs1use std::sync::Arc;
5
6use mysten_metrics::monitored_mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
7use tokio::sync::watch;
8use tracing::debug;
9
10use crate::{CommitIndex, CommittedSubDag, block::CertifiedBlocksOutput};
11
12#[derive(Clone)]
15pub struct CommitConsumerArgs {
16 pub(crate) replay_after_commit_index: CommitIndex,
19 pub(crate) consumer_last_processed_commit_index: CommitIndex,
23
24 pub(crate) commit_sender: UnboundedSender<CommittedSubDag>,
26 pub(crate) block_sender: UnboundedSender<CertifiedBlocksOutput>,
29 monitor: Arc<CommitConsumerMonitor>,
31}
32
33impl CommitConsumerArgs {
34 pub fn new(
35 replay_after_commit_index: CommitIndex,
36 consumer_last_processed_commit_index: CommitIndex,
37 ) -> (Self, UnboundedReceiver<CommittedSubDag>) {
38 let (commit_sender, commit_receiver) = unbounded_channel("consensus_commit_output");
39 let (block_sender, _block_receiver) = unbounded_channel("consensus_block_output");
40
41 let monitor = Arc::new(CommitConsumerMonitor::new(
42 replay_after_commit_index,
43 consumer_last_processed_commit_index,
44 ));
45 (
46 Self {
47 replay_after_commit_index,
48 consumer_last_processed_commit_index,
49 commit_sender,
50 block_sender,
51 monitor,
52 },
53 commit_receiver,
54 )
55 }
56
57 pub fn monitor(&self) -> Arc<CommitConsumerMonitor> {
58 self.monitor.clone()
59 }
60}
61
62pub struct CommitConsumerMonitor {
71 highest_handled_commit: watch::Sender<u32>,
73
74 consumer_last_processed_commit_index: CommitIndex,
77}
78
79impl CommitConsumerMonitor {
80 pub(crate) fn new(
81 replay_after_commit_index: CommitIndex,
82 consumer_last_processed_commit_index: CommitIndex,
83 ) -> Self {
84 Self {
85 highest_handled_commit: watch::Sender::new(replay_after_commit_index),
86 consumer_last_processed_commit_index,
87 }
88 }
89
90 pub fn highest_handled_commit(&self) -> CommitIndex {
92 *self.highest_handled_commit.borrow()
93 }
94
95 pub fn set_highest_handled_commit(&self, highest_handled_commit: CommitIndex) {
97 debug!("Highest handled commit set to {}", highest_handled_commit);
98 self.highest_handled_commit
99 .send_replace(highest_handled_commit);
100 }
101
102 pub async fn replay_to_consumer_last_processed_commit_complete(&self) {
104 let mut rx = self.highest_handled_commit.subscribe();
105 loop {
106 let highest_handled = *rx.borrow_and_update();
107 if highest_handled >= self.consumer_last_processed_commit_index {
108 return;
109 }
110 rx.changed().await.unwrap();
111 }
112 }
113}
114
#[cfg(test)]
mod test {
    use crate::CommitConsumerMonitor;

    /// The monitor starts at the replay-after index and afterwards reports
    /// whatever value was last set.
    #[test]
    fn test_commit_consumer_monitor() {
        let (replay_after, last_processed) = (0, 10);
        let tracker = CommitConsumerMonitor::new(replay_after, last_processed);
        assert_eq!(tracker.highest_handled_commit(), 0);

        tracker.set_highest_handled_commit(100);
        assert_eq!(tracker.highest_handled_commit(), 100);
    }
}