sui_core/authority/
finalized_transactions_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::HashSet;
5
6use parking_lot::RwLock;
7use sui_types::digests::TransactionDigest;
8
9/// An advisory set of recently finalized transaction digests, used by the
10/// transaction voting path to short-circuit re-voting on already-finalized
11/// transactions (`AuthorityPerEpochStore::is_recently_finalized`).
12///
13/// Workload: a single writer (the consensus commit handler, ~one insert per
14/// finalized transaction) and many concurrent readers (the voting path, one
15/// `contains` per submitted transaction across the whole RPC/network worker
16/// pool). This is the inverse of what a concurrent cache like `moka` optimizes
17/// for cheaply: `moka`'s per-insert maintenance dominated the single-threaded
18/// handler at high TPS, while its lock-free reads were not the constraint.
19///
20/// Membership is all the read path needs, so entries never need recency
21/// promotion on read — that lets readers take a shared lock and run
22/// concurrently. Eviction is generational (FIFO): inserts fill `current`; once
23/// it reaches the per-generation capacity, `current` ages into `previous` (the
24/// prior `previous` is dropped) and a fresh `current` begins. `contains` checks
25/// both generations, so the most recent `generation_capacity` digests are always
26/// retained, and at most `2 * generation_capacity` are held at once. FIFO is also
27/// a better fit than LRU here: a transaction is finalized once, so being
28/// repeatedly voted on should not extend how long it is "recently finalized".
29///
30/// The cache is advisory — `is_recently_finalized` falls back to the executed-tx
31/// check — so the eventual-consistency and bounded-window behavior here is safe.
32pub struct FinalizedTransactionsCache {
33    inner: RwLock<Generations>,
34    /// Age `current` into `previous` once it reaches this many entries.
35    generation_capacity: usize,
36}
37
38struct Generations {
39    current: HashSet<TransactionDigest>,
40    previous: HashSet<TransactionDigest>,
41}
42
43impl FinalizedTransactionsCache {
44    /// `capacity` is the target total number of digests retained; it is split
45    /// across two generations so the live set stays within `capacity`.
46    pub fn new(capacity: usize) -> Self {
47        let generation_capacity = (capacity / 2).max(1);
48        Self {
49            inner: RwLock::new(Generations {
50                current: HashSet::with_capacity(generation_capacity),
51                previous: HashSet::new(),
52            }),
53            generation_capacity,
54        }
55    }
56
57    pub fn insert(&self, tx_digest: TransactionDigest) {
58        // Hold the write lock only for the O(1) in-memory mutation. On rotation we
59        // move the aged-out generation *out* (rather than letting `inner.previous =
60        // full` drop it in place) so its deallocation happens after the block — i.e.
61        // after the write guard is released — keeping it off the readers' critical
62        // path.
63        let _aged_out = {
64            let mut inner = self.inner.write();
65            let aged_out = if inner.current.len() >= self.generation_capacity {
66                let full = std::mem::replace(
67                    &mut inner.current,
68                    HashSet::with_capacity(self.generation_capacity),
69                );
70                Some(std::mem::replace(&mut inner.previous, full))
71            } else {
72                None
73            };
74            inner.current.insert(tx_digest);
75            aged_out
76        };
77    }
78
79    pub fn contains(&self, tx_digest: &TransactionDigest) -> bool {
80        let inner = self.inner.read();
81        inner.current.contains(tx_digest) || inner.previous.contains(tx_digest)
82    }
83}
84
85#[cfg(test)]
86mod tests {
87    use super::*;
88
89    #[test]
90    fn insert_and_contains() {
91        let cache = FinalizedTransactionsCache::new(100);
92        let a = TransactionDigest::random();
93        let b = TransactionDigest::random();
94        assert!(!cache.contains(&a));
95        cache.insert(a);
96        assert!(cache.contains(&a));
97        assert!(!cache.contains(&b));
98    }
99
100    #[test]
101    fn retains_across_one_rotation_then_evicts() {
102        // generation_capacity = 1: each generation holds one entry, so an entry
103        // survives exactly one rotation before being dropped.
104        let cache = FinalizedTransactionsCache::new(2);
105        let first = TransactionDigest::random();
106        cache.insert(first); // current = {first}
107
108        let second = TransactionDigest::random();
109        cache.insert(second); // rotate: previous = {first}, current = {second}
110        assert!(cache.contains(&first), "survives one rotation");
111        assert!(cache.contains(&second));
112
113        let third = TransactionDigest::random();
114        cache.insert(third); // rotate: previous = {second}, current = {third}
115        assert!(!cache.contains(&first), "evicted after second rotation");
116        assert!(cache.contains(&second));
117        assert!(cache.contains(&third));
118    }
119
120    #[test]
121    fn rotation_boundary_with_larger_generation() {
122        // capacity = 100 => generation_capacity = 50; exercises the >= boundary
123        // for G > 1 (the single-rotation test above only covers G = 1).
124        let cache = FinalizedTransactionsCache::new(100);
125        let batch_a: Vec<_> = (0..50).map(|_| TransactionDigest::random()).collect();
126        let batch_b: Vec<_> = (0..50).map(|_| TransactionDigest::random()).collect();
127
128        // Fill `current` exactly to capacity; no rotation yet, all of A present.
129        for d in &batch_a {
130            cache.insert(*d);
131        }
132        assert!(batch_a.iter().all(|d| cache.contains(d)));
133
134        // The first B insert rotates A into `previous`, where it survives the full
135        // generation; afterwards both batches are retained (2 * generation_capacity).
136        for d in &batch_b {
137            cache.insert(*d);
138        }
139        assert!(
140            batch_a.iter().all(|d| cache.contains(d)),
141            "A survives one rotation"
142        );
143        assert!(batch_b.iter().all(|d| cache.contains(d)));
144
145        // One more insert rotates again and evicts A; B and the new entry remain.
146        let extra = TransactionDigest::random();
147        cache.insert(extra);
148        assert!(
149            batch_a.iter().all(|d| !cache.contains(d)),
150            "A evicted on next rotation"
151        );
152        assert!(batch_b.iter().all(|d| cache.contains(d)));
153        assert!(cache.contains(&extra));
154    }
155}