1use std::collections::{BTreeMap, BTreeSet, btree_map::Entry};
5
6use consensus_types::block::Round;
7use mysten_common::sync::notify_read::NotifyRead;
8use parking_lot::{RwLock, RwLockWriteGuard};
9use sui_types::{
10 error::{SuiErrorKind, SuiResult},
11 messages_consensus::ConsensusPosition,
12};
13use tokio::sync::watch;
14use tracing::debug;
15
/// Number of consensus rounds for which a position's status is retained.
///
/// The cache uses this distance in both directions relative to the last committed leader
/// round it tracks: positions at least this many rounds behind are treated as expired, and
/// `check_position_too_ahead` rejects positions more than this many rounds ahead.
/// `ConsensusTxStatusCache::new` requires the GC depth to be strictly smaller than this value.
pub(crate) const CONSENSUS_STATUS_RETENTION_ROUNDS: u32 = 400;
20
/// Status recorded by the cache for the transaction at a consensus position.
///
/// The cache only ever replaces a `FastpathCertified` entry; once a position holds
/// `Rejected` or `Finalized`, a later `FastpathCertified` update is ignored and an update
/// to the other terminal status panics (see `set_transaction_status_inner`).
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum ConsensusTxStatus {
    /// Transient status: may later be replaced by `Rejected` or `Finalized`.
    FastpathCertified,
    /// Terminal status: the transaction was rejected.
    Rejected,
    /// Terminal status: the transaction was finalized.
    Finalized,
}
30
31#[derive(Debug, Clone)]
32pub(crate) enum NotifyReadConsensusTxStatusResult {
33 Status(ConsensusTxStatus),
35 Expired(u32),
38}
39
40pub(crate) struct ConsensusTxStatusCache {
41 consensus_gc_depth: u32,
43
44 inner: RwLock<Inner>,
45
46 status_notify_read: NotifyRead<ConsensusPosition, ConsensusTxStatus>,
47 last_committed_leader_round_tx: watch::Sender<Option<u32>>,
49 last_committed_leader_round_rx: watch::Receiver<Option<u32>>,
50}
51
52#[derive(Default)]
53struct Inner {
54 transaction_status: BTreeMap<ConsensusPosition, ConsensusTxStatus>,
56 fastpath_certified: BTreeSet<ConsensusPosition>,
58 last_committed_leader_round: Option<Round>,
60}
61
62impl ConsensusTxStatusCache {
63 pub(crate) fn new(consensus_gc_depth: Round) -> Self {
64 assert!(
65 consensus_gc_depth < CONSENSUS_STATUS_RETENTION_ROUNDS,
66 "{} vs {}",
67 consensus_gc_depth,
68 CONSENSUS_STATUS_RETENTION_ROUNDS
69 );
70 let (last_committed_leader_round_tx, last_committed_leader_round_rx) = watch::channel(None);
71 Self {
72 consensus_gc_depth,
73 inner: Default::default(),
74 status_notify_read: Default::default(),
75 last_committed_leader_round_tx,
76 last_committed_leader_round_rx,
77 }
78 }
79
80 pub(crate) fn set_transaction_status(&self, pos: ConsensusPosition, status: ConsensusTxStatus) {
81 if let Some(last_committed_leader_round) = *self.last_committed_leader_round_rx.borrow()
82 && pos.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS <= last_committed_leader_round
83 {
84 return;
86 }
87
88 let mut inner = self.inner.write();
89 self.set_transaction_status_inner(&mut inner, pos, status);
90 }
91
92 fn set_transaction_status_inner(
93 &self,
94 inner: &mut RwLockWriteGuard<Inner>,
95 pos: ConsensusPosition,
96 status: ConsensusTxStatus,
97 ) {
98 let status_entry = inner.transaction_status.entry(pos);
101 match status_entry {
102 Entry::Vacant(entry) => {
103 entry.insert(status);
105 if status == ConsensusTxStatus::FastpathCertified {
106 assert!(inner.fastpath_certified.insert(pos));
108 }
109 }
110 Entry::Occupied(mut entry) => {
111 let old_status = *entry.get();
112 match (old_status, status) {
113 (s1, s2) if s1 == s2 => return,
115 (ConsensusTxStatus::FastpathCertified, _) => {
117 entry.insert(status);
118 if old_status == ConsensusTxStatus::FastpathCertified {
119 assert!(inner.fastpath_certified.remove(&pos));
121 }
122 }
123 (
125 ConsensusTxStatus::Rejected | ConsensusTxStatus::Finalized,
126 ConsensusTxStatus::FastpathCertified,
127 ) => {
128 return;
129 }
130 _ => {
132 panic!(
133 "Conflicting status updates for transaction {:?}: {:?} -> {:?}",
134 pos, old_status, status
135 );
136 }
137 }
138 }
139 };
140
141 debug!("Transaction status is set for {}: {:?}", pos, status);
143 self.status_notify_read.notify(&pos, &status);
144 }
145
146 pub(crate) async fn notify_read_transaction_status_change(
149 &self,
150 consensus_position: ConsensusPosition,
151 old_status: Option<ConsensusTxStatus>,
152 ) -> NotifyReadConsensusTxStatusResult {
153 let registration = self.status_notify_read.register_one(&consensus_position);
156 let mut round_rx = self.last_committed_leader_round_rx.clone();
157 {
158 let inner = self.inner.read();
159 if let Some(status) = inner.transaction_status.get(&consensus_position)
160 && Some(status) != old_status.as_ref()
161 {
162 if let Some(old_status) = old_status {
163 assert_eq!(old_status, ConsensusTxStatus::FastpathCertified);
166 }
167 return NotifyReadConsensusTxStatusResult::Status(*status);
168 }
169 }
171
172 let expiration_check = async {
173 loop {
174 if let Some(last_committed_leader_round) = *round_rx.borrow()
175 && consensus_position.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS
176 <= last_committed_leader_round
177 {
178 return last_committed_leader_round;
179 }
180 round_rx
182 .changed()
183 .await
184 .expect("last_committed_leader_round watch channel closed unexpectedly");
185 }
186 };
187 tokio::select! {
188 status = registration => NotifyReadConsensusTxStatusResult::Status(status),
189 last_committed_leader_round = expiration_check => NotifyReadConsensusTxStatusResult::Expired(last_committed_leader_round),
190 }
191 }
192
193 pub(crate) async fn update_last_committed_leader_round(
194 &self,
195 last_committed_leader_round: u32,
196 ) {
197 debug!(
198 "Updating last committed leader round: {}",
199 last_committed_leader_round
200 );
201
202 let mut inner = self.inner.write();
203
204 let Some(leader_round) = inner
209 .last_committed_leader_round
210 .replace(last_committed_leader_round)
211 else {
212 return;
214 };
215
216 while let Some((position, _)) = inner.transaction_status.first_key_value() {
218 if position.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS <= leader_round {
219 let (pos, status) = inner.transaction_status.pop_first().unwrap();
220 if status == ConsensusTxStatus::FastpathCertified {
222 assert!(inner.fastpath_certified.remove(&pos));
223 }
224 } else {
225 break;
226 }
227 }
228
229 while let Some(position) = inner.fastpath_certified.first().cloned() {
235 if position.block.round + self.consensus_gc_depth <= leader_round {
236 self.set_transaction_status_inner(
238 &mut inner,
239 position,
240 ConsensusTxStatus::Rejected,
241 );
242 } else {
243 break;
244 }
245 }
246
247 let _ = self.last_committed_leader_round_tx.send(Some(leader_round));
249 }
250
251 pub(crate) fn get_last_committed_leader_round(&self) -> Option<u32> {
252 *self.last_committed_leader_round_rx.borrow()
253 }
254
255 pub(crate) fn get_num_fastpath_certified(&self) -> usize {
256 self.inner.read().fastpath_certified.len()
257 }
258
259 pub(crate) fn check_position_too_ahead(&self, position: &ConsensusPosition) -> SuiResult<()> {
261 if let Some(last_committed_leader_round) = *self.last_committed_leader_round_rx.borrow()
262 && position.block.round
263 > last_committed_leader_round + CONSENSUS_STATUS_RETENTION_ROUNDS
264 {
265 return Err(SuiErrorKind::ValidatorConsensusLagging {
266 round: position.block.round,
267 last_committed_round: last_committed_leader_round,
268 }
269 .into());
270 }
271 Ok(())
272 }
273
274 #[cfg(test)]
275 pub(crate) fn get_transaction_status(
276 &self,
277 position: &ConsensusPosition,
278 ) -> Option<ConsensusTxStatus> {
279 let inner = self.inner.read();
280 inner.transaction_status.get(position).cloned()
281 }
282}
283
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use super::*;
    use consensus_types::block::{BlockRef, TransactionIndex};

    /// Builds a position with the given round and transaction index; every other field
    /// takes its default value.
    fn create_test_tx_position(round: u64, index: u64) -> ConsensusPosition {
        let block = BlockRef {
            round: round as u32,
            author: Default::default(),
            digest: Default::default(),
        };
        ConsensusPosition {
            epoch: Default::default(),
            block,
            index: index as TransactionIndex,
        }
    }

    /// True when `result` is `Status(expected)`.
    fn is_status(result: &NotifyReadConsensusTxStatusResult, expected: ConsensusTxStatus) -> bool {
        matches!(
            result,
            NotifyReadConsensusTxStatusResult::Status(actual) if *actual == expected
        )
    }

    /// Rounds of all positions currently stored in the cache, in map order.
    fn cached_rounds(cache: &ConsensusTxStatusCache) -> Vec<u32> {
        let inner = cache.inner.read();
        inner
            .transaction_status
            .keys()
            .map(|position| position.block.round)
            .collect()
    }

    #[tokio::test]
    async fn test_set_and_get_transaction_status() {
        let cache = ConsensusTxStatusCache::new(60);
        let position = create_test_tx_position(1, 0);

        cache.set_transaction_status(position, ConsensusTxStatus::FastpathCertified);

        // The status is already present, so the read resolves immediately.
        let outcome = cache
            .notify_read_transaction_status_change(position, None)
            .await;
        assert!(is_status(&outcome, ConsensusTxStatus::FastpathCertified));
    }

    #[tokio::test]
    async fn test_status_notification() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));
        let position = create_test_tx_position(1, 0);

        // Start waiting before any status exists.
        let waiter = tokio::spawn({
            let cache = cache.clone();
            async move {
                cache
                    .notify_read_transaction_status_change(position, None)
                    .await
            }
        });

        // Give the waiter a chance to register.
        tokio::time::sleep(Duration::from_millis(10)).await;

        cache.set_transaction_status(position, ConsensusTxStatus::Finalized);

        let outcome = waiter.await.unwrap();
        assert!(is_status(&outcome, ConsensusTxStatus::Finalized));
    }

    #[tokio::test]
    async fn test_round_expiration() {
        let cache = ConsensusTxStatusCache::new(60);
        let position = create_test_tx_position(1, 0);

        cache.set_transaction_status(position, ConsensusTxStatus::FastpathCertified);

        // The first update only records the round; the second one applies it.
        for offset in [1, 2] {
            cache
                .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + offset)
                .await;
        }

        let outcome = cache
            .notify_read_transaction_status_change(position, None)
            .await;
        assert!(matches!(
            outcome,
            NotifyReadConsensusTxStatusResult::Expired(_)
        ));
    }

    #[tokio::test]
    async fn test_multiple_status_updates() {
        let cache = ConsensusTxStatusCache::new(60);
        let position = create_test_tx_position(1, 0);

        cache.set_transaction_status(position, ConsensusTxStatus::FastpathCertified);
        cache.set_transaction_status(position, ConsensusTxStatus::Finalized);

        // Reading with the old status must surface the newer one.
        let outcome = cache
            .notify_read_transaction_status_change(
                position,
                Some(ConsensusTxStatus::FastpathCertified),
            )
            .await;
        assert!(is_status(&outcome, ConsensusTxStatus::Finalized));
    }

    #[tokio::test]
    async fn test_cleanup_expired_rounds() {
        let cache = ConsensusTxStatusCache::new(60);

        for round in 1..=5 {
            cache.set_transaction_status(
                create_test_tx_position(round, 0),
                ConsensusTxStatus::FastpathCertified,
            );
        }

        // First update: nothing is cleaned up.
        cache
            .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 2)
            .await;
        assert_eq!(cached_rounds(&cache), vec![1, 2, 3, 4, 5]);

        // Cleanup now uses the previously recorded round (retention + 2).
        cache
            .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 3)
            .await;
        assert_eq!(cached_rounds(&cache), vec![3, 4, 5]);

        // Cleanup uses retention + 3.
        cache
            .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 4)
            .await;
        assert_eq!(cached_rounds(&cache), vec![4, 5]);
    }

    #[tokio::test]
    async fn test_concurrent_operations() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));
        let position = create_test_tx_position(1, 0);

        // Several tasks wait on the same position.
        let waiters: Vec<_> = (0..3)
            .map(|_| {
                let cache = cache.clone();
                tokio::spawn(async move {
                    cache
                        .notify_read_transaction_status_change(position, None)
                        .await
                })
            })
            .collect();

        tokio::time::sleep(Duration::from_millis(10)).await;

        cache.set_transaction_status(position, ConsensusTxStatus::Finalized);

        // Every waiter observes the same status.
        for waiter in waiters {
            let outcome = waiter.await.unwrap();
            assert!(is_status(&outcome, ConsensusTxStatus::Finalized));
        }
    }

    #[tokio::test]
    async fn test_out_of_order_status_updates() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));
        let position = create_test_tx_position(1, 0);

        cache.set_transaction_status(position, ConsensusTxStatus::Finalized);
        let outcome = cache
            .notify_read_transaction_status_change(position, None)
            .await;
        assert!(is_status(&outcome, ConsensusTxStatus::Finalized));

        let waiter = tokio::spawn({
            let cache = cache.clone();
            async move {
                cache
                    .notify_read_transaction_status_change(
                        position,
                        Some(ConsensusTxStatus::Finalized),
                    )
                    .await
            }
        });

        // A late fastpath update must neither overwrite the terminal status nor wake the
        // waiter, so the wait below times out.
        cache.set_transaction_status(position, ConsensusTxStatus::FastpathCertified);
        let timed_out = tokio::time::timeout(Duration::from_secs(3), waiter).await;
        assert!(timed_out.is_err());
        assert_eq!(
            cache.get_transaction_status(&position),
            Some(ConsensusTxStatus::Finalized)
        );
    }

    #[tokio::test]
    async fn test_fastpath_certified_tracking() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));

        assert_eq!(cache.get_num_fastpath_certified(), 0);

        let tx_pos1 = create_test_tx_position(100, 0);
        let tx_pos2 = create_test_tx_position(100, 1);
        let tx_pos3 = create_test_tx_position(101, 2);
        let tx_pos4 = create_test_tx_position(102, 3);

        // Each new fastpath-certified position bumps the counter by one.
        for (already_tracked, position) in [tx_pos1, tx_pos2, tx_pos3, tx_pos4]
            .into_iter()
            .enumerate()
        {
            cache.set_transaction_status(position, ConsensusTxStatus::FastpathCertified);
            assert_eq!(cache.get_num_fastpath_certified(), already_tracked + 1);
        }

        // A position first seen as finalized is never counted.
        let tx_pos5 = create_test_tx_position(103, 4);
        cache.set_transaction_status(tx_pos5, ConsensusTxStatus::Finalized);
        assert_eq!(cache.get_num_fastpath_certified(), 4);

        // Leaving the fastpath-certified state decrements the counter.
        cache.set_transaction_status(tx_pos1, ConsensusTxStatus::Finalized);
        assert_eq!(cache.get_num_fastpath_certified(), 3);
        assert_eq!(
            cache.get_transaction_status(&tx_pos1),
            Some(ConsensusTxStatus::Finalized)
        );

        cache.set_transaction_status(tx_pos2, ConsensusTxStatus::Rejected);
        assert_eq!(cache.get_num_fastpath_certified(), 2);
        assert_eq!(
            cache.get_transaction_status(&tx_pos2),
            Some(ConsensusTxStatus::Rejected)
        );

        // First update only records round 160; nothing is GC'ed.
        cache.update_last_committed_leader_round(160).await;
        assert_eq!(cache.get_num_fastpath_certified(), 2);
        assert_eq!(
            cache.get_transaction_status(&tx_pos3),
            Some(ConsensusTxStatus::FastpathCertified)
        );
        assert_eq!(
            cache.get_transaction_status(&tx_pos4),
            Some(ConsensusTxStatus::FastpathCertified)
        );

        // GC runs against round 160: rounds 101 and 102 are still within the GC depth.
        cache.update_last_committed_leader_round(161).await;
        assert_eq!(cache.get_num_fastpath_certified(), 2);
        assert_eq!(
            cache.get_transaction_status(&tx_pos3),
            Some(ConsensusTxStatus::FastpathCertified)
        );
        assert_eq!(
            cache.get_transaction_status(&tx_pos4),
            Some(ConsensusTxStatus::FastpathCertified)
        );

        // GC runs against round 161: the round-101 position is rejected.
        cache.update_last_committed_leader_round(162).await;
        assert_eq!(cache.get_num_fastpath_certified(), 1);
        assert_eq!(
            cache.get_transaction_status(&tx_pos3),
            Some(ConsensusTxStatus::Rejected)
        );
        assert_eq!(
            cache.get_transaction_status(&tx_pos4),
            Some(ConsensusTxStatus::FastpathCertified)
        );

        // GC runs against round 162: the round-102 position is rejected.
        cache.update_last_committed_leader_round(163).await;
        assert_eq!(cache.get_num_fastpath_certified(), 0);
        assert_eq!(
            cache.get_transaction_status(&tx_pos4),
            Some(ConsensusTxStatus::Rejected)
        );

        // A finalized position ignores a late fastpath update and is never counted.
        let tx_pos6 = create_test_tx_position(200, 5);
        cache.set_transaction_status(tx_pos6, ConsensusTxStatus::Finalized);
        assert_eq!(cache.get_num_fastpath_certified(), 0);

        cache.set_transaction_status(tx_pos6, ConsensusTxStatus::FastpathCertified);
        assert_eq!(cache.get_num_fastpath_certified(), 0);
        assert_eq!(
            cache.get_transaction_status(&tx_pos6),
            Some(ConsensusTxStatus::Finalized)
        );
    }
}