1use std::collections::BTreeMap;
5
6use consensus_types::block::Round;
7use mysten_common::sync::notify_read::NotifyRead;
8use parking_lot::RwLock;
9use sui_types::{
10 error::{SuiErrorKind, SuiResult},
11 messages_consensus::ConsensusPosition,
12};
13use tokio::sync::watch;
14use tracing::debug;
15
/// Width, in consensus rounds, of the window in which transaction statuses are tracked.
///
/// A position is treated as expired once `block.round + CONSENSUS_STATUS_RETENTION_ROUNDS`
/// is at or below the published committed leader round, and as "too far ahead" once its
/// round exceeds the published round by more than this value. The cache constructor also
/// asserts that the consensus GC depth is strictly smaller than this constant.
pub(crate) const CONSENSUS_STATUS_RETENTION_ROUNDS: u32 = 400;
20
/// Status recorded in the cache for a transaction at a given consensus position.
///
/// The cache never expects a status to change: recording a different status for a
/// position that already has one causes a panic.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ConsensusTxStatus {
    // NOTE(review): presumably the transaction was rejected during consensus voting;
    // confirm against the code that produces this status.
    Rejected,
    // NOTE(review): presumably the transaction was finalized after commit; confirm.
    Finalized,
    // NOTE(review): presumably the transaction was dropped after consensus without
    // being executed; confirm.
    Dropped,
}
34
/// Outcome of waiting for the status of a consensus position.
#[derive(Debug, Clone)]
pub(crate) enum NotifyReadConsensusTxStatusResult {
    // A status was found in the cache or delivered while waiting.
    Status(ConsensusTxStatus),
    // The position fell out of the retention window before a status was seen.
    // Carries the published committed leader round that triggered the expiry.
    Expired(u32),
}
43
/// Cache of consensus transaction statuses keyed by consensus position, with support
/// for waiting on a status that has not been recorded yet and for expiring old entries
/// as the committed leader round advances.
pub(crate) struct ConsensusTxStatusCache {
    // Recorded statuses and the newest committed leader round, behind a single lock.
    inner: RwLock<Inner>,

