sui_core/
overload_monitor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::AuthorityState;
5use mysten_metrics::monitored_scope;
6use std::cmp::{max, min};
7use std::hash::Hasher;
8use std::sync::Weak;
9use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
10use std::time::Duration;
11use std::time::{SystemTime, UNIX_EPOCH};
12use sui_config::node::AuthorityOverloadConfig;
13use sui_types::digests::TransactionDigest;
14use sui_types::error::SuiErrorKind;
15use sui_types::error::SuiResult;
16use sui_types::fp_bail;
17use tokio::time::sleep;
18use tracing::{debug, info};
19use twox_hash::XxHash64;
20
/// Overload state of an authority, shared through atomics.
///
/// The two fields are independent atomics that are always accessed with
/// `Ordering::Relaxed`; they are not updated as a single atomic unit.
#[derive(Default)]
pub struct AuthorityOverloadInfo {
    /// Whether the authority is overloaded.
    pub is_overload: AtomicBool,

    /// The calculated percentage of transactions to drop.
    pub load_shedding_percentage: AtomicU32,
}

impl AuthorityOverloadInfo {
    /// Flags the authority as overloaded and records the percentage of load to shed.
    ///
    /// `load_shedding_percentage` is clamped to at most 100 before it is stored.
    pub fn set_overload(&self, load_shedding_percentage: u32) {
        let capped_percentage = load_shedding_percentage.min(100);
        self.is_overload.store(true, Ordering::Relaxed);
        self.load_shedding_percentage
            .store(capped_percentage, Ordering::Relaxed);
    }

    /// Clears the overload flag and resets the load shedding percentage to 0.
    pub fn clear_overload(&self) {
        self.is_overload.store(false, Ordering::Relaxed);
        self.load_shedding_percentage.store(0, Ordering::Relaxed);
    }
}
42
// Number of percentage points subtracted from the current load shedding percentage on a
// check that needs no additional shedding while the transaction ready rate is still above
// the configured safe rate.
const STEADY_OVERLOAD_REDUCTION_PERCENTAGE: u32 = 10;
// Factor the execution rate is multiplied by before it is compared against (and divided by)
// the transaction ready rate when computing how much load to shed.
const EXECUTION_RATE_RATIO_FOR_COMPARISON: f64 = 0.95;
// Extra fraction of transactions (0.02, i.e. 2%) that is added on top of the computed
// shortfall between ready rate and execution rate.
const ADDITIONAL_LOAD_SHEDDING: f64 = 0.02;

