sui_core/authority/
consensus_tx_status_cache.rs1use std::collections::BTreeMap;
5
6use consensus_types::block::Round;
7use mysten_common::sync::notify_read::NotifyRead;
8use parking_lot::RwLock;
9use sui_types::{
10 error::{SuiErrorKind, SuiResult},
11 messages_consensus::ConsensusPosition,
12};
13use tokio::sync::watch;
14use tracing::debug;
15
/// Width, in consensus rounds, of the window in which transaction statuses are tracked.
///
/// Within this file the constant is used in two directions:
/// - a position whose round is at least this many rounds behind the last committed leader
///   round is treated as expired: status updates for it are ignored, cached entries are
///   garbage collected, and waiters are answered with `Expired`;
/// - a position whose round is more than this many rounds ahead of the last committed
///   leader round is rejected by `check_position_too_ahead`.
///
/// `ConsensusTxStatusCache::new` asserts that the consensus GC depth is strictly smaller
/// than this value.
pub(crate) const CONSENSUS_STATUS_RETENTION_ROUNDS: u32 = 400;
20
/// Status recorded in the cache for a transaction at a given consensus position.
///
/// The cache panics if a position that already has a status is assigned a different one,
/// so within the cache a status never changes once set.
///
/// NOTE(review): the exact conditions under which each variant is produced are decided by
/// the callers of `ConsensusTxStatusCache::set_transaction_status` and are not visible in
/// this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ConsensusTxStatus {
    /// The transaction was rejected.
    Rejected,
    /// The transaction was finalized.
    Finalized,
    /// The transaction was dropped.
    Dropped,
}
34
35#[derive(Debug, Clone)]
36pub(crate) enum NotifyReadConsensusTxStatusResult {
37 Status(ConsensusTxStatus),
39 Expired(u32),
42}
43
44pub(crate) struct ConsensusTxStatusCache {
45 inner: RwLock<Inner>,
46
47 status_notify_read: NotifyRead<ConsensusPosition, ConsensusTxStatus>,
48 last_committed_leader_round_tx: watch::Sender<Option<u32>>,
50 last_committed_leader_round_rx: watch::Receiver<Option<u32>>,
51}
52
53#[derive(Default)]
54struct Inner {
55 transaction_status: BTreeMap<ConsensusPosition, ConsensusTxStatus>,
57 last_committed_leader_round: Option<Round>,
59}
60
61impl ConsensusTxStatusCache {
62 pub(crate) fn new(consensus_gc_depth: Round) -> Self {
63 assert!(
64 consensus_gc_depth < CONSENSUS_STATUS_RETENTION_ROUNDS,
65 "{} vs {}",
66 consensus_gc_depth,
67 CONSENSUS_STATUS_RETENTION_ROUNDS
68 );
69 let (last_committed_leader_round_tx, last_committed_leader_round_rx) = watch::channel(None);
70 Self {
71 inner: Default::default(),
72 status_notify_read: Default::default(),
73 last_committed_leader_round_tx,
74 last_committed_leader_round_rx,
75 }
76 }
77
78 pub(crate) fn set_transaction_status(&self, pos: ConsensusPosition, status: ConsensusTxStatus) {
79 if let Some(last_committed_leader_round) = *self.last_committed_leader_round_rx.borrow()
80 && pos.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS <= last_committed_leader_round
81 {
82 return;
84 }
85
86 let mut inner = self.inner.write();
87 let old_status = inner.transaction_status.insert(pos, status);
88 if let Some(old_status) = old_status
89 && old_status != status
90 {
91 panic!(
92 "Conflicting status updates for transaction {:?}: {:?} -> {:?}",
93 pos, old_status, status
94 );
95 }
96
97 debug!("Transaction status is set for {}: {:?}", pos, status);
99 self.status_notify_read.notify(&pos, &status);
100 }
101
102 pub(crate) async fn notify_read_transaction_status(
105 &self,
106 consensus_position: ConsensusPosition,
107 ) -> NotifyReadConsensusTxStatusResult {
108 let registration = self.status_notify_read.register_one(&consensus_position);
109 let mut round_rx = self.last_committed_leader_round_rx.clone();
110 {
111 let inner = self.inner.read();
112 if let Some(status) = inner.transaction_status.get(&consensus_position) {
113 return NotifyReadConsensusTxStatusResult::Status(*status);
114 }
115 }
117 let expiration_check = async {
118 loop {
119 if let Some(last_committed_leader_round) = *round_rx.borrow()
120 && consensus_position.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS
121 <= last_committed_leader_round
122 {
123 return last_committed_leader_round;
124 }
125 round_rx
127 .changed()
128 .await
129 .expect("last_committed_leader_round watch channel closed unexpectedly");
130 }
131 };
132 tokio::select! {
133 status = registration => NotifyReadConsensusTxStatusResult::Status(status),
134 last_committed_leader_round = expiration_check => NotifyReadConsensusTxStatusResult::Expired(last_committed_leader_round),
135 }
136 }
137
138 pub(crate) fn update_last_committed_leader_round(&self, last_committed_leader_round: u32) {
139 debug!(
140 "Updating last committed leader round: {}",
141 last_committed_leader_round
142 );
143
144 let mut inner = self.inner.write();
145
146 let Some(leader_round) = inner
151 .last_committed_leader_round
152 .replace(last_committed_leader_round)
153 else {
154 return;
156 };
157
158 while let Some((position, _)) = inner.transaction_status.first_key_value() {
160 if position.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS <= leader_round {
161 inner.transaction_status.pop_first();
162 } else {
163 break;
164 }
165 }
166
167 let _ = self.last_committed_leader_round_tx.send(Some(leader_round));
169 }
170
171 pub(crate) fn get_last_committed_leader_round(&self) -> Option<u32> {
172 *self.last_committed_leader_round_rx.borrow()
173 }
174
175 pub(crate) fn check_position_too_ahead(&self, position: &ConsensusPosition) -> SuiResult<()> {
177 if let Some(last_committed_leader_round) = *self.last_committed_leader_round_rx.borrow()
178 && position.block.round
179 > last_committed_leader_round + CONSENSUS_STATUS_RETENTION_ROUNDS
180 {
181 return Err(SuiErrorKind::ValidatorConsensusLagging {
182 round: position.block.round,
183 last_committed_round: last_committed_leader_round,
184 }
185 .into());
186 }
187 Ok(())
188 }
189}
190
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use super::*;
    use consensus_types::block::{BlockRef, TransactionIndex};

    /// Builds a position in the default epoch with default author and digest.
    fn create_test_tx_position(round: u64, index: u64) -> ConsensusPosition {
        let block = BlockRef {
            round: round as u32,
            author: Default::default(),
            digest: Default::default(),
        };
        ConsensusPosition {
            epoch: Default::default(),
            block,
            index: index as TransactionIndex,
        }
    }

    /// True when `outcome` is a `Finalized` status.
    fn is_finalized(outcome: &NotifyReadConsensusTxStatusResult) -> bool {
        matches!(
            outcome,
            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
        )
    }

    /// Rounds of all positions currently held in the cache, in map order.
    fn cached_rounds(cache: &ConsensusTxStatusCache) -> Vec<u32> {
        let inner = cache.inner.read();
        inner
            .transaction_status
            .keys()
            .map(|position| position.block.round)
            .collect::<Vec<_>>()
    }

    /// A status that is already cached is returned without waiting.
    #[tokio::test]
    async fn test_set_and_get_transaction_status() {
        let cache = ConsensusTxStatusCache::new(60);
        let position = create_test_tx_position(1, 0);

        cache.set_transaction_status(position, ConsensusTxStatus::Finalized);

        let outcome = cache.notify_read_transaction_status(position).await;
        assert!(is_finalized(&outcome));
    }

    /// A waiter registered before the status is set is woken with that status.
    #[tokio::test]
    async fn test_status_notification() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));
        let position = create_test_tx_position(1, 0);

        let waiter = {
            let cache = Arc::clone(&cache);
            tokio::spawn(async move { cache.notify_read_transaction_status(position).await })
        };

        // Give the spawned task a chance to start waiting.
        tokio::time::sleep(Duration::from_millis(10)).await;

        cache.set_transaction_status(position, ConsensusTxStatus::Finalized);

        let outcome = waiter.await.unwrap();
        assert!(is_finalized(&outcome));
    }

    /// Once a position is outside the retention window, reads report `Expired`.
    #[tokio::test]
    async fn test_round_expiration() {
        let cache = ConsensusTxStatusCache::new(60);
        let position = create_test_tx_position(1, 0);

        cache.set_transaction_status(position, ConsensusTxStatus::Finalized);

        // The first update is only recorded; the second one expires against the first.
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 1);
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 2);

        let outcome = cache.notify_read_transaction_status(position).await;
        assert!(matches!(
            outcome,
            NotifyReadConsensusTxStatusResult::Expired(_)
        ));
    }

    /// Garbage collection trails the reported leader round by one update.
    #[tokio::test]
    async fn test_cleanup_expired_rounds() {
        let cache = ConsensusTxStatusCache::new(60);

        for round in 1..=5 {
            cache.set_transaction_status(
                create_test_tx_position(round, 0),
                ConsensusTxStatus::Rejected,
            );
        }

        // First update: nothing is collected.
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 2);
        assert_eq!(cached_rounds(&cache), vec![1, 2, 3, 4, 5]);

        // Collects against RETENTION + 2: rounds 1 and 2 go.
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 3);
        assert_eq!(cached_rounds(&cache), vec![3, 4, 5]);

        // Collects against RETENTION + 3: round 3 goes.
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 4);
        assert_eq!(cached_rounds(&cache), vec![4, 5]);
    }

    /// Several waiters on the same position all receive the status.
    #[tokio::test]
    async fn test_concurrent_operations() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));
        let position = create_test_tx_position(1, 0);

        let waiters: Vec<_> = (0..3)
            .map(|_| {
                let cache = Arc::clone(&cache);
                tokio::spawn(async move { cache.notify_read_transaction_status(position).await })
            })
            .collect();

        // Give the spawned tasks a chance to start waiting.
        tokio::time::sleep(Duration::from_millis(10)).await;

        cache.set_transaction_status(position, ConsensusTxStatus::Finalized);

        for waiter in waiters {
            let outcome = waiter.await.unwrap();
            assert!(is_finalized(&outcome));
        }
    }

    /// Assigning a different status to an already-set position panics.
    #[tokio::test]
    #[should_panic(expected = "Conflicting status updates")]
    async fn test_out_of_order_status_updates() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));
        let position = create_test_tx_position(1, 0);

        cache.set_transaction_status(position, ConsensusTxStatus::Finalized);

        // A second, conflicting status for the same position must panic.
        cache.set_transaction_status(position, ConsensusTxStatus::Rejected);
    }
}