sui_core/authority/
consensus_tx_status_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeMap;
5
6use consensus_types::block::Round;
7use mysten_common::sync::notify_read::NotifyRead;
8use parking_lot::RwLock;
9use sui_types::{
10    error::{SuiErrorKind, SuiResult},
11    messages_consensus::ConsensusPosition,
12};
13use tokio::sync::watch;
14use tracing::debug;
15
/// The number of consensus rounds for which transaction status information is retained before
/// it is garbage collected.
///
/// Used to expire positions from old rounds, as well as to check whether a transaction position
/// is too far ahead of the last committed round.
/// Assuming a max round rate of 15/sec, this allows status updates to be valid within a window
/// of ~25-30 seconds (400 rounds / 15 rounds per second is about 27 seconds).
pub(crate) const CONSENSUS_STATUS_RETENTION_ROUNDS: u32 = 400;
20
/// Status recorded for a transaction at a given consensus position.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ConsensusTxStatus {
    /// Transaction is rejected, either by a quorum of validators or indirectly post-commit.
    Rejected,
    /// Transaction is finalized post commit.
    Finalized,
    /// Transaction is dropped post-consensus.
    /// This decision must be consistent across all validators.
    ///
    /// Currently, only invalid owned object inputs (using stale versions)
    /// can cause a transaction to be dropped without execution.
    Dropped,
}
34
/// Result of waiting on a consensus position in
/// `ConsensusTxStatusCache::notify_read_transaction_status`.
#[derive(Debug, Clone)]
pub(crate) enum NotifyReadConsensusTxStatusResult {
    /// The consensus position to be read has a status, either already cached or newly set.
    Status(ConsensusTxStatus),
    /// The consensus position to be read has expired.
    /// Provided with the last committed round that was used to check for expiration.
    Expired(u32),
}
43
/// Cache of transaction statuses keyed by consensus position, with support for waiting on a
/// status and for expiring positions based on the committed leader round.
pub(crate) struct ConsensusTxStatusCache {
    /// Status map and most recently reported leader round, guarded by a single lock.
    inner: RwLock<Inner>,

