sui_core/authority/consensus_tx_status_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{BTreeMap, BTreeSet, btree_map::Entry};
5
6use consensus_types::block::Round;
7use mysten_common::sync::notify_read::NotifyRead;
8use parking_lot::{RwLock, RwLockWriteGuard};
9use sui_types::{
10    error::{SuiErrorKind, SuiResult},
11    messages_consensus::ConsensusPosition,
12};
13use tokio::sync::watch;
14use tracing::debug;
15
16/// The number of consensus rounds to retain transaction status information before garbage collection.
17/// Used to expire positions from old rounds, as well as to check if a transaction is too far ahead of the last committed round.
18/// Assuming a max round rate of 15/sec, this allows status updates to be valid within a window of ~25-30 seconds.
19pub(crate) const CONSENSUS_STATUS_RETENTION_ROUNDS: u32 = 400;
20
/// Status of a transaction at a consensus position, as tracked by [`ConsensusTxStatusCache`].
///
/// `FastpathCertified` is the only transient status and may later change to any other status.
/// The remaining statuses are terminal. A late `FastpathCertified` update after a terminal
/// status is ignored, and a change between two different terminal statuses is treated as a bug
/// by the cache (it panics).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ConsensusTxStatus {
    /// Transaction is voted to accept by a quorum of validators on fastpath.
    FastpathCertified,
    /// Transaction is rejected, either by a quorum of validators or indirectly post-commit.
    Rejected,
    /// Transaction is finalized post commit.
    Finalized,
    /// Transaction is dropped post-consensus.
    /// This decision must be consistent across all validators.
    ///
    /// Currently, only invalid owned object inputs (using stale versions)
    /// can cause a transaction to be dropped without execution.
    Dropped,
}
36
37#[derive(Debug, Clone)]
38pub(crate) enum NotifyReadConsensusTxStatusResult {
39    // The consensus position to be read has been updated with a new status.
40    Status(ConsensusTxStatus),
41    // The consensus position to be read has expired.
42    // Provided with the last committed round that was used to check for expiration.
43    Expired(u32),
44}
45
46pub(crate) struct ConsensusTxStatusCache {
47    // GC depth in consensus.
48    consensus_gc_depth: u32,
49
50    inner: RwLock<Inner>,
51
52    status_notify_read: NotifyRead<ConsensusPosition, ConsensusTxStatus>,
53    /// Watch channel for last committed leader round updates
54    last_committed_leader_round_tx: watch::Sender<Option<u32>>,
55    last_committed_leader_round_rx: watch::Receiver<Option<u32>>,
56}
57
58#[derive(Default)]
59struct Inner {
60    /// A map of transaction position to its status from consensus.
61    transaction_status: BTreeMap<ConsensusPosition, ConsensusTxStatus>,
62    /// Consensus positions that are currently in the fastpath certified state.
63    fastpath_certified: BTreeSet<ConsensusPosition>,
64    /// The last leader round updated in update_last_committed_leader_round().
65    last_committed_leader_round: Option<Round>,
66}
67
68impl ConsensusTxStatusCache {
69    pub(crate) fn new(consensus_gc_depth: Round) -> Self {
70        assert!(
71            consensus_gc_depth < CONSENSUS_STATUS_RETENTION_ROUNDS,
72            "{} vs {}",
73            consensus_gc_depth,
74            CONSENSUS_STATUS_RETENTION_ROUNDS
75        );
76        let (last_committed_leader_round_tx, last_committed_leader_round_rx) = watch::channel(None);
77        Self {
78            consensus_gc_depth,
79            inner: Default::default(),
80            status_notify_read: Default::default(),
81            last_committed_leader_round_tx,
82            last_committed_leader_round_rx,
83        }
84    }
85
86    pub(crate) fn set_transaction_status(&self, pos: ConsensusPosition, status: ConsensusTxStatus) {
87        if let Some(last_committed_leader_round) = *self.last_committed_leader_round_rx.borrow()
88            && pos.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS <= last_committed_leader_round
89        {
90            // Ignore stale status updates.
91            return;
92        }
93
94        let mut inner = self.inner.write();
95        self.set_transaction_status_inner(&mut inner, pos, status);
96    }
97
98    fn set_transaction_status_inner(
99        &self,
100        inner: &mut RwLockWriteGuard<Inner>,
101        pos: ConsensusPosition,
102        status: ConsensusTxStatus,
103    ) {
104        // Calls to set_transaction_status are async and can be out of order.
105        // Makes sure this is tolerated by handling state transitions properly.
106        let status_entry = inner.transaction_status.entry(pos);
107        match status_entry {
108            Entry::Vacant(entry) => {
109                // Set the status for the first time.
110                entry.insert(status);
111                if status == ConsensusTxStatus::FastpathCertified {
112                    // Only path where a status can be set to fastpath certified.
113                    assert!(inner.fastpath_certified.insert(pos));
114                }
115            }
116            Entry::Occupied(mut entry) => {
117                let old_status = *entry.get();
118                match (old_status, status) {
119                    // If the statuses are the same, no update is needed.
120                    (s1, s2) if s1 == s2 => return,
121                    // FastpathCertified is transient and can be updated to other statuses.
122                    (ConsensusTxStatus::FastpathCertified, _) => {
123                        entry.insert(status);
124                        if old_status == ConsensusTxStatus::FastpathCertified {
125                            // Only path where a status can transition out of fastpath certified.
126                            assert!(inner.fastpath_certified.remove(&pos));
127                        }
128                    }
129                    // This happens when statuses arrive out-of-order, and is a no-op.
130                    (
131                        ConsensusTxStatus::Rejected
132                        | ConsensusTxStatus::Dropped
133                        | ConsensusTxStatus::Finalized,
134                        ConsensusTxStatus::FastpathCertified,
135                    ) => {
136                        return;
137                    }
138                    // Transitions between terminal statuses are invalid.
139                    _ => {
140                        panic!(
141                            "Conflicting status updates for transaction {:?}: {:?} -> {:?}",
142                            pos, old_status, status
143                        );
144                    }
145                }
146            }
147        };
148
149        // All code paths leading to here should have set the status.
150        debug!("Transaction status is set for {}: {:?}", pos, status);
151        self.status_notify_read.notify(&pos, &status);
152    }
153
154    /// Given a known previous status provided by `old_status`, this function will return a new
155    /// status once the transaction status has changed, or if the consensus position has expired.
156    pub(crate) async fn notify_read_transaction_status_change(
157        &self,
158        consensus_position: ConsensusPosition,
159        old_status: Option<ConsensusTxStatus>,
160    ) -> NotifyReadConsensusTxStatusResult {
161        // TODO(fastpath): We should track the typical distance between the last committed round
162        // and the requested round notified as metrics.
163        let registration = self.status_notify_read.register_one(&consensus_position);
164        let mut round_rx = self.last_committed_leader_round_rx.clone();
165        {
166            let inner = self.inner.read();
167            if let Some(status) = inner.transaction_status.get(&consensus_position)
168                && Some(status) != old_status.as_ref()
169            {
170                if let Some(old_status) = old_status {
171                    // The only scenario where the status may change, is when the transaction
172                    // is initially fastpath certified, and then later finalized or rejected.
173                    assert_eq!(old_status, ConsensusTxStatus::FastpathCertified);
174                }
175                return NotifyReadConsensusTxStatusResult::Status(*status);
176            }
177            // Inner read lock dropped here.
178        }
179
180        let expiration_check = async {
181            loop {
182                if let Some(last_committed_leader_round) = *round_rx.borrow()
183                    && consensus_position.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS
184                        <= last_committed_leader_round
185                {
186                    return last_committed_leader_round;
187                }
188                // Channel closed - this should never happen in practice, so panic
189                round_rx
190                    .changed()
191                    .await
192                    .expect("last_committed_leader_round watch channel closed unexpectedly");
193            }
194        };
195        tokio::select! {
196            status = registration => NotifyReadConsensusTxStatusResult::Status(status),
197            last_committed_leader_round = expiration_check => NotifyReadConsensusTxStatusResult::Expired(last_committed_leader_round),
198        }
199    }
200
201    pub(crate) async fn update_last_committed_leader_round(
202        &self,
203        last_committed_leader_round: u32,
204    ) {
205        debug!(
206            "Updating last committed leader round: {}",
207            last_committed_leader_round
208        );
209
210        let mut inner = self.inner.write();
211
212        // Consensus only bumps GC round after generating a commit. So if we expire and GC transactions
213        // based on the latest committed leader round, we may expire transactions in the current commit, or
214        // make these transactions' statuses very short lived.
215        // So we only expire and GC transactions with the previous committed leader round.
216        let Some(leader_round) = inner
217            .last_committed_leader_round
218            .replace(last_committed_leader_round)
219        else {
220            // This is the first update. Do not expire or GC any transactions.
221            return;
222        };
223
224        // Remove transactions that are expired.
225        while let Some((position, _)) = inner.transaction_status.first_key_value() {
226            if position.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS <= leader_round {
227                let (pos, status) = inner.transaction_status.pop_first().unwrap();
228                // Ensure the transaction is not in the fastpath certified set.
229                if status == ConsensusTxStatus::FastpathCertified {
230                    assert!(inner.fastpath_certified.remove(&pos));
231                }
232            } else {
233                break;
234            }
235        }
236
237        // GC fastpath certified transactions.
238        // In theory, notify_read_transaction_status_change() could return `Rejected` status directly
239        // to waiters on GC'ed transactions.
240        // But it is necessary to track the number of fastpath certified status anyway for end of epoch.
241        // So rejecting every fastpath certified transaction here.
242        while let Some(position) = inner.fastpath_certified.first().cloned() {
243            if position.block.round + self.consensus_gc_depth <= leader_round {
244                // Reject GC'ed transactions that were previously fastpath certified.
245                self.set_transaction_status_inner(
246                    &mut inner,
247                    position,
248                    ConsensusTxStatus::Rejected,
249                );
250            } else {
251                break;
252            }
253        }
254
255        // Send update through watch channel.
256        let _ = self.last_committed_leader_round_tx.send(Some(leader_round));
257    }
258
259    pub(crate) fn get_last_committed_leader_round(&self) -> Option<u32> {
260        *self.last_committed_leader_round_rx.borrow()
261    }
262
263    pub(crate) fn get_num_fastpath_certified(&self) -> usize {
264        self.inner.read().fastpath_certified.len()
265    }
266
267    /// Returns true if the position is too far ahead of the last committed round.
268    pub(crate) fn check_position_too_ahead(&self, position: &ConsensusPosition) -> SuiResult<()> {
269        if let Some(last_committed_leader_round) = *self.last_committed_leader_round_rx.borrow()
270            && position.block.round
271                > last_committed_leader_round + CONSENSUS_STATUS_RETENTION_ROUNDS
272        {
273            return Err(SuiErrorKind::ValidatorConsensusLagging {
274                round: position.block.round,
275                last_committed_round: last_committed_leader_round,
276            }
277            .into());
278        }
279        Ok(())
280    }
281
282    #[cfg(test)]
283    pub(crate) fn get_transaction_status(
284        &self,
285        position: &ConsensusPosition,
286    ) -> Option<ConsensusTxStatus> {
287        let inner = self.inner.read();
288        inner.transaction_status.get(position).cloned()
289    }
290}
291
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use super::*;
    use consensus_types::block::{BlockRef, TransactionIndex};

    /// Builds a position in the default epoch with the given block round and transaction index.
    fn create_test_tx_position(round: u64, index: u64) -> ConsensusPosition {
        ConsensusPosition {
            epoch: Default::default(),
            block: BlockRef {
                round: round as u32,
                author: Default::default(),
                digest: Default::default(),
            },
            index: index as TransactionIndex,
        }
    }

    /// Returns the block rounds of all positions currently stored in `cache`, in key order.
    fn cached_rounds(cache: &ConsensusTxStatusCache) -> Vec<u32> {
        cache
            .inner
            .read()
            .transaction_status
            .keys()
            .map(|p| p.block.round)
            .collect()
    }

    #[tokio::test]
    async fn test_set_and_get_transaction_status() {
        let cache = ConsensusTxStatusCache::new(60);
        let tx_pos = create_test_tx_position(1, 0);

        // Set initial status
        cache.set_transaction_status(tx_pos, ConsensusTxStatus::FastpathCertified);

        // Read status immediately
        let result = cache
            .notify_read_transaction_status_change(tx_pos, None)
            .await;
        assert!(matches!(
            result,
            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::FastpathCertified)
        ));
    }

    #[tokio::test]
    async fn test_status_notification() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));
        let tx_pos = create_test_tx_position(1, 0);

        // Spawn a task that waits for status update
        let cache_clone = cache.clone();
        let handle = tokio::spawn(async move {
            cache_clone
                .notify_read_transaction_status_change(tx_pos, None)
                .await
        });

        // Small delay to ensure the task is waiting
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Set the status
        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);

        // Verify the notification was received
        let result = handle.await.unwrap();
        assert!(matches!(
            result,
            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
        ));
    }

    #[tokio::test]
    async fn test_round_expiration() {
        let cache = ConsensusTxStatusCache::new(60);
        let tx_pos = create_test_tx_position(1, 0);

        // Set initial status
        cache.set_transaction_status(tx_pos, ConsensusTxStatus::FastpathCertified);

        // Set initial leader round which doesn't GC anything.
        cache
            .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 1)
            .await;

        // Update with round that will trigger GC using previous round (CONSENSUS_STATUS_RETENTION_ROUNDS + 1)
        // This will expire transactions up to and including round 1
        cache
            .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 2)
            .await;

        // Try to read status - should be expired
        let result = cache
            .notify_read_transaction_status_change(tx_pos, None)
            .await;
        assert!(matches!(
            result,
            NotifyReadConsensusTxStatusResult::Expired(_)
        ));
    }

    #[tokio::test]
    async fn test_multiple_status_updates() {
        let cache = ConsensusTxStatusCache::new(60);
        let tx_pos = create_test_tx_position(1, 0);

        // Set initial status
        cache.set_transaction_status(tx_pos, ConsensusTxStatus::FastpathCertified);

        // Update status
        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);

        // Read with old status
        let result = cache
            .notify_read_transaction_status_change(
                tx_pos,
                Some(ConsensusTxStatus::FastpathCertified),
            )
            .await;
        assert!(matches!(
            result,
            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
        ));
    }

    #[tokio::test]
    async fn test_cleanup_expired_rounds() {
        let cache = ConsensusTxStatusCache::new(60);

        // Add transactions for multiple rounds
        for round in 1..=5 {
            let tx_pos = create_test_tx_position(round, 0);
            cache.set_transaction_status(tx_pos, ConsensusTxStatus::FastpathCertified);
        }

        // Set initial leader round which doesn't GC anything.
        cache
            .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 2)
            .await;

        // No rounds should be cleaned up yet since this was the initial update
        assert_eq!(cached_rounds(&cache), vec![1, 2, 3, 4, 5]);

        // Update that triggers GC using previous round (CONSENSUS_STATUS_RETENTION_ROUNDS + 2)
        // This will expire transactions up to and including round 2
        cache
            .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 3)
            .await;

        // Verify rounds 1-2 are cleaned up, 3-5 remain
        assert_eq!(cached_rounds(&cache), vec![3, 4, 5]);

        // Another update using previous round (CONSENSUS_STATUS_RETENTION_ROUNDS + 3) for GC
        // This will expire transactions up to and including round 3
        cache
            .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 4)
            .await;

        // Verify rounds 1-3 are cleaned up, 4-5 remain
        assert_eq!(cached_rounds(&cache), vec![4, 5]);
    }

    #[tokio::test]
    async fn test_concurrent_operations() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));
        let tx_pos = create_test_tx_position(1, 0);

        // Spawn multiple tasks that wait for status
        let mut handles = vec![];
        for _ in 0..3 {
            let cache_clone = cache.clone();
            handles.push(tokio::spawn(async move {
                cache_clone
                    .notify_read_transaction_status_change(tx_pos, None)
                    .await
            }));
        }

        // Small delay to ensure tasks are waiting
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Set the status
        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);

        // Verify all notifications were received
        for handle in handles {
            let result = handle.await.unwrap();
            assert!(matches!(
                result,
                NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
            ));
        }
    }

    #[tokio::test]
    async fn test_out_of_order_status_updates() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));
        let tx_pos = create_test_tx_position(1, 0);

        // First update status to Finalized.
        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);
        let result = cache
            .notify_read_transaction_status_change(tx_pos, None)
            .await;
        assert!(matches!(
            result,
            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
        ));

        let cache_clone = cache.clone();
        let mut notify_read_task = tokio::spawn(async move {
            cache_clone
                .notify_read_transaction_status_change(tx_pos, Some(ConsensusTxStatus::Finalized))
                .await
        });

        // Let the spawned task register its waiter before the stale update is applied. Without
        // this yield, the update would run before the waiter exists, and the notification path
        // would not be exercised at all.
        tokio::time::sleep(Duration::from_millis(10)).await;

        // We should never receive a new status update since the new status is older than the old status.
        cache.set_transaction_status(tx_pos, ConsensusTxStatus::FastpathCertified);
        // Notifications are sent synchronously from set_transaction_status(), so a spurious
        // wake-up would show up well within this timeout.
        let result = tokio::time::timeout(Duration::from_secs(1), &mut notify_read_task).await;
        assert!(result.is_err());
        notify_read_task.abort();
        assert_eq!(
            cache.get_transaction_status(&tx_pos),
            Some(ConsensusTxStatus::Finalized)
        );
    }

    #[tokio::test]
    async fn test_fastpath_certified_tracking() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));

        // Initially, no fastpath certified transactions
        assert_eq!(cache.get_num_fastpath_certified(), 0);

        // Add fastpath certified transactions
        let tx_pos1 = create_test_tx_position(100, 0);
        let tx_pos2 = create_test_tx_position(100, 1);
        let tx_pos3 = create_test_tx_position(101, 2);
        let tx_pos4 = create_test_tx_position(102, 3);

        cache.set_transaction_status(tx_pos1, ConsensusTxStatus::FastpathCertified);
        assert_eq!(cache.get_num_fastpath_certified(), 1);

        cache.set_transaction_status(tx_pos2, ConsensusTxStatus::FastpathCertified);
        assert_eq!(cache.get_num_fastpath_certified(), 2);

        cache.set_transaction_status(tx_pos3, ConsensusTxStatus::FastpathCertified);
        assert_eq!(cache.get_num_fastpath_certified(), 3);

        cache.set_transaction_status(tx_pos4, ConsensusTxStatus::FastpathCertified);
        assert_eq!(cache.get_num_fastpath_certified(), 4);

        // Add a non-fastpath certified transaction
        let tx_pos5 = create_test_tx_position(103, 4);
        cache.set_transaction_status(tx_pos5, ConsensusTxStatus::Finalized);
        assert_eq!(cache.get_num_fastpath_certified(), 4);

        // Transition one fastpath certified to finalized
        cache.set_transaction_status(tx_pos1, ConsensusTxStatus::Finalized);
        assert_eq!(cache.get_num_fastpath_certified(), 3);
        assert_eq!(
            cache.get_transaction_status(&tx_pos1),
            Some(ConsensusTxStatus::Finalized)
        );

        // Transition another fastpath certified to rejected
        cache.set_transaction_status(tx_pos2, ConsensusTxStatus::Rejected);
        assert_eq!(cache.get_num_fastpath_certified(), 2);
        assert_eq!(
            cache.get_transaction_status(&tx_pos2),
            Some(ConsensusTxStatus::Rejected)
        );

        // Test GC of fastpath certified transactions
        // tx_pos3 is at round 101, with gc_depth=60, it will be GC'd when prev leader round >= 161
        // tx_pos4 is at round 102, with gc_depth=60, it will be GC'd when prev leader round >= 162

        // Set initial leader round which doesn't GC anything.
        cache.update_last_committed_leader_round(160).await;
        assert_eq!(cache.get_num_fastpath_certified(), 2);
        assert_eq!(
            cache.get_transaction_status(&tx_pos3),
            Some(ConsensusTxStatus::FastpathCertified)
        );
        assert_eq!(
            cache.get_transaction_status(&tx_pos4),
            Some(ConsensusTxStatus::FastpathCertified)
        );

        // Update to 161: uses 160 for GC
        // tx_pos3: 101 + 60 = 161, 161 <= 160 is false, so NOT GC'd yet
        cache.update_last_committed_leader_round(161).await;
        assert_eq!(cache.get_num_fastpath_certified(), 2);
        assert_eq!(
            cache.get_transaction_status(&tx_pos3),
            Some(ConsensusTxStatus::FastpathCertified)
        );
        assert_eq!(
            cache.get_transaction_status(&tx_pos4),
            Some(ConsensusTxStatus::FastpathCertified)
        );

        // Update to 162: uses 161 for GC
        // tx_pos3: 101 + 60 = 161, 161 <= 161 is true, so GC'd
        // tx_pos4: 102 + 60 = 162, 162 <= 161 is false, so NOT GC'd
        cache.update_last_committed_leader_round(162).await;
        assert_eq!(cache.get_num_fastpath_certified(), 1);
        assert_eq!(
            cache.get_transaction_status(&tx_pos3),
            Some(ConsensusTxStatus::Rejected)
        );
        assert_eq!(
            cache.get_transaction_status(&tx_pos4),
            Some(ConsensusTxStatus::FastpathCertified)
        );

        // Update to 163: uses 162 for GC
        // tx_pos4: 102 + 60 = 162, 162 <= 162 is true, so GC'd
        cache.update_last_committed_leader_round(163).await;
        assert_eq!(cache.get_num_fastpath_certified(), 0);
        assert_eq!(
            cache.get_transaction_status(&tx_pos4),
            Some(ConsensusTxStatus::Rejected)
        );

        // Test that setting a transaction directly to non-fastpath doesn't affect count
        let tx_pos6 = create_test_tx_position(200, 5);
        cache.set_transaction_status(tx_pos6, ConsensusTxStatus::Finalized);
        assert_eq!(cache.get_num_fastpath_certified(), 0);

        // Can't transition from finalized back to fastpath certified
        cache.set_transaction_status(tx_pos6, ConsensusTxStatus::FastpathCertified);
        assert_eq!(cache.get_num_fastpath_certified(), 0);
        assert_eq!(
            cache.get_transaction_status(&tx_pos6),
            Some(ConsensusTxStatus::Finalized)
        );
    }
}