consensus_core/
commit_consumer.rs1use std::sync::Arc;
5
6use mysten_metrics::monitored_mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
7use tokio::sync::watch;
8use tracing::debug;
9
10use crate::{CommitIndex, CommittedSubDag, block::CertifiedBlocksOutput};
11
12#[derive(Clone)]
15pub struct CommitConsumerArgs {
16 pub(crate) replay_after_commit_index: CommitIndex,
19 pub(crate) consumer_last_processed_commit_index: CommitIndex,
23
24 pub(crate) commit_sender: UnboundedSender<CommittedSubDag>,
26 pub(crate) block_sender: UnboundedSender<CertifiedBlocksOutput>,
29 monitor: Arc<CommitConsumerMonitor>,
31}
32
33impl CommitConsumerArgs {
34 pub fn new(
35 replay_after_commit_index: CommitIndex,
36 consumer_last_processed_commit_index: CommitIndex,
37 ) -> (
38 Self,
39 UnboundedReceiver<CommittedSubDag>,
40 UnboundedReceiver<CertifiedBlocksOutput>,
41 ) {
42 let (commit_sender, commit_receiver) = unbounded_channel("consensus_commit_output");
43 let (block_sender, block_receiver) = unbounded_channel("consensus_block_output");
44
45 let monitor = Arc::new(CommitConsumerMonitor::new(
46 replay_after_commit_index,
47 consumer_last_processed_commit_index,
48 ));
49 (
50 Self {
51 replay_after_commit_index,
52 consumer_last_processed_commit_index,
53 commit_sender,
54 block_sender,
55 monitor,
56 },
57 commit_receiver,
58 block_receiver,
59 )
60 }
61
62 pub fn monitor(&self) -> Arc<CommitConsumerMonitor> {
63 self.monitor.clone()
64 }
65}
66
67pub struct CommitConsumerMonitor {
76 highest_handled_commit: watch::Sender<u32>,
78
79 consumer_last_processed_commit_index: CommitIndex,
82}
83
84impl CommitConsumerMonitor {
85 pub(crate) fn new(
86 replay_after_commit_index: CommitIndex,
87 consumer_last_processed_commit_index: CommitIndex,
88 ) -> Self {
89 Self {
90 highest_handled_commit: watch::Sender::new(replay_after_commit_index),
91 consumer_last_processed_commit_index,
92 }
93 }
94
95 pub fn highest_handled_commit(&self) -> CommitIndex {
97 *self.highest_handled_commit.borrow()
98 }
99
100 pub fn set_highest_handled_commit(&self, highest_handled_commit: CommitIndex) {
102 debug!("Highest handled commit set to {}", highest_handled_commit);
103 self.highest_handled_commit
104 .send_replace(highest_handled_commit);
105 }
106
107 pub async fn replay_to_consumer_last_processed_commit_complete(&self) {
109 let mut rx = self.highest_handled_commit.subscribe();
110 loop {
111 let highest_handled = *rx.borrow_and_update();
112 if highest_handled >= self.consumer_last_processed_commit_index {
113 return;
114 }
115 rx.changed().await.unwrap();
116 }
117 }
118}
119
#[cfg(test)]
mod test {
    use crate::CommitConsumerMonitor;

    /// A fresh monitor reports the replay index it was built with, and a
    /// subsequent `set_highest_handled_commit` call is reflected by the getter.
    #[test]
    fn test_commit_consumer_monitor() {
        let tracker = CommitConsumerMonitor::new(0, 10);

        // Initial value comes from the first constructor argument.
        let initial = tracker.highest_handled_commit();
        assert_eq!(initial, 0, "monitor should start at the replay index");

        // The setter overwrites the stored value.
        tracker.set_highest_handled_commit(100);
        let updated = tracker.highest_handled_commit();
        assert_eq!(updated, 100, "monitor should report the value just set");
    }
}