    /// Notified from `set_transaction_status()`; registered on in
    /// `notify_read_transaction_status()`.
    status_notify_read: NotifyRead<ConsensusPosition, ConsensusTxStatus>,
    /// Watch channel for last committed leader round updates.
    ///
    /// NOTE: `update_last_committed_leader_round()` publishes the round passed to the
    /// *previous* call (the one used for expiration and GC), not the round it was just given.
    /// The channel stays at `None` until the second call.
    last_committed_leader_round_tx: watch::Sender<Option<u32>>,
    /// Receiver side of the channel above: borrowed for point-in-time reads and cloned for
    /// each waiter in `notify_read_transaction_status()`.
    last_committed_leader_round_rx: watch::Receiver<Option<u32>>,
}
52
/// Mutable state of `ConsensusTxStatusCache`, held behind its `inner` lock.
#[derive(Default)]
struct Inner {
    /// A map of transaction position to its status from consensus.
    ///
    /// GC pops entries from the front of this ordered map until it meets a position that is
    /// not expired. NOTE(review): this presumably relies on `ConsensusPosition` ordering by
    /// block round within an epoch - confirm against its `Ord` implementation.
    transaction_status: BTreeMap<ConsensusPosition, ConsensusTxStatus>,
    /// The last leader round updated in update_last_committed_leader_round().
    /// `None` until the first update.
    last_committed_leader_round: Option<Round>,
}
60
61impl ConsensusTxStatusCache {
62    pub(crate) fn new(consensus_gc_depth: Round) -> Self {
63        assert!(
64            consensus_gc_depth < CONSENSUS_STATUS_RETENTION_ROUNDS,
65            "{} vs {}",
66            consensus_gc_depth,
67            CONSENSUS_STATUS_RETENTION_ROUNDS
68        );
69        let (last_committed_leader_round_tx, last_committed_leader_round_rx) = watch::channel(None);
70        Self {
71            inner: Default::default(),
72            status_notify_read: Default::default(),
73            last_committed_leader_round_tx,
74            last_committed_leader_round_rx,
75        }
76    }
77
78    pub(crate) fn set_transaction_status(&self, pos: ConsensusPosition, status: ConsensusTxStatus) {
79        if let Some(last_committed_leader_round) = *self.last_committed_leader_round_rx.borrow()
80            && pos.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS <= last_committed_leader_round
81        {
82            // Ignore stale status updates.
83            return;
84        }
85
86        let mut inner = self.inner.write();
87        let old_status = inner.transaction_status.insert(pos, status);
88        if let Some(old_status) = old_status
89            && old_status != status
90        {
91            panic!(
92                "Conflicting status updates for transaction {:?}: {:?} -> {:?}",
93                pos, old_status, status
94            );
95        }
96
97        // All code paths leading to here should have set the status.
98        debug!("Transaction status is set for {}: {:?}", pos, status);
99        self.status_notify_read.notify(&pos, &status);
100    }
101
102    /// Given a known previous status provided by `old_status`, this function will return a new
103    /// status once the transaction status has changed, or if the consensus position has expired.
104    pub(crate) async fn notify_read_transaction_status(
105        &self,
106        consensus_position: ConsensusPosition,
107    ) -> NotifyReadConsensusTxStatusResult {
108        let registration = self.status_notify_read.register_one(&consensus_position);
109        let mut round_rx = self.last_committed_leader_round_rx.clone();
110        {
111            let inner = self.inner.read();
112            if let Some(status) = inner.transaction_status.get(&consensus_position) {
113                return NotifyReadConsensusTxStatusResult::Status(*status);
114            }
115            // Inner read lock dropped here.
116        }
117        let expiration_check = async {
118            loop {
119                if let Some(last_committed_leader_round) = *round_rx.borrow()
120                    && consensus_position.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS
121                        <= last_committed_leader_round
122                {
123                    return last_committed_leader_round;
124                }
125                // Channel closed - this should never happen in practice, so panic
126                round_rx
127                    .changed()
128                    .await
129                    .expect("last_committed_leader_round watch channel closed unexpectedly");
130            }
131        };
132        tokio::select! {
133            status = registration => NotifyReadConsensusTxStatusResult::Status(status),
134            last_committed_leader_round = expiration_check => NotifyReadConsensusTxStatusResult::Expired(last_committed_leader_round),
135        }
136    }
137
138    pub(crate) fn update_last_committed_leader_round(&self, last_committed_leader_round: u32) {
139        debug!(
140            "Updating last committed leader round: {}",
141            last_committed_leader_round
142        );
143
144        let mut inner = self.inner.write();
145
146        // Consensus only bumps GC round after generating a commit. So if we expire and GC transactions
147        // based on the latest committed leader round, we may expire transactions in the current commit, or
148        // make these transactions' statuses very short lived.
149        // So we only expire and GC transactions with the previous committed leader round.
150        let Some(leader_round) = inner
151            .last_committed_leader_round
152            .replace(last_committed_leader_round)
153        else {
154            // This is the first update. Do not expire or GC any transactions.
155            return;
156        };
157
158        // Remove transactions that are expired.
159        while let Some((position, _)) = inner.transaction_status.first_key_value() {
160            if position.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS <= leader_round {
161                inner.transaction_status.pop_first();
162            } else {
163                break;
164            }
165        }
166
167        // Send update through watch channel.
168        let _ = self.last_committed_leader_round_tx.send(Some(leader_round));
169    }
170
171    pub(crate) fn get_last_committed_leader_round(&self) -> Option<u32> {
172        *self.last_committed_leader_round_rx.borrow()
173    }
174
175    /// Returns true if the position is too far ahead of the last committed round.
176    pub(crate) fn check_position_too_ahead(&self, position: &ConsensusPosition) -> SuiResult<()> {
177        if let Some(last_committed_leader_round) = *self.last_committed_leader_round_rx.borrow()
178            && position.block.round
179                > last_committed_leader_round + CONSENSUS_STATUS_RETENTION_ROUNDS
180        {
181            return Err(SuiErrorKind::ValidatorConsensusLagging {
182                round: position.block.round,
183                last_committed_round: last_committed_leader_round,
184            }
185            .into());
186        }
187        Ok(())
188    }
189}
190
191#[cfg(test)]
192mod tests {
193    use std::{sync::Arc, time::Duration};
194
195    use super::*;
196    use consensus_types::block::{BlockRef, TransactionIndex};
197
198    fn create_test_tx_position(round: u64, index: u64) -> ConsensusPosition {
199        ConsensusPosition {
200            epoch: Default::default(),
201            block: BlockRef {
202                round: round as u32,
203                author: Default::default(),
204                digest: Default::default(),
205            },
206            index: index as TransactionIndex,
207        }
208    }
209
210    #[tokio::test]
211    async fn test_set_and_get_transaction_status() {
212        let cache = ConsensusTxStatusCache::new(60);
213        let tx_pos = create_test_tx_position(1, 0);
214
215        // Set initial status
216        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);
217
218        // Read status immediately
219        let result = cache.notify_read_transaction_status(tx_pos).await;
220        assert!(matches!(
221            result,
222            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
223        ));
224    }
225
226    #[tokio::test]
227    async fn test_status_notification() {
228        let cache = Arc::new(ConsensusTxStatusCache::new(60));
229        let tx_pos = create_test_tx_position(1, 0);
230
231        // Spawn a task that waits for status update
232        let cache_clone = cache.clone();
233        let handle =
234            tokio::spawn(async move { cache_clone.notify_read_transaction_status(tx_pos).await });
235
236        // Small delay to ensure the task is waiting
237        tokio::time::sleep(Duration::from_millis(10)).await;
238
239        // Set the status
240        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);
241
242        // Verify the notification was received
243        let result = handle.await.unwrap();
244        assert!(matches!(
245            result,
246            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
247        ));
248    }
249
250    #[tokio::test]
251    async fn test_round_expiration() {
252        let cache = ConsensusTxStatusCache::new(60);
253        let tx_pos = create_test_tx_position(1, 0);
254
255        // Set initial status
256        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);
257
258        // Set initial leader round which doesn't GC anything.
259        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 1);
260
261        // Update with round that will trigger GC using previous round (CONSENSUS_STATUS_RETENTION_ROUNDS + 1)
262        // This will expire transactions up to and including round 1
263        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 2);
264
265        // Try to read status - should be expired
266        let result = cache.notify_read_transaction_status(tx_pos).await;
267        assert!(matches!(
268            result,
269            NotifyReadConsensusTxStatusResult::Expired(_)
270        ));
271    }
272
273    #[tokio::test]
274    async fn test_cleanup_expired_rounds() {
275        let cache = ConsensusTxStatusCache::new(60);
276
277        // Add transactions for multiple rounds
278        for round in 1..=5 {
279            let tx_pos = create_test_tx_position(round, 0);
280            cache.set_transaction_status(tx_pos, ConsensusTxStatus::Rejected);
281        }
282
283        // Set initial leader round which doesn't GC anything.
284        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 2);
285
286        // No rounds should be cleaned up yet since this was the initial update
287        {
288            let inner = cache.inner.read();
289            let rounds = inner
290                .transaction_status
291                .keys()
292                .map(|p| p.block.round)
293                .collect::<Vec<_>>();
294            assert_eq!(rounds, vec![1, 2, 3, 4, 5]);
295        }
296
297        // Update that triggers GC using previous round (CONSENSUS_STATUS_RETENTION_ROUNDS + 2)
298        // This will expire transactions up to and including round 2
299        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 3);
300
301        // Verify rounds 1-2 are cleaned up, 3-5 remain
302        {
303            let inner = cache.inner.read();
304            let rounds = inner
305                .transaction_status
306                .keys()
307                .map(|p| p.block.round)
308                .collect::<Vec<_>>();
309            assert_eq!(rounds, vec![3, 4, 5]);
310        }
311
312        // Another update using previous round (CONSENSUS_STATUS_RETENTION_ROUNDS + 3) for GC
313        // This will expire transactions up to and including round 3
314        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 4);
315
316        // Verify rounds 1-3 are cleaned up, 4-5 remain
317        {
318            let inner = cache.inner.read();
319            let rounds = inner
320                .transaction_status
321                .keys()
322                .map(|p| p.block.round)
323                .collect::<Vec<_>>();
324            assert_eq!(rounds, vec![4, 5]);
325        }
326    }
327
328    #[tokio::test]
329    async fn test_concurrent_operations() {
330        let cache = Arc::new(ConsensusTxStatusCache::new(60));
331        let tx_pos = create_test_tx_position(1, 0);
332
333        // Spawn multiple tasks that wait for status
334        let mut handles = vec![];
335        for _ in 0..3 {
336            let cache_clone = cache.clone();
337            handles.push(tokio::spawn(async move {
338                cache_clone.notify_read_transaction_status(tx_pos).await
339            }));
340        }
341
342        // Small delay to ensure tasks are waiting
343        tokio::time::sleep(Duration::from_millis(10)).await;
344
345        // Set the status
346        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);
347
348        // Verify all notifications were received
349        for handle in handles {
350            let result = handle.await.unwrap();
351            assert!(matches!(
352                result,
353                NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
354            ));
355        }
356    }
357
358    #[tokio::test]
359    #[should_panic(expected = "Conflicting status updates")]
360    async fn test_out_of_order_status_updates() {
361        let cache = Arc::new(ConsensusTxStatusCache::new(60));
362        let tx_pos = create_test_tx_position(1, 0);
363
364        // First update status to Finalized.
365        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);
366
367        // This should cause a panic.
368        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Rejected);
369    }
370}