1use std::collections::{BTreeMap, BTreeSet, btree_map::Entry};
5
6use consensus_types::block::Round;
7use mysten_common::sync::notify_read::NotifyRead;
8use parking_lot::{RwLock, RwLockWriteGuard};
9use sui_types::{
10 error::{SuiErrorKind, SuiResult},
11 messages_consensus::ConsensusPosition,
12};
13use tokio::sync::watch;
14use tracing::debug;
15
16pub(crate) const CONSENSUS_STATUS_RETENTION_ROUNDS: u32 = 400;
20
21#[derive(Clone, Copy, Debug, PartialEq, Eq)]
22pub(crate) enum ConsensusTxStatus {
23 FastpathCertified,
25 Rejected,
27 Finalized,
29 Dropped,
35}
36
37#[derive(Debug, Clone)]
38pub(crate) enum NotifyReadConsensusTxStatusResult {
39 Status(ConsensusTxStatus),
41 Expired(u32),
44}
45
46pub(crate) struct ConsensusTxStatusCache {
47 consensus_gc_depth: u32,
49
50 inner: RwLock<Inner>,
51
52 status_notify_read: NotifyRead<ConsensusPosition, ConsensusTxStatus>,
53 last_committed_leader_round_tx: watch::Sender<Option<u32>>,
55 last_committed_leader_round_rx: watch::Receiver<Option<u32>>,
56}
57
58#[derive(Default)]
59struct Inner {
60 transaction_status: BTreeMap<ConsensusPosition, ConsensusTxStatus>,
62 fastpath_certified: BTreeSet<ConsensusPosition>,
64 last_committed_leader_round: Option<Round>,
66}
67
68impl ConsensusTxStatusCache {
69 pub(crate) fn new(consensus_gc_depth: Round) -> Self {
70 assert!(
71 consensus_gc_depth < CONSENSUS_STATUS_RETENTION_ROUNDS,
72 "{} vs {}",
73 consensus_gc_depth,
74 CONSENSUS_STATUS_RETENTION_ROUNDS
75 );
76 let (last_committed_leader_round_tx, last_committed_leader_round_rx) = watch::channel(None);
77 Self {
78 consensus_gc_depth,
79 inner: Default::default(),
80 status_notify_read: Default::default(),
81 last_committed_leader_round_tx,
82 last_committed_leader_round_rx,
83 }
84 }
85
86 pub(crate) fn set_transaction_status(&self, pos: ConsensusPosition, status: ConsensusTxStatus) {
87 if let Some(last_committed_leader_round) = *self.last_committed_leader_round_rx.borrow()
88 && pos.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS <= last_committed_leader_round
89 {
90 return;
92 }
93
94 let mut inner = self.inner.write();
95 self.set_transaction_status_inner(&mut inner, pos, status);
96 }
97
98 fn set_transaction_status_inner(
99 &self,
100 inner: &mut RwLockWriteGuard<Inner>,
101 pos: ConsensusPosition,
102 status: ConsensusTxStatus,
103 ) {
104 let status_entry = inner.transaction_status.entry(pos);
107 match status_entry {
108 Entry::Vacant(entry) => {
109 entry.insert(status);
111 if status == ConsensusTxStatus::FastpathCertified {
112 assert!(inner.fastpath_certified.insert(pos));
114 }
115 }
116 Entry::Occupied(mut entry) => {
117 let old_status = *entry.get();
118 match (old_status, status) {
119 (s1, s2) if s1 == s2 => return,
121 (ConsensusTxStatus::FastpathCertified, _) => {
123 entry.insert(status);
124 if old_status == ConsensusTxStatus::FastpathCertified {
125 assert!(inner.fastpath_certified.remove(&pos));
127 }
128 }
129 (
131 ConsensusTxStatus::Rejected
132 | ConsensusTxStatus::Dropped
133 | ConsensusTxStatus::Finalized,
134 ConsensusTxStatus::FastpathCertified,
135 ) => {
136 return;
137 }
138 _ => {
140 panic!(
141 "Conflicting status updates for transaction {:?}: {:?} -> {:?}",
142 pos, old_status, status
143 );
144 }
145 }
146 }
147 };
148
149 debug!("Transaction status is set for {}: {:?}", pos, status);
151 self.status_notify_read.notify(&pos, &status);
152 }
153
154 pub(crate) async fn notify_read_transaction_status_change(
157 &self,
158 consensus_position: ConsensusPosition,
159 old_status: Option<ConsensusTxStatus>,
160 ) -> NotifyReadConsensusTxStatusResult {
161 let registration = self.status_notify_read.register_one(&consensus_position);
164 let mut round_rx = self.last_committed_leader_round_rx.clone();
165 {
166 let inner = self.inner.read();
167 if let Some(status) = inner.transaction_status.get(&consensus_position)
168 && Some(status) != old_status.as_ref()
169 {
170 if let Some(old_status) = old_status {
171 assert_eq!(old_status, ConsensusTxStatus::FastpathCertified);
174 }
175 return NotifyReadConsensusTxStatusResult::Status(*status);
176 }
177 }
179
180 let expiration_check = async {
181 loop {
182 if let Some(last_committed_leader_round) = *round_rx.borrow()
183 && consensus_position.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS
184 <= last_committed_leader_round
185 {
186 return last_committed_leader_round;
187 }
188 round_rx
190 .changed()
191 .await
192 .expect("last_committed_leader_round watch channel closed unexpectedly");
193 }
194 };
195 tokio::select! {
196 status = registration => NotifyReadConsensusTxStatusResult::Status(status),
197 last_committed_leader_round = expiration_check => NotifyReadConsensusTxStatusResult::Expired(last_committed_leader_round),
198 }
199 }
200
201 pub(crate) async fn update_last_committed_leader_round(
202 &self,
203 last_committed_leader_round: u32,
204 ) {
205 debug!(
206 "Updating last committed leader round: {}",
207 last_committed_leader_round
208 );
209
210 let mut inner = self.inner.write();
211
212 let Some(leader_round) = inner
217 .last_committed_leader_round
218 .replace(last_committed_leader_round)
219 else {
220 return;
222 };
223
224 while let Some((position, _)) = inner.transaction_status.first_key_value() {
226 if position.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS <= leader_round {
227 let (pos, status) = inner.transaction_status.pop_first().unwrap();
228 if status == ConsensusTxStatus::FastpathCertified {
230 assert!(inner.fastpath_certified.remove(&pos));
231 }
232 } else {
233 break;
234 }
235 }
236
237 while let Some(position) = inner.fastpath_certified.first().cloned() {
243 if position.block.round + self.consensus_gc_depth <= leader_round {
244 self.set_transaction_status_inner(
246 &mut inner,
247 position,
248 ConsensusTxStatus::Rejected,
249 );
250 } else {
251 break;
252 }
253 }
254
255 let _ = self.last_committed_leader_round_tx.send(Some(leader_round));
257 }
258
259 pub(crate) fn get_last_committed_leader_round(&self) -> Option<u32> {
260 *self.last_committed_leader_round_rx.borrow()
261 }
262
263 pub(crate) fn get_num_fastpath_certified(&self) -> usize {
264 self.inner.read().fastpath_certified.len()
265 }
266
267 pub(crate) fn check_position_too_ahead(&self, position: &ConsensusPosition) -> SuiResult<()> {
269 if let Some(last_committed_leader_round) = *self.last_committed_leader_round_rx.borrow()
270 && position.block.round
271 > last_committed_leader_round + CONSENSUS_STATUS_RETENTION_ROUNDS
272 {
273 return Err(SuiErrorKind::ValidatorConsensusLagging {
274 round: position.block.round,
275 last_committed_round: last_committed_leader_round,
276 }
277 .into());
278 }
279 Ok(())
280 }
281
282 #[cfg(test)]
283 pub(crate) fn get_transaction_status(
284 &self,
285 position: &ConsensusPosition,
286 ) -> Option<ConsensusTxStatus> {
287 let inner = self.inner.read();
288 inner.transaction_status.get(position).cloned()
289 }
290}
291
292#[cfg(test)]
293mod tests {
294 use std::{sync::Arc, time::Duration};
295
296 use super::*;
297 use consensus_types::block::{BlockRef, TransactionIndex};
298
299 fn create_test_tx_position(round: u64, index: u64) -> ConsensusPosition {
300 ConsensusPosition {
301 epoch: Default::default(),
302 block: BlockRef {
303 round: round as u32,
304 author: Default::default(),
305 digest: Default::default(),
306 },
307 index: index as TransactionIndex,
308 }
309 }
310
311 #[tokio::test]
312 async fn test_set_and_get_transaction_status() {
313 let cache = ConsensusTxStatusCache::new(60);
314 let tx_pos = create_test_tx_position(1, 0);
315
316 cache.set_transaction_status(tx_pos, ConsensusTxStatus::FastpathCertified);
318
319 let result = cache
321 .notify_read_transaction_status_change(tx_pos, None)
322 .await;
323 assert!(matches!(
324 result,
325 NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::FastpathCertified)
326 ));
327 }
328
329 #[tokio::test]
330 async fn test_status_notification() {
331 let cache = Arc::new(ConsensusTxStatusCache::new(60));
332 let tx_pos = create_test_tx_position(1, 0);
333
334 let cache_clone = cache.clone();
336 let handle = tokio::spawn(async move {
337 cache_clone
338 .notify_read_transaction_status_change(tx_pos, None)
339 .await
340 });
341
342 tokio::time::sleep(Duration::from_millis(10)).await;
344
345 cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);
347
348 let result = handle.await.unwrap();
350 assert!(matches!(
351 result,
352 NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
353 ));
354 }
355
356 #[tokio::test]
357 async fn test_round_expiration() {
358 let cache = ConsensusTxStatusCache::new(60);
359 let tx_pos = create_test_tx_position(1, 0);
360
361 cache.set_transaction_status(tx_pos, ConsensusTxStatus::FastpathCertified);
363
364 cache
366 .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 1)
367 .await;
368
369 cache
372 .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 2)
373 .await;
374
375 let result = cache
377 .notify_read_transaction_status_change(tx_pos, None)
378 .await;
379 assert!(matches!(
380 result,
381 NotifyReadConsensusTxStatusResult::Expired(_)
382 ));
383 }
384
385 #[tokio::test]
386 async fn test_multiple_status_updates() {
387 let cache = ConsensusTxStatusCache::new(60);
388 let tx_pos = create_test_tx_position(1, 0);
389
390 cache.set_transaction_status(tx_pos, ConsensusTxStatus::FastpathCertified);
392
393 cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);
395
396 let result = cache
398 .notify_read_transaction_status_change(
399 tx_pos,
400 Some(ConsensusTxStatus::FastpathCertified),
401 )
402 .await;
403 assert!(matches!(
404 result,
405 NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
406 ));
407 }
408
409 #[tokio::test]
410 async fn test_cleanup_expired_rounds() {
411 let cache = ConsensusTxStatusCache::new(60);
412
413 for round in 1..=5 {
415 let tx_pos = create_test_tx_position(round, 0);
416 cache.set_transaction_status(tx_pos, ConsensusTxStatus::FastpathCertified);
417 }
418
419 cache
421 .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 2)
422 .await;
423
424 {
426 let inner = cache.inner.read();
427 let rounds = inner
428 .transaction_status
429 .keys()
430 .map(|p| p.block.round)
431 .collect::<Vec<_>>();
432 assert_eq!(rounds, vec![1, 2, 3, 4, 5]);
433 }
434
435 cache
438 .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 3)
439 .await;
440
441 {
443 let inner = cache.inner.read();
444 let rounds = inner
445 .transaction_status
446 .keys()
447 .map(|p| p.block.round)
448 .collect::<Vec<_>>();
449 assert_eq!(rounds, vec![3, 4, 5]);
450 }
451
452 cache
455 .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 4)
456 .await;
457
458 {
460 let inner = cache.inner.read();
461 let rounds = inner
462 .transaction_status
463 .keys()
464 .map(|p| p.block.round)
465 .collect::<Vec<_>>();
466 assert_eq!(rounds, vec![4, 5]);
467 }
468 }
469
470 #[tokio::test]
471 async fn test_concurrent_operations() {
472 let cache = Arc::new(ConsensusTxStatusCache::new(60));
473 let tx_pos = create_test_tx_position(1, 0);
474
475 let mut handles = vec![];
477 for _ in 0..3 {
478 let cache_clone = cache.clone();
479 handles.push(tokio::spawn(async move {
480 cache_clone
481 .notify_read_transaction_status_change(tx_pos, None)
482 .await
483 }));
484 }
485
486 tokio::time::sleep(Duration::from_millis(10)).await;
488
489 cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);
491
492 for handle in handles {
494 let result = handle.await.unwrap();
495 assert!(matches!(
496 result,
497 NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
498 ));
499 }
500 }
501
502 #[tokio::test]
503 async fn test_out_of_order_status_updates() {
504 let cache = Arc::new(ConsensusTxStatusCache::new(60));
505 let tx_pos = create_test_tx_position(1, 0);
506
507 cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);
509 let result = cache
510 .notify_read_transaction_status_change(tx_pos, None)
511 .await;
512 assert!(matches!(
513 result,
514 NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
515 ));
516
517 let cache_clone = cache.clone();
518 let notify_read_task = tokio::spawn(async move {
519 cache_clone
520 .notify_read_transaction_status_change(tx_pos, Some(ConsensusTxStatus::Finalized))
521 .await
522 });
523
524 cache.set_transaction_status(tx_pos, ConsensusTxStatus::FastpathCertified);
526 let result = tokio::time::timeout(Duration::from_secs(3), notify_read_task).await;
527 assert!(result.is_err());
528 assert_eq!(
529 cache.get_transaction_status(&tx_pos),
530 Some(ConsensusTxStatus::Finalized)
531 );
532 }
533
534 #[tokio::test]
535 async fn test_fastpath_certified_tracking() {
536 let cache = Arc::new(ConsensusTxStatusCache::new(60));
537
538 assert_eq!(cache.get_num_fastpath_certified(), 0);
540
541 let tx_pos1 = create_test_tx_position(100, 0);
543 let tx_pos2 = create_test_tx_position(100, 1);
544 let tx_pos3 = create_test_tx_position(101, 2);
545 let tx_pos4 = create_test_tx_position(102, 3);
546
547 cache.set_transaction_status(tx_pos1, ConsensusTxStatus::FastpathCertified);
548 assert_eq!(cache.get_num_fastpath_certified(), 1);
549
550 cache.set_transaction_status(tx_pos2, ConsensusTxStatus::FastpathCertified);
551 assert_eq!(cache.get_num_fastpath_certified(), 2);
552
553 cache.set_transaction_status(tx_pos3, ConsensusTxStatus::FastpathCertified);
554 assert_eq!(cache.get_num_fastpath_certified(), 3);
555
556 cache.set_transaction_status(tx_pos4, ConsensusTxStatus::FastpathCertified);
557 assert_eq!(cache.get_num_fastpath_certified(), 4);
558
559 let tx_pos5 = create_test_tx_position(103, 4);
561 cache.set_transaction_status(tx_pos5, ConsensusTxStatus::Finalized);
562 assert_eq!(cache.get_num_fastpath_certified(), 4);
563
564 cache.set_transaction_status(tx_pos1, ConsensusTxStatus::Finalized);
566 assert_eq!(cache.get_num_fastpath_certified(), 3);
567 assert_eq!(
568 cache.get_transaction_status(&tx_pos1),
569 Some(ConsensusTxStatus::Finalized)
570 );
571
572 cache.set_transaction_status(tx_pos2, ConsensusTxStatus::Rejected);
574 assert_eq!(cache.get_num_fastpath_certified(), 2);
575 assert_eq!(
576 cache.get_transaction_status(&tx_pos2),
577 Some(ConsensusTxStatus::Rejected)
578 );
579
580 cache.update_last_committed_leader_round(160).await;
586 assert_eq!(cache.get_num_fastpath_certified(), 2);
587 assert_eq!(
588 cache.get_transaction_status(&tx_pos3),
589 Some(ConsensusTxStatus::FastpathCertified)
590 );
591 assert_eq!(
592 cache.get_transaction_status(&tx_pos4),
593 Some(ConsensusTxStatus::FastpathCertified)
594 );
595
596 cache.update_last_committed_leader_round(161).await;
599 assert_eq!(cache.get_num_fastpath_certified(), 2);
600 assert_eq!(
601 cache.get_transaction_status(&tx_pos3),
602 Some(ConsensusTxStatus::FastpathCertified)
603 );
604 assert_eq!(
605 cache.get_transaction_status(&tx_pos4),
606 Some(ConsensusTxStatus::FastpathCertified)
607 );
608
609 cache.update_last_committed_leader_round(162).await;
613 assert_eq!(cache.get_num_fastpath_certified(), 1);
614 assert_eq!(
615 cache.get_transaction_status(&tx_pos3),
616 Some(ConsensusTxStatus::Rejected)
617 );
618 assert_eq!(
619 cache.get_transaction_status(&tx_pos4),
620 Some(ConsensusTxStatus::FastpathCertified)
621 );
622
623 cache.update_last_committed_leader_round(163).await;
626 assert_eq!(cache.get_num_fastpath_certified(), 0);
627 assert_eq!(
628 cache.get_transaction_status(&tx_pos4),
629 Some(ConsensusTxStatus::Rejected)
630 );
631
632 let tx_pos6 = create_test_tx_position(200, 5);
634 cache.set_transaction_status(tx_pos6, ConsensusTxStatus::Finalized);
635 assert_eq!(cache.get_num_fastpath_certified(), 0);
636
637 cache.set_transaction_status(tx_pos6, ConsensusTxStatus::FastpathCertified);
639 assert_eq!(cache.get_num_fastpath_certified(), 0);
640 assert_eq!(
641 cache.get_transaction_status(&tx_pos6),
642 Some(ConsensusTxStatus::Finalized)
643 );
644 }
645}