    // Used to register waiters for a position and to wake them when its status is set.
    status_notify_read: NotifyRead<ConsensusPosition, ConsensusTxStatus>,
    // Publishes the round used for expiry decisions. The value sent is the round that
    // was stored *before* the latest update, so it lags the newest round by one update
    // and stays `None` after the first update.
    last_committed_leader_round_tx: watch::Sender<Option<u32>>,
    // Receiver side of the channel above; read directly for the current value and
    // cloned for each waiter that needs to observe changes.
    last_committed_leader_round_rx: watch::Receiver<Option<u32>>,
}
52
/// Lock-protected state of [`ConsensusTxStatusCache`].
#[derive(Default)]
struct Inner {
    // Recorded statuses in key order. Expired entries are removed from the front of
    // the map, which assumes positions sort by round within an epoch (TODO confirm
    // against the `Ord` implementation of `ConsensusPosition`).
    transaction_status: BTreeMap<ConsensusPosition, ConsensusTxStatus>,
    // The most recent round passed to `update_last_committed_leader_round`, or `None`
    // if it has never been called.
    last_committed_leader_round: Option<Round>,
}
60
61impl ConsensusTxStatusCache {
62 pub(crate) fn new(consensus_gc_depth: Round) -> Self {
63 assert!(
64 consensus_gc_depth < CONSENSUS_STATUS_RETENTION_ROUNDS,
65 "{} vs {}",
66 consensus_gc_depth,
67 CONSENSUS_STATUS_RETENTION_ROUNDS
68 );
69 let (last_committed_leader_round_tx, last_committed_leader_round_rx) = watch::channel(None);
70 Self {
71 inner: Default::default(),
72 status_notify_read: Default::default(),
73 last_committed_leader_round_tx,
74 last_committed_leader_round_rx,
75 }
76 }
77
78 #[cfg(test)]
81 pub(crate) fn set_transaction_status(&self, pos: ConsensusPosition, status: ConsensusTxStatus) {
82 self.set_transaction_statuses(vec![(pos, status)]);
83 }
84
85 pub(crate) fn set_transaction_statuses(
93 &self,
94 updates: Vec<(ConsensusPosition, ConsensusTxStatus)>,
95 ) {
96 let last_committed_leader_round = *self.last_committed_leader_round_rx.borrow();
99 let mut to_notify = Vec::with_capacity(updates.len());
100 {
101 let mut inner = self.inner.write();
102 for (pos, status) in updates {
103 if let Some(last_committed_leader_round) = last_committed_leader_round
104 && pos.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS
105 <= last_committed_leader_round
106 {
107 continue;
109 }
110 let old_status = inner.transaction_status.insert(pos, status);
111 if let Some(old_status) = old_status
112 && old_status != status
113 {
114 panic!(
115 "Conflicting status updates for transaction {:?}: {:?} -> {:?}",
116 pos, old_status, status
117 );
118 }
119 debug!("Transaction status is set for {}: {:?}", pos, status);
120 to_notify.push((pos, status));
121 }
122 }
123 for (pos, status) in to_notify {
124 self.status_notify_read.notify(&pos, &status);
125 }
126 }
127
128 pub(crate) async fn notify_read_transaction_status(
131 &self,
132 consensus_position: ConsensusPosition,
133 ) -> NotifyReadConsensusTxStatusResult {
134 let registration = self.status_notify_read.register_one(&consensus_position);
135 let mut round_rx = self.last_committed_leader_round_rx.clone();
136 {
137 let inner = self.inner.read();
138 if let Some(status) = inner.transaction_status.get(&consensus_position) {
139 return NotifyReadConsensusTxStatusResult::Status(*status);
140 }
141 }
143 let expiration_check = async {
144 loop {
145 if let Some(last_committed_leader_round) = *round_rx.borrow()
146 && consensus_position.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS
147 <= last_committed_leader_round
148 {
149 return last_committed_leader_round;
150 }
151 round_rx
153 .changed()
154 .await
155 .expect("last_committed_leader_round watch channel closed unexpectedly");
156 }
157 };
158 tokio::select! {
159 status = registration => NotifyReadConsensusTxStatusResult::Status(status),
160 last_committed_leader_round = expiration_check => NotifyReadConsensusTxStatusResult::Expired(last_committed_leader_round),
161 }
162 }
163
164 pub(crate) fn update_last_committed_leader_round(&self, last_committed_leader_round: u32) {
165 debug!(
166 "Updating last committed leader round: {}",
167 last_committed_leader_round
168 );
169
170 let mut inner = self.inner.write();
171
172 let Some(leader_round) = inner
177 .last_committed_leader_round
178 .replace(last_committed_leader_round)
179 else {
180 return;
182 };
183
184 while let Some((position, _)) = inner.transaction_status.first_key_value() {
186 if position.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS <= leader_round {
187 inner.transaction_status.pop_first();
188 } else {
189 break;
190 }
191 }
192
193 let _ = self.last_committed_leader_round_tx.send(Some(leader_round));
195 }
196
197 pub(crate) fn get_last_committed_leader_round(&self) -> Option<u32> {
198 *self.last_committed_leader_round_rx.borrow()
199 }
200
201 pub(crate) fn check_position_too_ahead(&self, position: &ConsensusPosition) -> SuiResult<()> {
203 if let Some(last_committed_leader_round) = *self.last_committed_leader_round_rx.borrow()
204 && position.block.round
205 > last_committed_leader_round + CONSENSUS_STATUS_RETENTION_ROUNDS
206 {
207 return Err(SuiErrorKind::ValidatorConsensusLagging {
208 round: position.block.round,
209 last_committed_round: last_committed_leader_round,
210 }
211 .into());
212 }
213 Ok(())
214 }
215}
216
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use super::*;
    use consensus_types::block::{BlockRef, TransactionIndex};

    /// Builds a position in the default epoch with a default author and digest.
    fn create_test_tx_position(round: u64, index: u64) -> ConsensusPosition {
        let block = BlockRef {
            round: round as u32,
            author: Default::default(),
            digest: Default::default(),
        };
        ConsensusPosition {
            epoch: Default::default(),
            block,
            index: index as TransactionIndex,
        }
    }

    /// Asserts that a wait resolved with a `Finalized` status.
    fn assert_finalized(outcome: NotifyReadConsensusTxStatusResult) {
        assert!(matches!(
            outcome,
            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
        ));
    }

    /// Returns the rounds of all cached positions in map order.
    fn cached_rounds(cache: &ConsensusTxStatusCache) -> Vec<u32> {
        let guard = cache.inner.read();
        guard
            .transaction_status
            .keys()
            .map(|position| position.block.round)
            .collect()
    }

    // A status recorded before the read is returned immediately.
    #[tokio::test]
    async fn test_set_and_get_transaction_status() {
        let cache = ConsensusTxStatusCache::new(60);
        let position = create_test_tx_position(1, 0);

        cache.set_transaction_status(position, ConsensusTxStatus::Finalized);

        assert_finalized(cache.notify_read_transaction_status(position).await);
    }

    // A reader that starts waiting first receives the status once it is recorded.
    #[tokio::test]
    async fn test_status_notification() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));
        let position = create_test_tx_position(1, 0);

        let waiter = {
            let cache = Arc::clone(&cache);
            tokio::spawn(async move { cache.notify_read_transaction_status(position).await })
        };

        // Give the spawned reader a chance to start waiting.
        tokio::time::sleep(Duration::from_millis(10)).await;

        cache.set_transaction_status(position, ConsensusTxStatus::Finalized);

        assert_finalized(waiter.await.unwrap());
    }

    // After two round updates the old position has been pruned and reads as expired.
    #[tokio::test]
    async fn test_round_expiration() {
        let cache = ConsensusTxStatusCache::new(60);
        let position = create_test_tx_position(1, 0);

        cache.set_transaction_status(position, ConsensusTxStatus::Finalized);

        for offset in [1, 2] {
            cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + offset);
        }

        let outcome = cache.notify_read_transaction_status(position).await;
        assert!(matches!(
            outcome,
            NotifyReadConsensusTxStatusResult::Expired(_)
        ));
    }

    // Pruning follows the round recorded by the previous update, not the newest one.
    #[tokio::test]
    async fn test_cleanup_expired_rounds() {
        let cache = ConsensusTxStatusCache::new(60);

        (1..=5).for_each(|round| {
            cache.set_transaction_status(
                create_test_tx_position(round, 0),
                ConsensusTxStatus::Rejected,
            );
        });

        // First update: only stored, nothing is pruned.
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 2);
        assert_eq!(cached_rounds(&cache), vec![1, 2, 3, 4, 5]);

        // Second update prunes against the first update's round.
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 3);
        assert_eq!(cached_rounds(&cache), vec![3, 4, 5]);

        // Third update prunes against the second update's round.
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 4);
        assert_eq!(cached_rounds(&cache), vec![4, 5]);
    }

    // Several readers waiting on the same position are all woken with the status.
    #[tokio::test]
    async fn test_concurrent_operations() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));
        let position = create_test_tx_position(1, 0);

        let waiters: Vec<_> = (0..3)
            .map(|_| {
                let cache = Arc::clone(&cache);
                tokio::spawn(async move { cache.notify_read_transaction_status(position).await })
            })
            .collect();

        // Give the spawned readers a chance to start waiting.
        tokio::time::sleep(Duration::from_millis(10)).await;

        cache.set_transaction_status(position, ConsensusTxStatus::Finalized);

        for waiter in waiters {
            assert_finalized(waiter.await.unwrap());
        }
    }

    // Recording a different status for the same position must panic.
    #[tokio::test]
    #[should_panic(expected = "Conflicting status updates")]
    async fn test_out_of_order_status_updates() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));
        let position = create_test_tx_position(1, 0);

        cache.set_transaction_status(position, ConsensusTxStatus::Finalized);

        cache.set_transaction_status(position, ConsensusTxStatus::Rejected);
    }
}