1use lru::LruCache;
5use parking_lot::RwLock;
6use prometheus::{
7 Histogram, IntCounter, IntGauge, Registry, register_histogram_with_registry,
8 register_int_counter_with_registry, register_int_gauge_with_registry,
9};
10use std::collections::BTreeSet;
11use std::net::IpAddr;
12use std::num::NonZeroUsize;
13use std::sync::Arc;
14use sui_types::digests::TransactionDigest;
15use sui_types::traffic_control::Weight;
16use tracing::debug;
17
18pub(crate) const DEFAULT_CACHE_CAPACITY: usize = 100_000;
19
20pub struct SubmittedTransactionCacheMetrics {
21 pub transactions_tracked: IntGauge,
22 pub spam_detected: IntCounter,
23 pub submission_count_exceeded: Histogram,
24 pub amplification_factor_distribution: Histogram,
25}
26
27impl SubmittedTransactionCacheMetrics {
28 pub fn new(registry: &Registry) -> Self {
29 Self {
30 transactions_tracked: register_int_gauge_with_registry!(
31 "submitted_transaction_cache_transactions_tracked",
32 "Number of transactions currently tracked in the submission cache",
33 registry,
34 )
35 .unwrap(),
36 spam_detected: register_int_counter_with_registry!(
37 "submitted_transaction_cache_spam_detected",
38 "Number of transactions that exceeded submission limits",
39 registry,
40 )
41 .unwrap(),
42 submission_count_exceeded: register_histogram_with_registry!(
43 "submitted_transaction_cache_submission_count_exceeded",
44 "Distribution of submission counts when spam is detected",
45 vec![
46 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0,
47 10000.0,
48 ],
49 registry,
50 )
51 .unwrap(),
52 amplification_factor_distribution: register_histogram_with_registry!(
53 "submitted_transaction_cache_amplification_factor_distribution",
54 "Distribution of amplification factors used for transaction submissions",
55 vec![
56 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0,
57 10000.0,
58 ],
59 registry,
60 )
61 .unwrap(),
62 }
63 }
64
65 #[cfg(test)]
66 pub(crate) fn new_test() -> Self {
67 Self::new(&Registry::new())
68 }
69}
70
71pub(crate) struct SubmittedTransactionCache {
75 inner: RwLock<Inner>,
76 metrics: Arc<SubmittedTransactionCacheMetrics>,
77}
78
79struct Inner {
80 transactions: LruCache<TransactionDigest, SubmissionMetadata>,
81}
82
83#[derive(Debug, Clone)]
84struct SubmissionMetadata {
85 submission_count: u32,
87 max_allowed_submissions: u32,
89 submitter_client_addrs: BTreeSet<IpAddr>,
91}
92
93impl SubmittedTransactionCache {
94 pub(crate) fn new(
95 cache_capacity: Option<usize>,
96 metrics: Arc<SubmittedTransactionCacheMetrics>,
97 ) -> Self {
98 let capacity = cache_capacity
99 .and_then(NonZeroUsize::new)
100 .unwrap_or_else(|| NonZeroUsize::new(DEFAULT_CACHE_CAPACITY).unwrap());
101
102 Self {
103 inner: RwLock::new(Inner {
104 transactions: LruCache::new(capacity),
105 }),
106 metrics,
107 }
108 }
109
110 pub(crate) fn metrics(&self) -> Arc<SubmittedTransactionCacheMetrics> {
111 self.metrics.clone()
112 }
113
114 pub(crate) fn record_submitted_tx(
115 &self,
116 digest: &TransactionDigest,
117 amplification_factor: u32,
118 submitter_client_addr: Option<IpAddr>,
119 ) {
120 let mut inner = self.inner.write();
121
122 let max_allowed_submissions = amplification_factor;
123
124 if let Some(metadata) = inner.transactions.get_mut(digest) {
125 if let Some(addr) = submitter_client_addr
127 && metadata.submitter_client_addrs.insert(addr)
128 {
129 debug!("Added new client address {addr} for transaction {digest}");
130 }
131 debug!("Transaction {digest} already tracked in submission cache");
132 } else {
133 let submitter_client_addrs = submitter_client_addr.into_iter().collect();
136 let metadata = SubmissionMetadata {
137 submission_count: 0,
138 max_allowed_submissions,
139 submitter_client_addrs,
140 };
141
142 inner.transactions.put(*digest, metadata);
143
144 self.metrics
145 .transactions_tracked
146 .set(inner.transactions.len() as i64);
147 self.metrics
148 .amplification_factor_distribution
149 .observe(amplification_factor as f64);
150
151 debug!(
152 "First submission of transaction {digest} (max_allowed: {max_allowed_submissions})",
153 );
154 }
155 }
156
157 pub(crate) fn increment_submission_count(
161 &self,
162 digest: &TransactionDigest,
163 ) -> Option<(Weight, BTreeSet<IpAddr>)> {
164 let mut inner = self.inner.write();
165
166 if let Some(metadata) = inner.transactions.get_mut(digest) {
167 metadata.submission_count += 1;
168
169 if metadata.submission_count > metadata.max_allowed_submissions {
170 let spam_weight = Weight::one();
171 self.metrics.spam_detected.inc();
172 self.metrics
173 .submission_count_exceeded
174 .observe(metadata.submission_count as f64);
175
176 debug!(
177 "Transaction {} seen in consensus {} times, exceeds limit {} (spam_weight: {:?})",
178 digest,
179 metadata.submission_count,
180 metadata.max_allowed_submissions,
181 spam_weight
182 );
183
184 return Some((spam_weight, metadata.submitter_client_addrs.clone()));
185 }
186 }
187 None
190 }
191
192 #[cfg(test)]
193 pub(crate) fn contains(&self, digest: &TransactionDigest) -> bool {
194 self.inner.read().transactions.contains(digest)
195 }
196
197 #[cfg(test)]
198 pub(crate) fn get_submission_count(&self, digest: &TransactionDigest) -> Option<u32> {
199 self.inner
200 .read()
201 .transactions
202 .peek(digest)
203 .map(|m| m.submission_count)
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use super::*;
210 use std::net::{IpAddr, Ipv4Addr};
211
212 fn create_test_digest(val: u8) -> TransactionDigest {
213 let mut bytes = [0u8; 32];
214 bytes[0] = val;
215 TransactionDigest::new(bytes)
216 }
217
218 #[test]
219 fn test_first_submission_allowed() {
220 let cache = SubmittedTransactionCache::new(
221 None,
222 Arc::new(SubmittedTransactionCacheMetrics::new_test()),
223 );
224 let digest = create_test_digest(1);
225
226 cache.record_submitted_tx(&digest, 1, None);
227 assert!(cache.contains(&digest));
228 assert_eq!(cache.get_submission_count(&digest), Some(0));
229
230 let spam_weight = cache.increment_submission_count(&digest);
231 assert_eq!(spam_weight, None);
232 assert_eq!(cache.get_submission_count(&digest), Some(1));
233 }
234
235 #[test]
236 fn test_amplification_factor() {
237 let cache = SubmittedTransactionCache::new(
238 None,
239 Arc::new(SubmittedTransactionCacheMetrics::new_test()),
240 );
241 let digest = create_test_digest(1);
242
243 cache.record_submitted_tx(&digest, 5, None);
245
246 for i in 0..5 {
248 let spam_weight = cache.increment_submission_count(&digest);
249 assert_eq!(spam_weight, None, "Submission {} should be allowed", i + 1);
250 }
251
252 let spam_weight = cache.increment_submission_count(&digest);
254 assert_eq!(spam_weight.map(|(w, _)| w), Some(Weight::one()));
255
256 for i in 6..10 {
258 let spam_weight = cache.increment_submission_count(&digest);
259 assert_eq!(
260 spam_weight.map(|(w, _)| w),
261 Some(Weight::one()),
262 "Submission {} should trigger spam weight",
263 i + 1
264 );
265 }
266 }
267
268 #[test]
269 fn test_lru_eviction() {
270 let cache = SubmittedTransactionCache::new(
272 Some(3),
273 Arc::new(SubmittedTransactionCacheMetrics::new_test()),
274 );
275
276 for i in 1..=3 {
278 let digest = create_test_digest(i);
279 cache.record_submitted_tx(&digest, 1, None);
280 }
281
282 for i in 1..=3 {
284 let digest = create_test_digest(i);
285 assert!(cache.contains(&digest));
286 }
287
288 let digest4 = create_test_digest(4);
290 cache.record_submitted_tx(&digest4, 1, None);
291
292 assert!(!cache.contains(&create_test_digest(1)));
294 assert!(cache.contains(&create_test_digest(2)));
296 assert!(cache.contains(&create_test_digest(3)));
297 assert!(cache.contains(&digest4));
298 }
299
300 #[test]
301 fn test_lru_access_updates_position() {
302 let cache = SubmittedTransactionCache::new(
304 Some(3),
305 Arc::new(SubmittedTransactionCacheMetrics::new_test()),
306 );
307
308 for i in 1..=3 {
310 let digest = create_test_digest(i);
311 cache.record_submitted_tx(&digest, 1, None);
312 }
313
314 let digest1 = create_test_digest(1);
316 cache.increment_submission_count(&digest1);
317
318 let digest4 = create_test_digest(4);
320 cache.record_submitted_tx(&digest4, 1, None);
321
322 assert!(!cache.contains(&create_test_digest(2)));
324 assert!(cache.contains(&digest1));
326 assert!(cache.contains(&create_test_digest(3)));
327 assert!(cache.contains(&digest4));
328 }
329
330 #[test]
331 fn test_multiple_client_addresses() {
332 let cache = SubmittedTransactionCache::new(
333 None,
334 Arc::new(SubmittedTransactionCacheMetrics::new_test()),
335 );
336 let digest = create_test_digest(1);
337 let addr1 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
338 let addr2 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2));
339 let addr3 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 3));
340
341 cache.record_submitted_tx(&digest, 2, Some(addr1));
343
344 cache.record_submitted_tx(&digest, 2, Some(addr2));
346
347 cache.record_submitted_tx(&digest, 2, Some(addr1));
349
350 cache.record_submitted_tx(&digest, 2, Some(addr3));
352
353 cache.increment_submission_count(&digest);
355 cache.increment_submission_count(&digest);
356
357 let result = cache.increment_submission_count(&digest);
359 assert!(result.is_some());
360
361 let (spam_weight, addrs) = result.unwrap();
362 assert_eq!(spam_weight, Weight::one());
363 assert_eq!(addrs.len(), 3);
364 assert!(addrs.contains(&addr1));
365 assert!(addrs.contains(&addr2));
366 assert!(addrs.contains(&addr3));
367 }
368
369 #[test]
370 fn test_retry_tracking() {
371 let cache = SubmittedTransactionCache::new(
373 Some(3),
374 Arc::new(SubmittedTransactionCacheMetrics::new_test()),
375 );
376 let digest1 = create_test_digest(1);
377 let digest2 = create_test_digest(2);
378 let digest3 = create_test_digest(3);
379 let digest4 = create_test_digest(4);
380
381 cache.record_submitted_tx(&digest1, 1, None);
383 cache.record_submitted_tx(&digest2, 1, None);
384 cache.record_submitted_tx(&digest3, 1, None);
385
386 assert!(cache.contains(&digest1));
388 assert!(cache.contains(&digest2));
389 assert!(cache.contains(&digest3));
390
391 cache.record_submitted_tx(&digest1, 1, None);
393
394 cache.record_submitted_tx(&digest4, 1, None);
396
397 assert!(cache.contains(&digest1));
399 assert!(!cache.contains(&digest2));
401 assert!(cache.contains(&digest3));
403 assert!(cache.contains(&digest4));
404 }
405}