sui_core/authority/
submitted_transaction_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use lru::LruCache;
5use parking_lot::RwLock;
6use prometheus::{
7    Histogram, IntCounter, IntGauge, Registry, register_histogram_with_registry,
8    register_int_counter_with_registry, register_int_gauge_with_registry,
9};
10use std::collections::BTreeSet;
11use std::net::IpAddr;
12use std::num::NonZeroUsize;
13use std::sync::Arc;
14use sui_types::digests::TransactionDigest;
15use sui_types::traffic_control::Weight;
16use tracing::debug;
17
/// Number of entries the submitted-transaction cache holds when the caller
/// supplies no capacity (or a capacity of zero).
pub(crate) const DEFAULT_CACHE_CAPACITY: usize = 100_000;
19
20pub struct SubmittedTransactionCacheMetrics {
21    pub transactions_tracked: IntGauge,
22    pub spam_detected: IntCounter,
23    pub submission_count_exceeded: Histogram,
24    pub amplification_factor_distribution: Histogram,
25}
26
27impl SubmittedTransactionCacheMetrics {
28    pub fn new(registry: &Registry) -> Self {
29        Self {
30            transactions_tracked: register_int_gauge_with_registry!(
31                "submitted_transaction_cache_transactions_tracked",
32                "Number of transactions currently tracked in the submission cache",
33                registry,
34            )
35            .unwrap(),
36            spam_detected: register_int_counter_with_registry!(
37                "submitted_transaction_cache_spam_detected",
38                "Number of transactions that exceeded submission limits",
39                registry,
40            )
41            .unwrap(),
42            submission_count_exceeded: register_histogram_with_registry!(
43                "submitted_transaction_cache_submission_count_exceeded",
44                "Distribution of submission counts when spam is detected",
45                vec![
46                    1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0,
47                    10000.0,
48                ],
49                registry,
50            )
51            .unwrap(),
52            amplification_factor_distribution: register_histogram_with_registry!(
53                "submitted_transaction_cache_amplification_factor_distribution",
54                "Distribution of amplification factors used for transaction submissions",
55                vec![
56                    1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0,
57                    10000.0,
58                ],
59                registry,
60            )
61            .unwrap(),
62        }
63    }
64
65    #[cfg(test)]
66    pub(crate) fn new_test() -> Self {
67        Self::new(&Registry::new())
68    }
69}
70
71/// Cache for tracking submitted transactions to prevent DoS through excessive resubmissions.
72/// Uses LRU eviction to automatically remove least recently used entries when at capacity.
73/// Tracks submission counts and enforces gas-price-based amplification limits.
74pub(crate) struct SubmittedTransactionCache {
75    inner: RwLock<Inner>,
76    metrics: Arc<SubmittedTransactionCacheMetrics>,
77}
78
79struct Inner {
80    transactions: LruCache<TransactionDigest, SubmissionMetadata>,
81}
82
/// Bookkeeping stored for each tracked transaction digest.
#[derive(Clone, Debug)]
struct SubmissionMetadata {
    /// How many times this transaction has been counted so far.
    submission_count: u32,
    /// Upper bound on `submission_count` before the transaction is treated as
    /// spam; derived from the gas price amplification.
    max_allowed_submissions: u32,
    /// Distinct client IP addresses that have submitted this transaction.
    submitter_client_addrs: BTreeSet<IpAddr>,
}
92
93impl SubmittedTransactionCache {
94    pub(crate) fn new(
95        cache_capacity: Option<usize>,
96        metrics: Arc<SubmittedTransactionCacheMetrics>,
97    ) -> Self {
98        let capacity = cache_capacity
99            .and_then(NonZeroUsize::new)
100            .unwrap_or_else(|| NonZeroUsize::new(DEFAULT_CACHE_CAPACITY).unwrap());
101
102        Self {
103            inner: RwLock::new(Inner {
104                transactions: LruCache::new(capacity),
105            }),
106            metrics,
107        }
108    }
109
110    pub(crate) fn metrics(&self) -> Arc<SubmittedTransactionCacheMetrics> {
111        self.metrics.clone()
112    }
113
114    pub(crate) fn record_submitted_tx(
115        &self,
116        digest: &TransactionDigest,
117        amplification_factor: u32,
118        submitter_client_addr: Option<IpAddr>,
119    ) {
120        let mut inner = self.inner.write();
121
122        let max_allowed_submissions = amplification_factor;
123
124        if let Some(metadata) = inner.transactions.get_mut(digest) {
125            // Track additional client addresses for resubmissions
126            if let Some(addr) = submitter_client_addr
127                && metadata.submitter_client_addrs.insert(addr)
128            {
129                debug!("Added new client address {addr} for transaction {digest}");
130            }
131            debug!("Transaction {digest} already tracked in submission cache");
132        } else {
133            // First time we're submitting this transaction, however we will wait till
134            // we see the transaction in consensus output to increment the submission count.
135            let submitter_client_addrs = submitter_client_addr.into_iter().collect();
136            let metadata = SubmissionMetadata {
137                submission_count: 0,
138                max_allowed_submissions,
139                submitter_client_addrs,
140            };
141
142            inner.transactions.put(*digest, metadata);
143
144            self.metrics
145                .transactions_tracked
146                .set(inner.transactions.len() as i64);
147            self.metrics
148                .amplification_factor_distribution
149                .observe(amplification_factor as f64);
150
151            debug!(
152                "First submission of transaction {digest} (max_allowed: {max_allowed_submissions})",
153            );
154        }
155    }
156
157    /// Increments the submission count when we see a transaction in consensus output.
158    /// This tracks how many times the transaction has appeared in consensus (from any validator).
159    /// Returns the spam weight and set of submitter client addresses if the transaction exceeds allowed submissions.
160    pub(crate) fn increment_submission_count(
161        &self,
162        digest: &TransactionDigest,
163    ) -> Option<(Weight, BTreeSet<IpAddr>)> {
164        let mut inner = self.inner.write();
165
166        if let Some(metadata) = inner.transactions.get_mut(digest) {
167            metadata.submission_count += 1;
168
169            if metadata.submission_count > metadata.max_allowed_submissions {
170                let spam_weight = Weight::one();
171                self.metrics.spam_detected.inc();
172                self.metrics
173                    .submission_count_exceeded
174                    .observe(metadata.submission_count as f64);
175
176                debug!(
177                    "Transaction {} seen in consensus {} times, exceeds limit {} (spam_weight: {:?})",
178                    digest,
179                    metadata.submission_count,
180                    metadata.max_allowed_submissions,
181                    spam_weight
182                );
183
184                return Some((spam_weight, metadata.submitter_client_addrs.clone()));
185            }
186        }
187        // If we don't know about this transaction, it was submitted by another validator
188        // We don't track spam weight for transactions we didn't submit
189        None
190    }
191
192    #[cfg(test)]
193    pub(crate) fn contains(&self, digest: &TransactionDigest) -> bool {
194        self.inner.read().transactions.contains(digest)
195    }
196
197    #[cfg(test)]
198    pub(crate) fn get_submission_count(&self, digest: &TransactionDigest) -> Option<u32> {
199        self.inner
200            .read()
201            .transactions
202            .peek(digest)
203            .map(|m| m.submission_count)
204    }
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Builds a digest whose first byte is `val` and whose remaining bytes are zero.
    fn create_test_digest(val: u8) -> TransactionDigest {
        let mut raw = [0u8; 32];
        raw[0] = val;
        TransactionDigest::new(raw)
    }

    /// Builds a cache with the given capacity and fresh test metrics.
    fn new_cache(capacity: Option<usize>) -> SubmittedTransactionCache {
        SubmittedTransactionCache::new(
            capacity,
            Arc::new(SubmittedTransactionCacheMetrics::new_test()),
        )
    }

    #[test]
    fn test_first_submission_allowed() {
        let cache = new_cache(None);
        let tx = create_test_digest(1);

        // Recording alone tracks the digest but leaves the count at zero.
        cache.record_submitted_tx(&tx, 1, None);
        assert!(cache.contains(&tx));
        assert_eq!(cache.get_submission_count(&tx), Some(0));

        // The first appearance is within the limit of 1.
        let outcome = cache.increment_submission_count(&tx);
        assert_eq!(outcome, None);
        assert_eq!(cache.get_submission_count(&tx), Some(1));
    }

    #[test]
    fn test_amplification_factor() {
        let cache = new_cache(None);
        let tx = create_test_digest(1);

        // An amplification factor of 5 permits five submissions.
        cache.record_submitted_tx(&tx, 5, None);

        for attempt in 1..=5 {
            let outcome = cache.increment_submission_count(&tx);
            assert_eq!(outcome, None, "Submission {} should be allowed", attempt);
        }

        // The sixth submission is over the limit.
        let sixth = cache.increment_submission_count(&tx);
        assert_eq!(sixth.map(|(weight, _)| weight), Some(Weight::one()));

        // Every later submission keeps reporting the spam weight.
        for attempt in 7..=10 {
            let outcome = cache.increment_submission_count(&tx);
            assert_eq!(
                outcome.map(|(weight, _)| weight),
                Some(Weight::one()),
                "Submission {} should trigger spam weight",
                attempt
            );
        }
    }

    #[test]
    fn test_lru_eviction() {
        // Room for three transactions only.
        let cache = new_cache(Some(3));

        // Fill the cache.
        for digest in (1..=3u8).map(create_test_digest) {
            cache.record_submitted_tx(&digest, 1, None);
        }

        // All three are present.
        for digest in (1..=3u8).map(create_test_digest) {
            assert!(cache.contains(&digest));
        }

        // A fourth insert pushes out the least recently used entry (digest 1).
        let fourth = create_test_digest(4);
        cache.record_submitted_tx(&fourth, 1, None);

        assert!(!cache.contains(&create_test_digest(1)));
        // Digests 2, 3 and 4 survive.
        assert!(cache.contains(&create_test_digest(2)));
        assert!(cache.contains(&create_test_digest(3)));
        assert!(cache.contains(&fourth));
    }

    #[test]
    fn test_lru_access_updates_position() {
        // Room for three transactions only.
        let cache = new_cache(Some(3));

        // Fill the cache.
        for digest in (1..=3u8).map(create_test_digest) {
            cache.record_submitted_tx(&digest, 1, None);
        }

        // Touching digest 1 makes it the most recently used entry.
        let first = create_test_digest(1);
        cache.increment_submission_count(&first);

        // The next insert therefore evicts digest 2 instead.
        let fourth = create_test_digest(4);
        cache.record_submitted_tx(&fourth, 1, None);

        assert!(!cache.contains(&create_test_digest(2)));
        // Digests 1, 3 and 4 survive.
        assert!(cache.contains(&first));
        assert!(cache.contains(&create_test_digest(3)));
        assert!(cache.contains(&fourth));
    }

    #[test]
    fn test_multiple_client_addresses() {
        let cache = new_cache(None);
        let tx = create_test_digest(1);
        let [addr1, addr2, addr3] =
            [1u8, 2, 3].map(|last_octet| IpAddr::V4(Ipv4Addr::new(127, 0, 0, last_octet)));

        // First submission from addr1, then resubmissions from addr2, addr1
        // again (must not be duplicated) and addr3.
        for addr in [addr1, addr2, addr1, addr3] {
            cache.record_submitted_tx(&tx, 2, Some(addr));
        }

        // Two appearances reach the limit of 2 without exceeding it.
        cache.increment_submission_count(&tx);
        cache.increment_submission_count(&tx);

        // The third appearance exceeds it and reports every submitter address.
        let flagged = cache.increment_submission_count(&tx);
        assert!(flagged.is_some());

        let (spam_weight, addrs) = flagged.unwrap();
        assert_eq!(spam_weight, Weight::one());
        assert_eq!(addrs.len(), 3);
        for addr in [addr1, addr2, addr3] {
            assert!(addrs.contains(&addr));
        }
    }

    #[test]
    fn test_retry_tracking() {
        // Room for three transactions only.
        let cache = new_cache(Some(3));
        let [digest1, digest2, digest3, digest4] = [1u8, 2, 3, 4].map(create_test_digest);

        // Fill the cache.
        for digest in [&digest1, &digest2, &digest3] {
            cache.record_submitted_tx(digest, 1, None);
        }

        // All three are present.
        for digest in [&digest1, &digest2, &digest3] {
            assert!(cache.contains(digest));
        }

        // Retrying digest1 moves it to the front of the LRU order.
        cache.record_submitted_tx(&digest1, 1, None);

        // The next insert evicts the least recently used entry, now digest2.
        cache.record_submitted_tx(&digest4, 1, None);

        // digest1 was refreshed by the retry, so it is still tracked.
        assert!(cache.contains(&digest1));
        // digest2 was the least recently used and is gone.
        assert!(!cache.contains(&digest2));
        // digest3 and digest4 remain.
        assert!(cache.contains(&digest3));
        assert!(cache.contains(&digest4));
    }
}