// The update interval of the random seed used to determine whether a txn should be rejected.
// The same value is reported as `retry_after_secs` when a txn is rejected.
const SEED_UPDATE_DURATION_SECS: u64 = 30;
49
50// Monitors the overload signals in `authority_state` periodically, and updates its `overload_info`
51// when the signals indicates overload.
52pub async fn overload_monitor(
53    authority_state: Weak<AuthorityState>,
54    config: AuthorityOverloadConfig,
55) {
56    info!("Starting system overload monitor.");
57
58    loop {
59        let authority_exist = check_authority_overload(&authority_state, &config);
60        if !authority_exist {
61            // `authority_state` doesn't exist anymore. Quit overload monitor.
62            break;
63        }
64        sleep(config.overload_monitor_interval).await;
65    }
66
67    info!("Shut down system overload monitor.");
68}
69
70// Checks authority overload signals, and updates authority's `overload_info`.
71// Returns whether the authority state exists.
72fn check_authority_overload(
73    authority_state: &Weak<AuthorityState>,
74    config: &AuthorityOverloadConfig,
75) -> bool {
76    let _scope = monitored_scope("OverloadMonitor::check_authority_overload");
77    let authority_arc = authority_state.upgrade();
78    if authority_arc.is_none() {
79        // `authority_state` doesn't exist anymore.
80        return false;
81    }
82
83    let authority = authority_arc.unwrap();
84    let queueing_latency = authority
85        .metrics
86        .execution_queueing_latency
87        .latency()
88        .unwrap_or_default();
89    let txn_ready_rate = authority.metrics.txn_ready_rate_tracker.lock().rate();
90    let execution_rate = authority.metrics.execution_rate_tracker.lock().rate();
91
92    debug!(
93        "Check authority overload signal, queueing latency {:?}, ready rate {:?}, execution rate {:?}.",
94        queueing_latency, txn_ready_rate, execution_rate
95    );
96
97    let (is_overload, load_shedding_percentage) = check_overload_signals(
98        config,
99        authority
100            .overload_info
101            .load_shedding_percentage
102            .load(Ordering::Relaxed),
103        queueing_latency,
104        txn_ready_rate,
105        execution_rate,
106    );
107
108    if is_overload {
109        authority
110            .overload_info
111            .set_overload(load_shedding_percentage);
112    } else {
113        authority.overload_info.clear_overload();
114    }
115
116    authority
117        .metrics
118        .authority_overload_status
119        .set(is_overload as i64);
120    authority
121        .metrics
122        .authority_load_shedding_percentage
123        .set(load_shedding_percentage as i64);
124    true
125}
126
127// Calculates the percentage of transactions to drop in order to reduce execution queue.
128// Returns the integer percentage between 0 and 100.
129fn calculate_load_shedding_percentage(txn_ready_rate: f64, execution_rate: f64) -> u32 {
130    // When transaction ready rate is practically 0, we aren't adding more load to the
131    // execution driver, so no shedding.
132    // TODO: consensus handler or transaction manager can also be overloaded.
133    if txn_ready_rate < 1e-10 {
134        return 0;
135    }
136
137    // Deflate the execution rate to account for the case that execution_rate is close to
138    // txn_ready_rate.
139    if execution_rate * EXECUTION_RATE_RATIO_FOR_COMPARISON > txn_ready_rate {
140        return 0;
141    }
142
143    // In order to maintain execution queue length, we need to drop at least (1 - executionRate / readyRate).
144    // To reduce the queue length, here we add 10% more transactions to drop.
145    (((1.0 - execution_rate * EXECUTION_RATE_RATIO_FOR_COMPARISON / txn_ready_rate)
146        + ADDITIONAL_LOAD_SHEDDING)
147        .min(1.0)
148        * 100.0)
149        .round() as u32
150}
151
152// Given overload signals (`queueing_latency`, `txn_ready_rate`, `execution_rate`), return whether
153// the authority server should enter load shedding mode, and how much percentage of transactions to drop.
154// Note that the final load shedding percentage should also take the current load shedding percentage
155// into consideration. If we are already shedding 40% load, based on the current txn_ready_rate
156// and execution_rate, we need to shed 10% more, the outcome is that we need to shed
157// 40% + (1 - 40%) * 10% = 46%.
158// When txn_ready_rate is less than execution_rate, we gradually reduce load shedding percentage until
159// the queueing latency is back to normal.
160fn check_overload_signals(
161    config: &AuthorityOverloadConfig,
162    current_load_shedding_percentage: u32,
163    queueing_latency: Duration,
164    txn_ready_rate: f64,
165    execution_rate: f64,
166) -> (bool, u32) {
167    // First, we calculate based on the current `txn_ready_rate` and `execution_rate`,
168    // what's the percentage of traffic to shed from `txn_ready_rate`.
169    let additional_load_shedding_percentage;
170    if queueing_latency > config.execution_queue_latency_hard_limit {
171        let calculated_load_shedding_percentage =
172            calculate_load_shedding_percentage(txn_ready_rate, execution_rate);
173
174        additional_load_shedding_percentage = if calculated_load_shedding_percentage > 0
175            || txn_ready_rate >= config.safe_transaction_ready_rate as f64
176        {
177            max(
178                calculated_load_shedding_percentage,
179                config.min_load_shedding_percentage_above_hard_limit,
180            )
181        } else {
182            0
183        };
184    } else if queueing_latency > config.execution_queue_latency_soft_limit {
185        additional_load_shedding_percentage =
186            calculate_load_shedding_percentage(txn_ready_rate, execution_rate);
187    } else {
188        additional_load_shedding_percentage = 0;
189    }
190
191    // Next, we calculate the new load shedding percentage.
192    let load_shedding_percentage = if additional_load_shedding_percentage > 0 {
193        // When we need to shed more load, since the `txn_ready_rate` is already influenced
194        // by `current_load_shedding_percentage`, we need to calculate the new load shedding
195        // percentage from `current_load_shedding_percentage` and
196        // `additional_load_shedding_percentage`.
197        current_load_shedding_percentage
198            + (100 - current_load_shedding_percentage) * additional_load_shedding_percentage / 100
199    } else if txn_ready_rate > config.safe_transaction_ready_rate as f64
200        && current_load_shedding_percentage > 10
201    {
202        // We don't need to shed more load. However, the enqueue rate is still not minimal.
203        // We gradually reduce load shedding percentage (10% at a time) to gracefully accept
204        // more load.
205        current_load_shedding_percentage - STEADY_OVERLOAD_REDUCTION_PERCENTAGE
206    } else {
207        // The current transaction ready rate is considered very low. Turn off load shedding mode.
208        0
209    };
210
211    let load_shedding_percentage = min(
212        load_shedding_percentage,
213        config.max_load_shedding_percentage,
214    );
215    let overload_status = load_shedding_percentage > 0;
216    (overload_status, load_shedding_percentage)
217}
218
219// Return true if we should reject the txn with `tx_digest`.
220fn should_reject_tx(
221    load_shedding_percentage: u32,
222    tx_digest: TransactionDigest,
223    temporal_seed: u64,
224) -> bool {
225    // TODO: we also need to add a secret salt (e.g. first consensus commit in the current epoch),
226    // to prevent gaming the system.
227    let mut hasher = XxHash64::with_seed(temporal_seed);
228    hasher.write(tx_digest.inner());
229    let value = hasher.finish();
230    value % 100 < load_shedding_percentage as u64
231}
232
233// Checks if we can accept the transaction with `tx_digest`.
234pub fn overload_monitor_accept_tx(
235    load_shedding_percentage: u32,
236    tx_digest: TransactionDigest,
237) -> SuiResult {
238    // Derive a random seed from the epoch time for transaction selection. Changing the seed every
239    // `SEED_UPDATE_DURATION_SECS` interval allows rejected transaction's retry to have a chance
240    // to go through in the future.
241    // Also, using the epoch time instead of randomly generating a seed allows that all validators
242    // makes the same decision.
243    let temporal_seed = SystemTime::now()
244        .duration_since(UNIX_EPOCH)
245        .expect("Sui did not exist prior to 1970")
246        .as_secs()
247        / SEED_UPDATE_DURATION_SECS;
248
249    if should_reject_tx(load_shedding_percentage, tx_digest, temporal_seed) {
250        // TODO: using `SEED_UPDATE_DURATION_SECS` is a safe suggestion that the time based seed
251        // is definitely different by then. However, a shorter suggestion may be available.
252        fp_bail!(
253            SuiErrorKind::ValidatorOverloadedRetryAfter {
254                retry_after_secs: SEED_UPDATE_DURATION_SECS
255            }
256            .into()
257        );
258    }
259    Ok(())
260}
261
262#[cfg(test)]
263#[allow(clippy::disallowed_methods)] // allow unbounded_channel() since tests are simulating txn manager execution driver interaction.
264mod tests {
265    use super::*;
266
267    use crate::authority::test_authority_builder::TestAuthorityBuilder;
268    use rand::{
269        Rng, SeedableRng,
270        rngs::{OsRng, StdRng},
271    };
272    use std::sync::Arc;
273    use sui_macros::sim_test;
274    use tokio::sync::mpsc::UnboundedReceiver;
275    use tokio::sync::mpsc::UnboundedSender;
276    use tokio::sync::mpsc::unbounded_channel;
277    use tokio::sync::oneshot;
278    use tokio::task::JoinHandle;
279    use tokio::time::{Instant, MissedTickBehavior, interval};
280
281    #[test]
282    pub fn test_authority_overload_info() {
283        let overload_info = AuthorityOverloadInfo::default();
284        assert!(!overload_info.is_overload.load(Ordering::Relaxed));
285        assert_eq!(
286            overload_info
287                .load_shedding_percentage
288                .load(Ordering::Relaxed),
289            0
290        );
291
292        {
293            overload_info.set_overload(20);
294            assert!(overload_info.is_overload.load(Ordering::Relaxed));
295            assert_eq!(
296                overload_info
297                    .load_shedding_percentage
298                    .load(Ordering::Relaxed),
299                20
300            );
301        }
302
303        // Tests that load shedding percentage can't go beyond 100%.
304        {
305            overload_info.set_overload(110);
306            assert!(overload_info.is_overload.load(Ordering::Relaxed));
307            assert_eq!(
308                overload_info
309                    .load_shedding_percentage
310                    .load(Ordering::Relaxed),
311                100
312            );
313        }
314
315        {
316            overload_info.clear_overload();
317            assert!(!overload_info.is_overload.load(Ordering::Relaxed));
318            assert_eq!(
319                overload_info
320                    .load_shedding_percentage
321                    .load(Ordering::Relaxed),
322                0
323            );
324        }
325    }
326
327    #[test]
328    pub fn test_calculate_load_shedding_ratio() {
329        assert_eq!(calculate_load_shedding_percentage(95.0, 100.1), 0);
330        assert_eq!(calculate_load_shedding_percentage(95.0, 100.0), 2);
331        assert_eq!(calculate_load_shedding_percentage(100.0, 100.0), 7);
332        assert_eq!(calculate_load_shedding_percentage(110.0, 100.0), 16);
333        assert_eq!(calculate_load_shedding_percentage(180.0, 100.0), 49);
334        assert_eq!(calculate_load_shedding_percentage(100.0, 0.0), 100);
335        assert_eq!(calculate_load_shedding_percentage(0.0, 1.0), 0);
336    }
337
338    #[test]
339    pub fn test_check_overload_signals() {
340        let config = AuthorityOverloadConfig {
341            execution_queue_latency_hard_limit: Duration::from_secs(10),
342            execution_queue_latency_soft_limit: Duration::from_secs(1),
343            max_load_shedding_percentage: 90,
344            ..Default::default()
345        };
346
347        // When execution queueing latency is within soft limit, don't start overload protection.
348        assert_eq!(
349            check_overload_signals(&config, 0, Duration::from_millis(500), 1000.0, 10.0),
350            (false, 0)
351        );
352
353        // When execution queueing latency hits soft limit and execution rate is higher, don't
354        // start overload protection.
355        assert_eq!(
356            check_overload_signals(&config, 0, Duration::from_secs(2), 100.0, 120.0),
357            (false, 0)
358        );
359
360        // When execution queueing latency hits soft limit, but not hard limit, start overload
361        // protection.
362        assert_eq!(
363            check_overload_signals(&config, 0, Duration::from_secs(2), 100.0, 100.0),
364            (true, 7)
365        );
366
367        // When execution queueing latency hits hard limit, start more aggressive overload
368        // protection.
369        assert_eq!(
370            check_overload_signals(&config, 0, Duration::from_secs(11), 100.0, 100.0),
371            (true, 50)
372        );
373
374        // When execution queueing latency hits hard limit and calculated shedding percentage
375        // is higher than min_load_shedding_percentage_above_hard_limit.
376        assert_eq!(
377            check_overload_signals(&config, 0, Duration::from_secs(11), 240.0, 100.0),
378            (true, 62)
379        );
380
381        // When execution queueing latency hits hard limit, but transaction ready rate
382        // is within safe_transaction_ready_rate, don't start overload protection.
383        assert_eq!(
384            check_overload_signals(&config, 0, Duration::from_secs(11), 20.0, 100.0),
385            (false, 0)
386        );
387
388        // Maximum transactions shed is cap by `max_load_shedding_percentage` config.
389        assert_eq!(
390            check_overload_signals(&config, 0, Duration::from_secs(11), 100.0, 0.0),
391            (true, 90)
392        );
393
394        // When the system is already shedding 50% of load, and the current txn ready rate
395        // and execution rate require another 20%, the final shedding rate is 60%.
396        assert_eq!(
397            check_overload_signals(&config, 50, Duration::from_secs(2), 116.0, 100.0),
398            (true, 60)
399        );
400
401        // Load shedding percentage is gradually reduced when txn ready rate is lower than
402        // execution rate.
403        assert_eq!(
404            check_overload_signals(&config, 90, Duration::from_secs(2), 200.0, 300.0),
405            (true, 80)
406        );
407
408        // When queueing delay is above hard limit, we shed additional 50% every time.
409        assert_eq!(
410            check_overload_signals(&config, 50, Duration::from_secs(11), 100.0, 100.0),
411            (true, 75)
412        );
413    }
414
415    #[tokio::test(flavor = "current_thread")]
416    pub async fn test_check_authority_overload() {
417        telemetry_subscribers::init_for_testing();
418
419        let config = AuthorityOverloadConfig {
420            safe_transaction_ready_rate: 0,
421            ..Default::default()
422        };
423        let state = TestAuthorityBuilder::new()
424            .with_authority_overload_config(config.clone())
425            .build()
426            .await;
427
428        // Initialize latency reporter.
429        for _ in 0..1000 {
430            state
431                .metrics
432                .execution_queueing_latency
433                .report(Duration::from_secs(20));
434        }
435
436        // Creates a simple case to see if authority state overload_info can be updated
437        // correctly by check_authority_overload.
438        let authority = Arc::downgrade(&state);
439        assert!(check_authority_overload(&authority, &config));
440        assert!(state.overload_info.is_overload.load(Ordering::Relaxed));
441        assert_eq!(
442            state
443                .overload_info
444                .load_shedding_percentage
445                .load(Ordering::Relaxed),
446            config.min_load_shedding_percentage_above_hard_limit
447        );
448
449        // Checks that check_authority_overload should return false when the input
450        // authority state doesn't exist.
451        let authority = Arc::downgrade(&state);
452        drop(state);
453        assert!(!check_authority_overload(&authority, &config));
454    }
455
456    // Creates an AuthorityState and starts an overload monitor that monitors its metrics.
457    async fn start_overload_monitor() -> (Arc<AuthorityState>, JoinHandle<()>) {
458        let overload_config = AuthorityOverloadConfig::default();
459        let state = TestAuthorityBuilder::new()
460            .with_authority_overload_config(overload_config.clone())
461            .build()
462            .await;
463        let authority_state = Arc::downgrade(&state);
464        let monitor_handle = tokio::spawn(async move {
465            overload_monitor(authority_state, overload_config).await;
466        });
467        (state, monitor_handle)
468    }
469
470    // Starts a load generator that generates a steady workload, and also allow it to accept
471    // burst of request through `burst_rx`.
472    // Request tracking is done by the overload monitor inside `authority`.
473    fn start_load_generator(
474        steady_rate: f64,
475        tx: UnboundedSender<Instant>,
476        mut burst_rx: UnboundedReceiver<u32>,
477        authority: Arc<AuthorityState>,
478        enable_load_shedding: bool,
479        total_requests_arc: Arc<AtomicU32>,
480        dropped_requests_arc: Arc<AtomicU32>,
481    ) -> JoinHandle<()> {
482        tokio::spawn(async move {
483            let mut interval = interval(Duration::from_secs_f64(1.0 / steady_rate));
484            let mut rng = StdRng::from_rng(&mut OsRng).unwrap();
485            let mut total_requests: u32 = 0;
486            let mut total_dropped_requests: u32 = 0;
487
488            // Helper function to check whether we should send a request.
489            let mut do_send =
490                |enable_load_shedding: bool, authority: Arc<AuthorityState>| -> bool {
491                    if enable_load_shedding {
492                        let shedding_percentage = authority
493                            .overload_info
494                            .load_shedding_percentage
495                            .load(Ordering::Relaxed);
496                        !(shedding_percentage > 0 && rng.gen_range(0..100) < shedding_percentage)
497                    } else {
498                        true
499                    }
500                };
501
502            loop {
503                tokio::select! {
504                    now = interval.tick() => {
505                        total_requests += 1;
506                        if do_send(enable_load_shedding, authority.clone()) {
507                            if tx.send(now).is_err() {
508                                info!("Load generator stopping. Total requests {:?}, total dropped requests {:?}.", total_requests, total_dropped_requests);
509                                total_requests_arc.store(total_requests, Ordering::SeqCst);
510                                dropped_requests_arc.store(total_dropped_requests, Ordering::SeqCst);
511                                return;
512                            }
513                            authority.metrics.txn_ready_rate_tracker.lock().record();
514                        } else {
515                            total_dropped_requests += 1;
516                        }
517                    }
518                    Some(burst) = burst_rx.recv() => {
519                        let now = Instant::now();
520                        total_requests += burst;
521                        for _ in 0..burst {
522                            if do_send(enable_load_shedding, authority.clone()) {
523                                if tx.send(now).is_err() {
524                                    info!("Load generator stopping. Total requests {:?}, total dropped requests {:?}.", total_requests, total_dropped_requests);
525                                    total_requests_arc.store(total_requests, Ordering::SeqCst);
526                                    dropped_requests_arc.store(total_dropped_requests, Ordering::SeqCst);
527                                    return;
528                                }
529                                authority.metrics.txn_ready_rate_tracker.lock().record();
530                            } else {
531                                total_dropped_requests += 1;
532                            }
533                        }
534                    }
535                }
536            }
537        })
538    }
539
540    // Starts a request executor that can consume request based on `execution_rate`.
541    // Request tracking is done by the overload monitor inside `authority`.
542    fn start_executor(
543        execution_rate: f64,
544        mut rx: UnboundedReceiver<Instant>,
545        mut stop_rx: oneshot::Receiver<()>,
546        authority: Arc<AuthorityState>,
547    ) -> JoinHandle<()> {
548        tokio::spawn(async move {
549            let mut interval = interval(Duration::from_secs_f64(1.0 / execution_rate));
550            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
551            loop {
552                tokio::select! {
553                    Some(start_time) = rx.recv() => {
554                        authority.metrics.execution_rate_tracker.lock().record();
555                        authority.metrics.execution_queueing_latency.report(start_time.elapsed());
556                        interval.tick().await;
557                    }
558                    _ = &mut stop_rx => {
559                        info!("Executor stopping");
560                        return;
561                    }
562                }
563            }
564        })
565    }
566
567    // Helper fundtion to periodically print the current overload info.
568    async fn sleep_and_print_stats(state: Arc<AuthorityState>, seconds: u32) {
569        for _ in 0..seconds {
570            info!(
571                "Overload: {:?}. Shedding percentage: {:?}. Queue: {:?}, Ready rate: {:?}. Exec rate: {:?}.",
572                state.overload_info.is_overload.load(Ordering::Relaxed),
573                state
574                    .overload_info
575                    .load_shedding_percentage
576                    .load(Ordering::Relaxed),
577                state.metrics.execution_queueing_latency.latency(),
578                state.metrics.txn_ready_rate_tracker.lock().rate(),
579                state.metrics.execution_rate_tracker.lock().rate(),
580            );
581            sleep(Duration::from_secs(1)).await;
582        }
583    }
584
585    // Running a workload with consistent steady `generator_rate` and `executor_rate`.
586    // It checks that the dropped requests should in between min_dropping_rate and max_dropping_rate.
587    async fn run_consistent_workload_test(
588        generator_rate: f64,
589        executor_rate: f64,
590        min_dropping_rate: f64,
591        max_dropping_rate: f64,
592    ) {
593        let (state, monitor_handle) = start_overload_monitor().await;
594
595        let (tx, rx) = unbounded_channel();
596        let (_burst_tx, burst_rx) = unbounded_channel();
597        let total_requests = Arc::new(AtomicU32::new(0));
598        let dropped_requests = Arc::new(AtomicU32::new(0));
599        let load_generator = start_load_generator(
600            generator_rate,
601            tx.clone(),
602            burst_rx,
603            state.clone(),
604            true,
605            total_requests.clone(),
606            dropped_requests.clone(),
607        );
608
609        let (stop_tx, stop_rx) = oneshot::channel();
610        let executor = start_executor(executor_rate, rx, stop_rx, state.clone());
611
612        sleep_and_print_stats(state.clone(), 300).await;
613
614        stop_tx.send(()).unwrap();
615        let _ = tokio::join!(load_generator, executor);
616
617        let dropped_ratio = dropped_requests.load(Ordering::SeqCst) as f64
618            / total_requests.load(Ordering::SeqCst) as f64;
619        assert!(min_dropping_rate <= dropped_ratio);
620        assert!(dropped_ratio <= max_dropping_rate);
621
622        monitor_handle.abort();
623        let _ = monitor_handle.await;
624    }
625
626    // Tests that when request generation rate is slower than execution rate, no requests should be dropped.
627    #[tokio::test(flavor = "current_thread", start_paused = true)]
628    pub async fn test_workload_consistent_no_overload() {
629        telemetry_subscribers::init_for_testing();
630        run_consistent_workload_test(900.0, 1000.0, 0.0, 0.0).await;
631    }
632
633    // Tests that when request generation rate is slightly above execution rate, a small portion of
634    // requests should be dropped.
635    #[tokio::test(flavor = "current_thread", start_paused = true)]
636    pub async fn test_workload_consistent_slightly_overload() {
637        telemetry_subscribers::init_for_testing();
638        // Dropping rate should be around 15%.
639        run_consistent_workload_test(1100.0, 1000.0, 0.05, 0.25).await;
640    }
641
642    // Tests that when request generation rate is much higher than execution rate, a large portion of
643    // requests should be dropped.
644    #[tokio::test(flavor = "current_thread", start_paused = true)]
645    pub async fn test_workload_consistent_overload() {
646        telemetry_subscribers::init_for_testing();
647        // Dropping rate should be around 70%.
648        run_consistent_workload_test(3000.0, 1000.0, 0.6, 0.8).await;
649    }
650
651    // Tests that when there is a very short single spike, no request should be dropped.
652    #[tokio::test(flavor = "current_thread", start_paused = true)]
653    pub async fn test_workload_single_spike() {
654        telemetry_subscribers::init_for_testing();
655        let (state, monitor_handle) = start_overload_monitor().await;
656
657        let (tx, rx) = unbounded_channel();
658        let (burst_tx, burst_rx) = unbounded_channel();
659        let total_requests = Arc::new(AtomicU32::new(0));
660        let dropped_requests = Arc::new(AtomicU32::new(0));
661        let load_generator = start_load_generator(
662            10.0,
663            tx.clone(),
664            burst_rx,
665            state.clone(),
666            true,
667            total_requests.clone(),
668            dropped_requests.clone(),
669        );
670
671        let (stop_tx, stop_rx) = oneshot::channel();
672        let executor = start_executor(1000.0, rx, stop_rx, state.clone());
673
674        sleep_and_print_stats(state.clone(), 10).await;
675        // Send out a burst of 5000 requests.
676        burst_tx.send(5000).unwrap();
677        sleep_and_print_stats(state.clone(), 20).await;
678
679        stop_tx.send(()).unwrap();
680        let _ = tokio::join!(load_generator, executor);
681
682        // No requests should be dropped.
683        assert_eq!(dropped_requests.load(Ordering::SeqCst), 0);
684
685        monitor_handle.abort();
686        let _ = monitor_handle.await;
687    }
688
689    // Tests that when there are regular spikes that keep queueing latency consistently high,
690    // overload monitor should kick in and shed load.
691    #[tokio::test(flavor = "current_thread", start_paused = true)]
692    pub async fn test_workload_consistent_short_spike() {
693        telemetry_subscribers::init_for_testing();
694        let (state, monitor_handle) = start_overload_monitor().await;
695
696        let (tx, rx) = unbounded_channel();
697        let (burst_tx, burst_rx) = unbounded_channel();
698        let total_requests = Arc::new(AtomicU32::new(0));
699        let dropped_requests = Arc::new(AtomicU32::new(0));
700        let load_generator = start_load_generator(
701            10.0,
702            tx.clone(),
703            burst_rx,
704            state.clone(),
705            true,
706            total_requests.clone(),
707            dropped_requests.clone(),
708        );
709
710        let (stop_tx, stop_rx) = oneshot::channel();
711        let executor = start_executor(1000.0, rx, stop_rx, state.clone());
712
713        sleep_and_print_stats(state.clone(), 15).await;
714        for _ in 0..16 {
715            // Regularly send out a burst of request.
716            burst_tx.send(10000).unwrap();
717            sleep_and_print_stats(state.clone(), 5).await;
718        }
719
720        stop_tx.send(()).unwrap();
721        let _ = tokio::join!(load_generator, executor);
722        let dropped_ratio = dropped_requests.load(Ordering::SeqCst) as f64
723            / total_requests.load(Ordering::SeqCst) as f64;
724
725        // We should drop about 50% of request because the burst throughput is about 2x of
726        // execution rate.
727        assert!(0.4 < dropped_ratio);
728        assert!(dropped_ratio < 0.6);
729
730        monitor_handle.abort();
731        let _ = monitor_handle.await;
732    }
733
734    // Tests that the ratio of rejected transactions created randomly matches load shedding percentage in
735    // the overload monitor.
736    #[test]
737    fn test_txn_rejection_rate() {
738        for rejection_percentage in 0..=100 {
739            let mut reject_count = 0;
740            for _ in 0..10000 {
741                let digest = TransactionDigest::random();
742                if should_reject_tx(rejection_percentage, digest, 28455473) {
743                    reject_count += 1;
744                }
745            }
746
747            debug!(
748                "Rejection percentage: {:?}, reject count: {:?}.",
749                rejection_percentage, reject_count
750            );
751            // Give it a 3% fluctuation.
752            assert!(rejection_percentage as f32 / 100.0 - 0.03 < reject_count as f32 / 10000.0);
753            assert!(reject_count as f32 / 10000.0 < rejection_percentage as f32 / 100.0 + 0.03);
754        }
755    }
756
757    // Tests that rejected transaction will have a chance to be accepted in the future.
758    #[sim_test]
759    async fn test_txn_rejection_over_time() {
760        let start_time = Instant::now();
761        let mut digest = TransactionDigest::random();
762        let mut temporal_seed = 1708108277 / SEED_UPDATE_DURATION_SECS;
763        let load_shedding_percentage = 50;
764
765        // Find a rejected transaction with 50% rejection rate.
766        while !should_reject_tx(load_shedding_percentage, digest, temporal_seed)
767            && start_time.elapsed() < Duration::from_secs(30)
768        {
769            digest = TransactionDigest::random();
770        }
771
772        // It should always be rejected using the current temporal_seed.
773        for _ in 0..100 {
774            assert!(should_reject_tx(
775                load_shedding_percentage,
776                digest,
777                temporal_seed
778            ));
779        }
780
781        // It will be accepted in the future.
782        temporal_seed += 1;
783        while should_reject_tx(load_shedding_percentage, digest, temporal_seed)
784            && start_time.elapsed() < Duration::from_secs(30)
785        {
786            temporal_seed += 1;
787        }
788
789        // Make sure that the tests can finish within 30 seconds.
790        assert!(start_time.elapsed() < Duration::from_secs(30));
791    }
792}