sui_core/authority/
consensus_tx_status_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeMap;
5
6use consensus_types::block::Round;
7use mysten_common::sync::notify_read::NotifyRead;
8use parking_lot::RwLock;
9use sui_types::{
10    error::{SuiErrorKind, SuiResult},
11    messages_consensus::ConsensusPosition,
12};
13use tokio::sync::watch;
14use tracing::debug;
15
/// Width, in consensus rounds, of the window in which transaction status information is kept.
///
/// The window is applied in two directions relative to the last committed round:
/// - positions whose round lags it by at least this many rounds are expired and garbage
///   collected;
/// - positions whose round leads it by more than this many rounds are considered too far ahead.
///
/// With an assumed maximum rate of 15 rounds per second, 400 rounds keep a status update valid
/// for roughly 25-30 seconds.
pub(crate) const CONSENSUS_STATUS_RETENTION_ROUNDS: u32 = 400;
20
/// Outcome of a transaction as determined from consensus.
///
/// Once a status is recorded for a consensus position in `ConsensusTxStatusCache`, a different
/// status for the same position is treated as a fatal conflict.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ConsensusTxStatus {
    /// Transaction is rejected, either by a quorum of validators or indirectly post-commit.
    Rejected,
    /// Transaction is finalized post commit.
    Finalized,
    /// Transaction is dropped post-consensus.
    /// This decision must be consistent across all validators.
    ///
    /// Currently, only invalid owned object inputs (using stale versions)
    /// can cause a transaction to be dropped without execution.
    Dropped,
}
34
35#[derive(Debug, Clone)]
36pub(crate) enum NotifyReadConsensusTxStatusResult {
37    // The consensus position to be read has been updated with a new status.
38    Status(ConsensusTxStatus),
39    // The consensus position to be read has expired.
40    // Provided with the last committed round that was used to check for expiration.
41    Expired(u32),
42}
43
44pub(crate) struct ConsensusTxStatusCache {
45    inner: RwLock<Inner>,
46
47    status_notify_read: NotifyRead<ConsensusPosition, ConsensusTxStatus>,
48    /// Watch channel for last committed leader round updates
49    last_committed_leader_round_tx: watch::Sender<Option<u32>>,
50    last_committed_leader_round_rx: watch::Receiver<Option<u32>>,
51}
52
53#[derive(Default)]
54struct Inner {
55    /// A map of transaction position to its status from consensus.
56    transaction_status: BTreeMap<ConsensusPosition, ConsensusTxStatus>,
57    /// The last leader round updated in update_last_committed_leader_round().
58    last_committed_leader_round: Option<Round>,
59}
60
61impl ConsensusTxStatusCache {
62    pub(crate) fn new(consensus_gc_depth: Round) -> Self {
63        assert!(
64            consensus_gc_depth < CONSENSUS_STATUS_RETENTION_ROUNDS,
65            "{} vs {}",
66            consensus_gc_depth,
67            CONSENSUS_STATUS_RETENTION_ROUNDS
68        );
69        let (last_committed_leader_round_tx, last_committed_leader_round_rx) = watch::channel(None);
70        Self {
71            inner: Default::default(),
72            status_notify_read: Default::default(),
73            last_committed_leader_round_tx,
74            last_committed_leader_round_rx,
75        }
76    }
77
78    /// Single-update convenience wrapper around [`Self::set_transaction_statuses`].
79    /// Production code uses the batched form; this is retained for tests.
80    #[cfg(test)]
81    pub(crate) fn set_transaction_status(&self, pos: ConsensusPosition, status: ConsensusTxStatus) {
82        self.set_transaction_statuses(vec![(pos, status)]);
83    }
84
85    /// Batched form of `set_transaction_status`: applies all updates under a
86    /// single write lock and issues the notifications after the lock is released.
87    /// The consensus commit handler uses this to replace a per-transaction lock
88    /// acquisition (and a notify held across the write lock) with one acquisition
89    /// per commit. Semantics are otherwise identical: stale updates are dropped, a
90    /// conflicting status for an already-recorded position panics, and every applied
91    /// update is notified.
92    pub(crate) fn set_transaction_statuses(
93        &self,
94        updates: Vec<(ConsensusPosition, ConsensusTxStatus)>,
95    ) {
96        // The committed leader round is constant for the duration of a commit, so
97        // read it once for the whole batch rather than per update.
98        let last_committed_leader_round = *self.last_committed_leader_round_rx.borrow();
99        let mut to_notify = Vec::with_capacity(updates.len());
100        {
101            let mut inner = self.inner.write();
102            for (pos, status) in updates {
103                if let Some(last_committed_leader_round) = last_committed_leader_round
104                    && pos.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS
105                        <= last_committed_leader_round
106                {
107                    // Ignore stale status updates.
108                    continue;
109                }
110                let old_status = inner.transaction_status.insert(pos, status);
111                if let Some(old_status) = old_status
112                    && old_status != status
113                {
114                    panic!(
115                        "Conflicting status updates for transaction {:?}: {:?} -> {:?}",
116                        pos, old_status, status
117                    );
118                }
119                debug!("Transaction status is set for {}: {:?}", pos, status);
120                to_notify.push((pos, status));
121            }
122        }
123        for (pos, status) in to_notify {
124            self.status_notify_read.notify(&pos, &status);
125        }
126    }
127
128    /// Given a known previous status provided by `old_status`, this function will return a new
129    /// status once the transaction status has changed, or if the consensus position has expired.
130    pub(crate) async fn notify_read_transaction_status(
131        &self,
132        consensus_position: ConsensusPosition,
133    ) -> NotifyReadConsensusTxStatusResult {
134        let registration = self.status_notify_read.register_one(&consensus_position);
135        let mut round_rx = self.last_committed_leader_round_rx.clone();
136        {
137            let inner = self.inner.read();
138            if let Some(status) = inner.transaction_status.get(&consensus_position) {
139                return NotifyReadConsensusTxStatusResult::Status(*status);
140            }
141            // Inner read lock dropped here.
142        }
143        let expiration_check = async {
144            loop {
145                if let Some(last_committed_leader_round) = *round_rx.borrow()
146                    && consensus_position.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS
147                        <= last_committed_leader_round
148                {
149                    return last_committed_leader_round;
150                }
151                // Channel closed - this should never happen in practice, so panic
152                round_rx
153                    .changed()
154                    .await
155                    .expect("last_committed_leader_round watch channel closed unexpectedly");
156            }
157        };
158        tokio::select! {
159            status = registration => NotifyReadConsensusTxStatusResult::Status(status),
160            last_committed_leader_round = expiration_check => NotifyReadConsensusTxStatusResult::Expired(last_committed_leader_round),
161        }
162    }
163
164    pub(crate) fn update_last_committed_leader_round(&self, last_committed_leader_round: u32) {
165        debug!(
166            "Updating last committed leader round: {}",
167            last_committed_leader_round
168        );
169
170        let mut inner = self.inner.write();
171
172        // Consensus only bumps GC round after generating a commit. So if we expire and GC transactions
173        // based on the latest committed leader round, we may expire transactions in the current commit, or
174        // make these transactions' statuses very short lived.
175        // So we only expire and GC transactions with the previous committed leader round.
176        let Some(leader_round) = inner
177            .last_committed_leader_round
178            .replace(last_committed_leader_round)
179        else {
180            // This is the first update. Do not expire or GC any transactions.
181            return;
182        };
183
184        // Remove transactions that are expired.
185        while let Some((position, _)) = inner.transaction_status.first_key_value() {
186            if position.block.round + CONSENSUS_STATUS_RETENTION_ROUNDS <= leader_round {
187                inner.transaction_status.pop_first();
188            } else {
189                break;
190            }
191        }
192
193        // Send update through watch channel.
194        let _ = self.last_committed_leader_round_tx.send(Some(leader_round));
195    }
196
197    pub(crate) fn get_last_committed_leader_round(&self) -> Option<u32> {
198        *self.last_committed_leader_round_rx.borrow()
199    }
200
201    /// Returns true if the position is too far ahead of the last committed round.
202    pub(crate) fn check_position_too_ahead(&self, position: &ConsensusPosition) -> SuiResult<()> {
203        if let Some(last_committed_leader_round) = *self.last_committed_leader_round_rx.borrow()
204            && position.block.round
205                > last_committed_leader_round + CONSENSUS_STATUS_RETENTION_ROUNDS
206        {
207            return Err(SuiErrorKind::ValidatorConsensusLagging {
208                round: position.block.round,
209                last_committed_round: last_committed_leader_round,
210            }
211            .into());
212        }
213        Ok(())
214    }
215}
216
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use super::*;
    use consensus_types::block::{BlockRef, TransactionIndex};

    fn create_test_tx_position(round: u64, index: u64) -> ConsensusPosition {
        ConsensusPosition {
            epoch: Default::default(),
            block: BlockRef {
                round: round as u32,
                author: Default::default(),
                digest: Default::default(),
            },
            index: index as TransactionIndex,
        }
    }

    /// Returns the rounds of all positions currently held in the cache, in map order.
    fn cached_rounds(cache: &ConsensusTxStatusCache) -> Vec<u32> {
        cache
            .inner
            .read()
            .transaction_status
            .keys()
            .map(|p| p.block.round)
            .collect()
    }

    #[tokio::test]
    async fn test_set_and_get_transaction_status() {
        let cache = ConsensusTxStatusCache::new(60);
        let tx_pos = create_test_tx_position(1, 0);

        // Set initial status
        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);

        // Read status immediately
        let result = cache.notify_read_transaction_status(tx_pos).await;
        assert!(matches!(
            result,
            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
        ));
    }

    #[tokio::test]
    async fn test_status_notification() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));
        let tx_pos = create_test_tx_position(1, 0);

        // Spawn a task that waits for status update
        let cache_clone = cache.clone();
        let handle =
            tokio::spawn(async move { cache_clone.notify_read_transaction_status(tx_pos).await });

        // Small delay to ensure the task is waiting
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Set the status
        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);

        // Verify the notification was received
        let result = handle.await.unwrap();
        assert!(matches!(
            result,
            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
        ));
    }

    #[tokio::test]
    async fn test_round_expiration() {
        let cache = ConsensusTxStatusCache::new(60);
        let tx_pos = create_test_tx_position(1, 0);

        // Set initial status
        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);

        // Set initial leader round which doesn't GC anything.
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 1);

        // Update with round that will trigger GC using previous round (CONSENSUS_STATUS_RETENTION_ROUNDS + 1)
        // This will expire transactions up to and including round 1
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 2);

        // Try to read status - should be expired
        let result = cache.notify_read_transaction_status(tx_pos).await;
        assert!(matches!(
            result,
            NotifyReadConsensusTxStatusResult::Expired(_)
        ));
    }

    #[tokio::test]
    async fn test_waiting_reader_observes_expiration() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));
        let tx_pos = create_test_tx_position(1, 0);

        // Spawn a task that is already waiting when the position expires.
        let cache_clone = cache.clone();
        let handle =
            tokio::spawn(async move { cache_clone.notify_read_transaction_status(tx_pos).await });

        // Small delay to ensure the task is waiting
        tokio::time::sleep(Duration::from_millis(10)).await;

        // The second update publishes CONSENSUS_STATUS_RETENTION_ROUNDS + 1, which expires round 1.
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 1);
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 2);

        let result = handle.await.unwrap();
        assert!(matches!(
            result,
            NotifyReadConsensusTxStatusResult::Expired(round)
                if round == CONSENSUS_STATUS_RETENTION_ROUNDS + 1
        ));
    }

    #[tokio::test]
    async fn test_cleanup_expired_rounds() {
        let cache = ConsensusTxStatusCache::new(60);

        // Add transactions for multiple rounds
        for round in 1..=5 {
            let tx_pos = create_test_tx_position(round, 0);
            cache.set_transaction_status(tx_pos, ConsensusTxStatus::Rejected);
        }

        // Set initial leader round which doesn't GC anything.
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 2);

        // No rounds should be cleaned up yet since this was the initial update
        assert_eq!(cached_rounds(&cache), vec![1, 2, 3, 4, 5]);

        // Update that triggers GC using previous round (CONSENSUS_STATUS_RETENTION_ROUNDS + 2)
        // This will expire transactions up to and including round 2
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 3);

        // Verify rounds 1-2 are cleaned up, 3-5 remain
        assert_eq!(cached_rounds(&cache), vec![3, 4, 5]);

        // Another update using previous round (CONSENSUS_STATUS_RETENTION_ROUNDS + 3) for GC
        // This will expire transactions up to and including round 3
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 4);

        // Verify rounds 1-3 are cleaned up, 4-5 remain
        assert_eq!(cached_rounds(&cache), vec![4, 5]);
    }

    #[tokio::test]
    async fn test_published_round_lags_by_one_update() {
        let cache = ConsensusTxStatusCache::new(60);
        assert_eq!(cache.get_last_committed_leader_round(), None);

        // The first update is only recorded; nothing is published yet.
        cache.update_last_committed_leader_round(10);
        assert_eq!(cache.get_last_committed_leader_round(), None);

        // Later updates publish the previously recorded round.
        cache.update_last_committed_leader_round(20);
        assert_eq!(cache.get_last_committed_leader_round(), Some(10));
        cache.update_last_committed_leader_round(30);
        assert_eq!(cache.get_last_committed_leader_round(), Some(20));
    }

    #[tokio::test]
    async fn test_batched_updates_drop_stale_positions() {
        let cache = ConsensusTxStatusCache::new(60);
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 10);
        cache.update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + 20);
        // Published round is CONSENSUS_STATUS_RETENTION_ROUNDS + 10: rounds <= 10 are stale.

        let stale = create_test_tx_position(10, 0);
        let fresh = create_test_tx_position(11, 0);
        cache.set_transaction_statuses(vec![
            (stale, ConsensusTxStatus::Finalized),
            (fresh, ConsensusTxStatus::Rejected),
        ]);

        assert_eq!(cached_rounds(&cache), vec![11]);
        assert!(matches!(
            cache.notify_read_transaction_status(fresh).await,
            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Rejected)
        ));
        assert!(matches!(
            cache.notify_read_transaction_status(stale).await,
            NotifyReadConsensusTxStatusResult::Expired(round)
                if round == CONSENSUS_STATUS_RETENTION_ROUNDS + 10
        ));
    }

    #[tokio::test]
    async fn test_check_position_too_ahead() {
        let cache = ConsensusTxStatusCache::new(60);

        // Nothing has been published yet, so no position is considered too far ahead.
        assert!(
            cache
                .check_position_too_ahead(&create_test_tx_position(u32::MAX as u64, 0))
                .is_ok()
        );

        cache.update_last_committed_leader_round(10);
        cache.update_last_committed_leader_round(20);
        // Published round is 10.
        let limit = 10 + CONSENSUS_STATUS_RETENTION_ROUNDS as u64;
        assert!(
            cache
                .check_position_too_ahead(&create_test_tx_position(limit, 0))
                .is_ok()
        );
        assert!(
            cache
                .check_position_too_ahead(&create_test_tx_position(limit + 1, 0))
                .is_err()
        );
    }

    #[tokio::test]
    async fn test_concurrent_operations() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));
        let tx_pos = create_test_tx_position(1, 0);

        // Spawn multiple tasks that wait for status
        let mut handles = vec![];
        for _ in 0..3 {
            let cache_clone = cache.clone();
            handles.push(tokio::spawn(async move {
                cache_clone.notify_read_transaction_status(tx_pos).await
            }));
        }

        // Small delay to ensure tasks are waiting
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Set the status
        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);

        // Verify all notifications were received
        for handle in handles {
            let result = handle.await.unwrap();
            assert!(matches!(
                result,
                NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
            ));
        }
    }

    #[tokio::test]
    #[should_panic(expected = "vs")]
    async fn test_gc_depth_must_be_below_retention() {
        let _ = ConsensusTxStatusCache::new(CONSENSUS_STATUS_RETENTION_ROUNDS);
    }

    #[tokio::test]
    #[should_panic(expected = "Conflicting status updates")]
    async fn test_out_of_order_status_updates() {
        let cache = Arc::new(ConsensusTxStatusCache::new(60));
        let tx_pos = create_test_tx_position(1, 0);

        // First update status to Finalized.
        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Finalized);

        // This should cause a panic.
        cache.set_transaction_status(tx_pos, ConsensusTxStatus::Rejected);
    }
}