sui_core/authority/
consensus_tx_status_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{BTreeMap, BTreeSet, btree_map::Entry};
5
6use consensus_types::block::Round;
7use mysten_common::sync::notify_read::NotifyRead;
8use parking_lot::{RwLock, RwLockWriteGuard};
9use sui_types::{
10    error::{SuiErrorKind, SuiResult},
11    messages_consensus::ConsensusPosition,
12};
13use tokio::sync::watch;
14use tracing::debug;
15
/// Number of consensus rounds for which transaction status information is kept before it is
/// garbage collected.
///
/// The same window is used in two directions: positions from rounds that fall behind it are
/// expired, and positions that are further ahead of the last committed round than it are
/// considered too far ahead.
/// With a maximum round rate of 15 rounds per second, this keeps status updates valid for a
/// window of roughly 25-30 seconds.
pub(crate) const CONSENSUS_STATUS_RETENTION_ROUNDS: u32 = 400;
20
/// Status that consensus has reported for a transaction at a given consensus position.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ConsensusTxStatus {
    /// A quorum of validators voted to accept the transaction on fastpath.
    FastpathCertified,
    /// The transaction was rejected, either by a quorum of validators or indirectly post-commit.
    Rejected,
    /// The transaction was finalized post commit.
    Finalized,
}
30
31#[derive(Debug, Clone)]
32pub(crate) enum NotifyReadConsensusTxStatusResult {
33    // The consensus position to be read has been updated with a new status.
34    Status(ConsensusTxStatus),
35    // The consensus position to be read has expired.
36    // Provided with the last committed round that was used to check for expiration.
37    Expired(u32),
38}
39
40pub(crate) struct ConsensusTxStatusCache {
41    // GC depth in consensus.
42    consensus_gc_depth: u32,
43
44    inner: RwLock<Inner>,
45
46    status_notify_read: NotifyRead<ConsensusPosition, ConsensusTxStatus>,
47    /// Watch channel for last committed leader round updates
48    last_committed_leader_round_tx: watch::Sender<Option<u32>>,
49    last_committed_leader_round_rx: watch::Receiver<Option<u32>>,
50}
51
52#[derive(Default)]
53struct Inner {
54    /// A map of transaction position to its status from consensus.
55    transaction_status: BTreeMap<ConsensusPosition, ConsensusTxStatus>,
56    /// Consensus positions that are currently in the fastpath certified state.
57    fastpath_certified: BTreeSet<ConsensusPosition>,
58    /// The last leader round updated in update_last_committed_leader_round().
59    last_committed_leader_round: Option<Round>,
60}
61
62impl ConsensusTxStatusCache {
63    pub(crate) fn new(consensus_gc_depth: Round) -> Self {
64        assert!(
65            consensus_gc_depth < CONSENSUS_STATUS_RETENTION_ROUNDS,
66            "{} vs {}",
67            consensus_gc_depth,
68            CONSENSUS_STATUS_RETENTION_ROUNDS
69        );
70        let (last_committed_leader_round_tx, last_committed_leader_round_rx) = watch::channel(None);
71        Self {
72            consensus_gc_depth,
73            inner: Default::default(),
74            status_notify_read: Default::default(),
75            last_committed_leader_round_tx,
76            last_committed_leader_round_rx,
77        }
78    }
79
80    pub(crate) fn set_transaction_status(&self, pos: ConsensusPosition, status: ConsensusTxStatus) {
81        if let Some(last_committed_leader_round) = *self.last_committed_leader_round_rx.borrow()
82            && pos.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS <= last_committed_leader_round
83        {
84            // Ignore stale status updates.
85            return;
86        }
87
88        let mut inner = self.inner.write();
89        self.set_transaction_status_inner(&mut inner, pos, status);
90    }
91
92    fn set_transaction_status_inner(
93        &self,
94        inner: &mut RwLockWriteGuard<Inner>,
95        pos: ConsensusPosition,
96        status: ConsensusTxStatus,
97    ) {
98        // Calls to set_transaction_status are async and can be out of order.
99        // Makes sure this is tolerated by handling state transitions properly.
100        let status_entry = inner.transaction_status.entry(pos);
101        match status_entry {
102            Entry::Vacant(entry) => {
103                // Set the status for the first time.
104                entry.insert(status);
105                if status == ConsensusTxStatus::FastpathCertified {
106                    // Only path where a status can be set to fastpath certified.
107                    assert!(inner.fastpath_certified.insert(pos));
108                }
109            }
110            Entry::Occupied(mut entry) => {
111                let old_status = *entry.get();
112                match (old_status, status) {
113                    // If the statuses are the same, no update is needed.
114                    (s1, s2) if s1 == s2 => return,
115                    // FastpathCertified is transient and can be updated to other statuses.
116                    (ConsensusTxStatus::FastpathCertified, _) => {
117                        entry.insert(status);
118                        if old_status == ConsensusTxStatus::FastpathCertified {
119                            // Only path where a status can transition out of fastpath certified.
120                            assert!(inner.fastpath_certified.remove(&pos));
121                        }
122                    }
123                    // This happens when statuses arrive out-of-order, and is a no-op.
124                    (
125                        ConsensusTxStatus::Rejected | ConsensusTxStatus::Finalized,
126                        ConsensusTxStatus::FastpathCertified,
127                    ) => {
128                        return;
129                    }
130                    // Transitions between terminal statuses are invalid.
131                    _ => {
132                        panic!(
133                            "Conflicting status updates for transaction {:?}: {:?} -> {:?}",
134                            pos, old_status, status
135                        );
136                    }
137                }
138            }
139        };
140
141        // All code paths leading to here should have set the status.
142        debug!("Transaction status is set for {}: {:?}", pos, status);
143        self.status_notify_read.notify(&pos, &status);
144    }
145
146    /// Given a known previous status provided by `old_status`, this function will return a new
147    /// status once the transaction status has changed, or if the consensus position has expired.
148    pub(crate) async fn notify_read_transaction_status_change(
149        &self,
150        consensus_position: ConsensusPosition,
151        old_status: Option<ConsensusTxStatus>,
152    ) -> NotifyReadConsensusTxStatusResult {
153        // TODO(fastpath): We should track the typical distance between the last committed round
154        // and the requested round notified as metrics.
155        let registration = self.status_notify_read.register_one(&consensus_position);
156        let mut round_rx = self.last_committed_leader_round_rx.clone();
157        {
158            let inner = self.inner.read();
159            if let Some(status) = inner.transaction_status.get(&consensus_position)
160                && Some(status) != old_status.as_ref()
161            {
162                if let Some(old_status) = old_status {
163                    // The only scenario where the status may change, is when the transaction
164                    // is initially fastpath certified, and then later finalized or rejected.
165                    assert_eq!(old_status, ConsensusTxStatus::FastpathCertified);
166                }
167                return NotifyReadConsensusTxStatusResult::Status(*status);
168            }
169            // Inner read lock dropped here.
170        }
171
172        let expiration_check = async {
173            loop {
174                if let Some(last_committed_leader_round) = *round_rx.borrow()
175                    && consensus_position.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS
176                        <= last_committed_leader_round
177                {
178                    return last_committed_leader_round;
179                }
180                // Channel closed - this should never happen in practice, so panic
181                round_rx
182                    .changed()
183                    .await
184                    .expect("last_committed_leader_round watch channel closed unexpectedly");
185            }
186        };
187        tokio::select! {
188            status = registration => NotifyReadConsensusTxStatusResult::Status(status),
189            last_committed_leader_round = expiration_check => NotifyReadConsensusTxStatusResult::Expired(last_committed_leader_round),
190        }
191    }
192
193    pub(crate) async fn update_last_committed_leader_round(
194        &self,
195        last_committed_leader_round: u32,
196    ) {
197        debug!(
198            "Updating last committed leader round: {}",
199            last_committed_leader_round
200        );
201
202        let mut inner = self.inner.write();
203
204        // Consensus only bumps GC round after generating a commit. So if we expire and GC transactions
205        // based on the latest committed leader round, we may expire transactions in the current commit, or
206        // make these transactions' statuses very short lived.
207        // So we only expire and GC transactions with the previous committed leader round.
208        let Some(leader_round) = inner
209            .last_committed_leader_round
210            .replace(last_committed_leader_round)
211        else {
212            // This is the first update. Do not expire or GC any transactions.
213            return;
214        };
215
216        // Remove transactions that are expired.
217        while let Some((position, _)) = inner.transaction_status.first_key_value() {
218            if position.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS <= leader_round {
219                let (pos, status) = inner.transaction_status.pop_first().unwrap();
220                // Ensure the transaction is not in the fastpath certified set.
221                if status == ConsensusTxStatus::FastpathCertified {
222                    assert!(inner.fastpath_certified.remove(&pos));
223                }
224            } else {
225                break;
226            }
227        }
228
229        // GC fastpath certified transactions.
230        // In theory, notify_read_transaction_status_change() could return `Rejected` status directly
231        // to waiters on GC'ed transactions.
232        // But it is necessary to track the number of fastpath certified status anyway for end of epoch.
233        // So rejecting every fastpath certified transaction here.
234        while let Some(position) = inner.fastpath_certified.first().cloned() {
235            if position.block.round + self.consensus_gc_depth <= leader_round {
236                // Reject GC'ed transactions that were previously fastpath certified.
237                self.set_transaction_status_inner(
238                    &mut inner,
239                    position,
240                    ConsensusTxStatus::Rejected,
241                );
242            } else {
243                break;
244            }
245        }
246
247        // Send update through watch channel.
248        let _ = self.last_committed_leader_round_tx.send(Some(leader_round));
249    }
250
251    pub(crate) fn get_last_committed_leader_round(&self) -> Option<u32> {
252        *self.last_committed_leader_round_rx.borrow()
253    }
254
255    pub(crate) fn get_num_fastpath_certified(&self) -> usize {
256        self.inner.read().fastpath_certified.len()
257    }
258
259    /// Returns true if the position is too far ahead of the last committed round.
260    pub(crate) fn check_position_too_ahead(&self, position: &ConsensusPosition) -> SuiResult<()> {
261        if let Some(last_committed_leader_round) = *self.last_committed_leader_round_rx.borrow()
262            && position.block.round
263                > last_committed_leader_round + CONSENSUS_STATUS_RETENTION_ROUNDS
264        {
265            return Err(SuiErrorKind::ValidatorConsensusLagging {
266                round: position.block.round,
267                last_committed_round: last_committed_leader_round,
268            }
269            .into());
270        }
271        Ok(())
272    }
273
274    #[cfg(test)]
275    pub(crate) fn get_transaction_status(
276        &self,
277        position: &ConsensusPosition,
278    ) -> Option<ConsensusTxStatus> {
279        let inner = self.inner.read();
280        inner.transaction_status.get(position).cloned()
281    }
282}
283
284#[cfg(test)]
285mod tests {
286    use std::{sync::Arc, time::Duration};
287
288    use super::*;
289    use consensus_types::block::{BlockRef, TransactionIndex};
290
291    fn create_test_tx_position(round: u64, index: u64) -> ConsensusPosition {
292        ConsensusPosition {
293            epoch: Default::default(),
294            block: BlockRef {
295                round: round as u32,
296                author: Default::default(),
297                digest: Default::default(),
298            },
299            index: index as TransactionIndex,
300        }
301    }
302
303    #[tokio::test]
304    async fn test_set_and_get_transaction_status() {
305        let cache = ConsensusTxStatusCache::new(60);
306        let tx_pos = create_test_tx_position(1, 0);
307
308        // Set initial status
309        cache.set_transaction_status(tx_pos, ConsensusTxStatus::FastpathCertified);
310
311        // Read status immediately
312        let result = cache
313            .notify_read_transaction_status_change(tx_pos, None)
314            .await;
315        assert!(matches!(
316            result,
317            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::FastpathCertified)
318        ));
319    }
320
321    #[tokio::test]
322    async fn test_status_notification() {
323        let cache = Arc::new(ConsensusTxStatusCache::new(60));
324        let tx_pos = create_test_tx_position(1, 0);
325
326        // Spawn a task that waits for status update
327        let cache_clone = cache.clone();
328        let handle = tokio::spawn(async move {
329            cache_clone
330                .notify_read_transaction_status_change(tx_pos, None)
331                .await
332        });
333
334        // Small delay to ensure the task is waiting
335        tokio::time::sleep(Duration::from_millis(10)).await;
336
337        // Set the status
338        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);
339
340        // Verify the notification was received
341        let result = handle.await.unwrap();
342        assert!(matches!(
343            result,
344            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
345        ));
346    }
347
348    #[tokio::test]
349    async fn test_round_expiration() {
350        let cache = ConsensusTxStatusCache::new(60);
351        let tx_pos = create_test_tx_position(1, 0);
352
353        // Set initial status
354        cache.set_transaction_status(tx_pos, ConsensusTxStatus::FastpathCertified);
355
356        // Set initial leader round which doesn't GC anything.
357        cache
358            .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 1)
359            .await;
360
361        // Update with round that will trigger GC using previous round (CONSENSUS_STATUS_RETENTION_ROUNDS + 1)
362        // This will expire transactions up to and including round 1
363        cache
364            .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 2)
365            .await;
366
367        // Try to read status - should be expired
368        let result = cache
369            .notify_read_transaction_status_change(tx_pos, None)
370            .await;
371        assert!(matches!(
372            result,
373            NotifyReadConsensusTxStatusResult::Expired(_)
374        ));
375    }
376
377    #[tokio::test]
378    async fn test_multiple_status_updates() {
379        let cache = ConsensusTxStatusCache::new(60);
380        let tx_pos = create_test_tx_position(1, 0);
381
382        // Set initial status
383        cache.set_transaction_status(tx_pos, ConsensusTxStatus::FastpathCertified);
384
385        // Update status
386        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);
387
388        // Read with old status
389        let result = cache
390            .notify_read_transaction_status_change(
391                tx_pos,
392                Some(ConsensusTxStatus::FastpathCertified),
393            )
394            .await;
395        assert!(matches!(
396            result,
397            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
398        ));
399    }
400
401    #[tokio::test]
402    async fn test_cleanup_expired_rounds() {
403        let cache = ConsensusTxStatusCache::new(60);
404
405        // Add transactions for multiple rounds
406        for round in 1..=5 {
407            let tx_pos = create_test_tx_position(round, 0);
408            cache.set_transaction_status(tx_pos, ConsensusTxStatus::FastpathCertified);
409        }
410
411        // Set initial leader round which doesn't GC anything.
412        cache
413            .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 2)
414            .await;
415
416        // No rounds should be cleaned up yet since this was the initial update
417        {
418            let inner = cache.inner.read();
419            let rounds = inner
420                .transaction_status
421                .keys()
422                .map(|p| p.block.round)
423                .collect::<Vec<_>>();
424            assert_eq!(rounds, vec![1, 2, 3, 4, 5]);
425        }
426
427        // Update that triggers GC using previous round (CONSENSUS_STATUS_RETENTION_ROUNDS + 2)
428        // This will expire transactions up to and including round 2
429        cache
430            .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 3)
431            .await;
432
433        // Verify rounds 1-2 are cleaned up, 3-5 remain
434        {
435            let inner = cache.inner.read();
436            let rounds = inner
437                .transaction_status
438                .keys()
439                .map(|p| p.block.round)
440                .collect::<Vec<_>>();
441            assert_eq!(rounds, vec![3, 4, 5]);
442        }
443
444        // Another update using previous round (CONSENSUS_STATUS_RETENTION_ROUNDS + 3) for GC
445        // This will expire transactions up to and including round 3
446        cache
447            .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 4)
448            .await;
449
450        // Verify rounds 1-3 are cleaned up, 4-5 remain
451        {
452            let inner = cache.inner.read();
453            let rounds = inner
454                .transaction_status
455                .keys()
456                .map(|p| p.block.round)
457                .collect::<Vec<_>>();
458            assert_eq!(rounds, vec![4, 5]);
459        }
460    }
461
462    #[tokio::test]
463    async fn test_concurrent_operations() {
464        let cache = Arc::new(ConsensusTxStatusCache::new(60));
465        let tx_pos = create_test_tx_position(1, 0);
466
467        // Spawn multiple tasks that wait for status
468        let mut handles = vec![];
469        for _ in 0..3 {
470            let cache_clone = cache.clone();
471            handles.push(tokio::spawn(async move {
472                cache_clone
473                    .notify_read_transaction_status_change(tx_pos, None)
474                    .await
475            }));
476        }
477
478        // Small delay to ensure tasks are waiting
479        tokio::time::sleep(Duration::from_millis(10)).await;
480
481        // Set the status
482        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);
483
484        // Verify all notifications were received
485        for handle in handles {
486            let result = handle.await.unwrap();
487            assert!(matches!(
488                result,
489                NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
490            ));
491        }
492    }
493
494    #[tokio::test]
495    async fn test_out_of_order_status_updates() {
496        let cache = Arc::new(ConsensusTxStatusCache::new(60));
497        let tx_pos = create_test_tx_position(1, 0);
498
499        // First update status to Finalized.
500        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);
501        let result = cache
502            .notify_read_transaction_status_change(tx_pos, None)
503            .await;
504        assert!(matches!(
505            result,
506            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
507        ));
508
509        let cache_clone = cache.clone();
510        let notify_read_task = tokio::spawn(async move {
511            cache_clone
512                .notify_read_transaction_status_change(tx_pos, Some(ConsensusTxStatus::Finalized))
513                .await
514        });
515
516        // We should never receive a new status update since the new status is older than the old status.
517        cache.set_transaction_status(tx_pos, ConsensusTxStatus::FastpathCertified);
518        let result = tokio::time::timeout(Duration::from_secs(3), notify_read_task).await;
519        assert!(result.is_err());
520        assert_eq!(
521            cache.get_transaction_status(&tx_pos),
522            Some(ConsensusTxStatus::Finalized)
523        );
524    }
525
526    #[tokio::test]
527    async fn test_fastpath_certified_tracking() {
528        let cache = Arc::new(ConsensusTxStatusCache::new(60));
529
530        // Initially, no fastpath certified transactions
531        assert_eq!(cache.get_num_fastpath_certified(), 0);
532
533        // Add fastpath certified transactions
534        let tx_pos1 = create_test_tx_position(100, 0);
535        let tx_pos2 = create_test_tx_position(100, 1);
536        let tx_pos3 = create_test_tx_position(101, 2);
537        let tx_pos4 = create_test_tx_position(102, 3);
538
539        cache.set_transaction_status(tx_pos1, ConsensusTxStatus::FastpathCertified);
540        assert_eq!(cache.get_num_fastpath_certified(), 1);
541
542        cache.set_transaction_status(tx_pos2, ConsensusTxStatus::FastpathCertified);
543        assert_eq!(cache.get_num_fastpath_certified(), 2);
544
545        cache.set_transaction_status(tx_pos3, ConsensusTxStatus::FastpathCertified);
546        assert_eq!(cache.get_num_fastpath_certified(), 3);
547
548        cache.set_transaction_status(tx_pos4, ConsensusTxStatus::FastpathCertified);
549        assert_eq!(cache.get_num_fastpath_certified(), 4);
550
551        // Add a non-fastpath certified transaction
552        let tx_pos5 = create_test_tx_position(103, 4);
553        cache.set_transaction_status(tx_pos5, ConsensusTxStatus::Finalized);
554        assert_eq!(cache.get_num_fastpath_certified(), 4);
555
556        // Transition one fastpath certified to finalized
557        cache.set_transaction_status(tx_pos1, ConsensusTxStatus::Finalized);
558        assert_eq!(cache.get_num_fastpath_certified(), 3);
559        assert_eq!(
560            cache.get_transaction_status(&tx_pos1),
561            Some(ConsensusTxStatus::Finalized)
562        );
563
564        // Transition another fastpath certified to rejected
565        cache.set_transaction_status(tx_pos2, ConsensusTxStatus::Rejected);
566        assert_eq!(cache.get_num_fastpath_certified(), 2);
567        assert_eq!(
568            cache.get_transaction_status(&tx_pos2),
569            Some(ConsensusTxStatus::Rejected)
570        );
571
572        // Test GC of fastpath certified transactions
573        // tx_pos3 is at round 101, with gc_depth=60, it will be GC'd when prev leader round >= 161
574        // tx_pos4 is at round 102, with gc_depth=60, it will be GC'd when prev leader round >= 162
575
576        // Set initial leader round which doesn't GC anything.
577        cache.update_last_committed_leader_round(160).await;
578        assert_eq!(cache.get_num_fastpath_certified(), 2);
579        assert_eq!(
580            cache.get_transaction_status(&tx_pos3),
581            Some(ConsensusTxStatus::FastpathCertified)
582        );
583        assert_eq!(
584            cache.get_transaction_status(&tx_pos4),
585            Some(ConsensusTxStatus::FastpathCertified)
586        );
587
588        // Update to 161: uses 160 for GC
589        // tx_pos3: 101 + 60 = 161, 161 <= 160 is false, so NOT GC'd yet
590        cache.update_last_committed_leader_round(161).await;
591        assert_eq!(cache.get_num_fastpath_certified(), 2);
592        assert_eq!(
593            cache.get_transaction_status(&tx_pos3),
594            Some(ConsensusTxStatus::FastpathCertified)
595        );
596        assert_eq!(
597            cache.get_transaction_status(&tx_pos4),
598            Some(ConsensusTxStatus::FastpathCertified)
599        );
600
601        // Update to 162: uses 161 for GC
602        // tx_pos3: 101 + 60 = 161, 161 <= 161 is true, so GC'd
603        // tx_pos4: 102 + 60 = 162, 162 <= 161 is false, so NOT GC'd
604        cache.update_last_committed_leader_round(162).await;
605        assert_eq!(cache.get_num_fastpath_certified(), 1);
606        assert_eq!(
607            cache.get_transaction_status(&tx_pos3),
608            Some(ConsensusTxStatus::Rejected)
609        );
610        assert_eq!(
611            cache.get_transaction_status(&tx_pos4),
612            Some(ConsensusTxStatus::FastpathCertified)
613        );
614
615        // Update to 163: uses 162 for GC
616        // tx_pos4: 102 + 60 = 162, 162 <= 162 is true, so GC'd
617        cache.update_last_committed_leader_round(163).await;
618        assert_eq!(cache.get_num_fastpath_certified(), 0);
619        assert_eq!(
620            cache.get_transaction_status(&tx_pos4),
621            Some(ConsensusTxStatus::Rejected)
622        );
623
624        // Test that setting a transaction directly to non-fastpath doesn't affect count
625        let tx_pos6 = create_test_tx_position(200, 5);
626        cache.set_transaction_status(tx_pos6, ConsensusTxStatus::Finalized);
627        assert_eq!(cache.get_num_fastpath_certified(), 0);
628
629        // Can't transition from finalized back to fastpath certified
630        cache.set_transaction_status(tx_pos6, ConsensusTxStatus::FastpathCertified);
631        assert_eq!(cache.get_num_fastpath_certified(), 0);
632        assert_eq!(
633            cache.get_transaction_status(&tx_pos6),
634            Some(ConsensusTxStatus::Finalized)
635        );
636    }
637}