consensus_core/
commit_consumer.rs1use std::sync::Arc;
5
6use mysten_metrics::monitored_mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
7use tokio::sync::watch;
8use tracing::debug;
9
10use crate::{CommitIndex, CommittedSubDag};
11
12#[derive(Clone)]
15pub struct CommitConsumerArgs {
16 pub(crate) replay_after_commit_index: CommitIndex,
19 pub(crate) consumer_last_processed_commit_index: CommitIndex,
23
24 pub(crate) commit_sender: UnboundedSender<CommittedSubDag>,
26 monitor: Arc<CommitConsumerMonitor>,
28}
29
30impl CommitConsumerArgs {
31 pub fn new(
32 replay_after_commit_index: CommitIndex,
33 consumer_last_processed_commit_index: CommitIndex,
34 ) -> (Self, UnboundedReceiver<CommittedSubDag>) {
35 let (commit_sender, commit_receiver) = unbounded_channel("consensus_commit_output");
36
37 let monitor = Arc::new(CommitConsumerMonitor::new(
38 replay_after_commit_index,
39 consumer_last_processed_commit_index,
40 ));
41 (
42 Self {
43 replay_after_commit_index,
44 consumer_last_processed_commit_index,
45 commit_sender,
46 monitor,
47 },
48 commit_receiver,
49 )
50 }
51
52 pub fn monitor(&self) -> Arc<CommitConsumerMonitor> {
53 self.monitor.clone()
54 }
55}
56
57pub struct CommitConsumerMonitor {
66 highest_handled_commit: watch::Sender<u32>,
68
69 consumer_last_processed_commit_index: CommitIndex,
72}
73
74impl CommitConsumerMonitor {
75 pub(crate) fn new(
76 replay_after_commit_index: CommitIndex,
77 consumer_last_processed_commit_index: CommitIndex,
78 ) -> Self {
79 Self {
80 highest_handled_commit: watch::Sender::new(replay_after_commit_index),
81 consumer_last_processed_commit_index,
82 }
83 }
84
85 pub fn highest_handled_commit(&self) -> CommitIndex {
87 *self.highest_handled_commit.borrow()
88 }
89
90 pub fn set_highest_handled_commit(&self, highest_handled_commit: CommitIndex) {
92 debug!("Highest handled commit set to {}", highest_handled_commit);
93 self.highest_handled_commit
94 .send_replace(highest_handled_commit);
95 }
96
97 pub async fn replay_to_consumer_last_processed_commit_complete(&self) {
99 let mut rx = self.highest_handled_commit.subscribe();
100 loop {
101 let highest_handled = *rx.borrow_and_update();
102 if highest_handled >= self.consumer_last_processed_commit_index {
103 return;
104 }
105 rx.changed().await.unwrap();
106 }
107 }
108}
109
#[cfg(test)]
mod test {
    use crate::CommitConsumerMonitor;

    /// The monitor starts at the replay-after index and then reports whatever value was
    /// set last.
    #[test]
    fn test_commit_consumer_monitor() {
        let replay_after = 0;
        let last_processed = 10;
        let monitor = CommitConsumerMonitor::new(replay_after, last_processed);
        assert_eq!(monitor.highest_handled_commit(), replay_after);

        let handled = 100;
        monitor.set_highest_handled_commit(handled);
        assert_eq!(monitor.highest_handled_commit(), handled);
    }
}