1use crate::authority::AuthorityState;
5use mysten_metrics::monitored_scope;
6use std::cmp::{max, min};
7use std::hash::Hasher;
8use std::sync::Weak;
9use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
10use std::time::Duration;
11use std::time::{SystemTime, UNIX_EPOCH};
12use sui_config::node::AuthorityOverloadConfig;
13use sui_types::digests::TransactionDigest;
14use sui_types::error::SuiErrorKind;
15use sui_types::error::SuiResult;
16use sui_types::fp_bail;
17use tokio::time::sleep;
18use tracing::{debug, info};
19use twox_hash::XxHash64;
20
/// Overload state shared with the rest of the authority.
///
/// The two fields are independent atomics written with `Ordering::Relaxed`, so a
/// concurrent reader may briefly observe one field updated before the other.
#[derive(Default)]
pub struct AuthorityOverloadInfo {
    /// Whether the authority currently considers itself overloaded.
    pub is_overload: AtomicBool,

    /// Share of incoming transactions to shed, stored clamped to at most 100.
    pub load_shedding_percentage: AtomicU32,
}

impl AuthorityOverloadInfo {
    /// Flags the authority as overloaded and records the shedding percentage.
    /// Values above 100 are clamped to 100.
    pub fn set_overload(&self, load_shedding_percentage: u32) {
        let capped = min(load_shedding_percentage, 100);
        self.is_overload.store(true, Ordering::Relaxed);
        self.load_shedding_percentage.store(capped, Ordering::Relaxed);
    }

    /// Clears the overload flag and resets the shedding percentage to zero.
    pub fn clear_overload(&self) {
        self.is_overload.store(false, Ordering::Relaxed);
        self.load_shedding_percentage.store(0, Ordering::Relaxed);
    }
}
42
/// Percentage points removed from the current shedding level per check when no
/// additional shedding is computed but the ready rate is still above the safe rate.
const STEADY_OVERLOAD_REDUCTION_PERCENTAGE: u32 = 10;
/// Discount applied to the measured execution rate before comparing it with the
/// transaction-ready rate.
const EXECUTION_RATE_RATIO_FOR_COMPARISON: f64 = 0.95;
/// Extra shedding fraction added on top of the computed ready/execution gap.
const ADDITIONAL_LOAD_SHEDDING: f64 = 0.02;

/// Width in seconds of each window during which the rejection seed stays fixed;
/// also returned to clients as the retry-after hint.
const SEED_UPDATE_DURATION_SECS: u64 = 30;
49
50pub async fn overload_monitor(
53 authority_state: Weak<AuthorityState>,
54 config: AuthorityOverloadConfig,
55) {
56 info!("Starting system overload monitor.");
57
58 loop {
59 let authority_exist = check_authority_overload(&authority_state, &config);
60 if !authority_exist {
61 break;
63 }
64 sleep(config.overload_monitor_interval).await;
65 }
66
67 info!("Shut down system overload monitor.");
68}
69
70fn check_authority_overload(
73 authority_state: &Weak<AuthorityState>,
74 config: &AuthorityOverloadConfig,
75) -> bool {
76 let _scope = monitored_scope("OverloadMonitor::check_authority_overload");
77 let authority_arc = authority_state.upgrade();
78 if authority_arc.is_none() {
79 return false;
81 }
82
83 let authority = authority_arc.unwrap();
84 let queueing_latency = authority
85 .metrics
86 .execution_queueing_latency
87 .latency()
88 .unwrap_or_default();
89 let txn_ready_rate = authority.metrics.txn_ready_rate_tracker.lock().rate();
90 let execution_rate = authority.metrics.execution_rate_tracker.lock().rate();
91
92 debug!(
93 "Check authority overload signal, queueing latency {:?}, ready rate {:?}, execution rate {:?}.",
94 queueing_latency, txn_ready_rate, execution_rate
95 );
96
97 let (is_overload, load_shedding_percentage) = check_overload_signals(
98 config,
99 authority
100 .overload_info
101 .load_shedding_percentage
102 .load(Ordering::Relaxed),
103 queueing_latency,
104 txn_ready_rate,
105 execution_rate,
106 );
107
108 if is_overload {
109 authority
110 .overload_info
111 .set_overload(load_shedding_percentage);
112 } else {
113 authority.overload_info.clear_overload();
114 }
115
116 authority
117 .metrics
118 .authority_overload_status
119 .set(is_overload as i64);
120 authority
121 .metrics
122 .authority_load_shedding_percentage
123 .set(load_shedding_percentage as i64);
124 true
125}
126
127fn calculate_load_shedding_percentage(txn_ready_rate: f64, execution_rate: f64) -> u32 {
130 if txn_ready_rate < 1e-10 {
134 return 0;
135 }
136
137 if execution_rate * EXECUTION_RATE_RATIO_FOR_COMPARISON > txn_ready_rate {
140 return 0;
141 }
142
143 (((1.0 - execution_rate * EXECUTION_RATE_RATIO_FOR_COMPARISON / txn_ready_rate)
146 + ADDITIONAL_LOAD_SHEDDING)
147 .min(1.0)
148 * 100.0)
149 .round() as u32
150}
151
152fn check_overload_signals(
161 config: &AuthorityOverloadConfig,
162 current_load_shedding_percentage: u32,
163 queueing_latency: Duration,
164 txn_ready_rate: f64,
165 execution_rate: f64,
166) -> (bool, u32) {
167 let additional_load_shedding_percentage;
170 if queueing_latency > config.execution_queue_latency_hard_limit {
171 let calculated_load_shedding_percentage =
172 calculate_load_shedding_percentage(txn_ready_rate, execution_rate);
173
174 additional_load_shedding_percentage = if calculated_load_shedding_percentage > 0
175 || txn_ready_rate >= config.safe_transaction_ready_rate as f64
176 {
177 max(
178 calculated_load_shedding_percentage,
179 config.min_load_shedding_percentage_above_hard_limit,
180 )
181 } else {
182 0
183 };
184 } else if queueing_latency > config.execution_queue_latency_soft_limit {
185 additional_load_shedding_percentage =
186 calculate_load_shedding_percentage(txn_ready_rate, execution_rate);
187 } else {
188 additional_load_shedding_percentage = 0;
189 }
190
191 let load_shedding_percentage = if additional_load_shedding_percentage > 0 {
193 current_load_shedding_percentage
198 + (100 - current_load_shedding_percentage) * additional_load_shedding_percentage / 100
199 } else if txn_ready_rate > config.safe_transaction_ready_rate as f64
200 && current_load_shedding_percentage > 10
201 {
202 current_load_shedding_percentage - STEADY_OVERLOAD_REDUCTION_PERCENTAGE
206 } else {
207 0
209 };
210
211 let load_shedding_percentage = min(
212 load_shedding_percentage,
213 config.max_load_shedding_percentage,
214 );
215 let overload_status = load_shedding_percentage > 0;
216 (overload_status, load_shedding_percentage)
217}
218
219fn should_reject_tx(
221 load_shedding_percentage: u32,
222 tx_digest: TransactionDigest,
223 temporal_seed: u64,
224) -> bool {
225 let mut hasher = XxHash64::with_seed(temporal_seed);
228 hasher.write(tx_digest.inner());
229 let value = hasher.finish();
230 value % 100 < load_shedding_percentage as u64
231}
232
233pub fn overload_monitor_accept_tx(
235 load_shedding_percentage: u32,
236 tx_digest: TransactionDigest,
237) -> SuiResult {
238 let temporal_seed = SystemTime::now()
244 .duration_since(UNIX_EPOCH)
245 .expect("Sui did not exist prior to 1970")
246 .as_secs()
247 / SEED_UPDATE_DURATION_SECS;
248
249 if should_reject_tx(load_shedding_percentage, tx_digest, temporal_seed) {
250 fp_bail!(
253 SuiErrorKind::ValidatorOverloadedRetryAfter {
254 retry_after_secs: SEED_UPDATE_DURATION_SECS
255 }
256 .into()
257 );
258 }
259 Ok(())
260}
261
#[cfg(test)]
// NOTE(review): presumably needed because the simulated workloads below use APIs
// the workspace lint configuration disallows (e.g. `tokio::spawn`) — confirm.
#[allow(clippy::disallowed_methods)]
mod tests {
    use super::*;

    use crate::authority::test_authority_builder::TestAuthorityBuilder;
    use rand::{
        Rng, SeedableRng,
        rngs::{OsRng, StdRng},
    };
    use std::sync::Arc;
    use sui_macros::sim_test;
    use tokio::sync::mpsc::UnboundedReceiver;
    use tokio::sync::mpsc::UnboundedSender;
    use tokio::sync::mpsc::unbounded_channel;
    use tokio::sync::oneshot;
    use tokio::task::JoinHandle;
    use tokio::time::{Instant, MissedTickBehavior, interval};

    #[test]
    pub fn test_authority_overload_info() {
        let overload_info = AuthorityOverloadInfo::default();
        assert!(!overload_info.is_overload.load(Ordering::Relaxed));
        assert_eq!(
            overload_info
                .load_shedding_percentage
                .load(Ordering::Relaxed),
            0
        );

        {
            overload_info.set_overload(20);
            assert!(overload_info.is_overload.load(Ordering::Relaxed));
            assert_eq!(
                overload_info
                    .load_shedding_percentage
                    .load(Ordering::Relaxed),
                20
            );
        }

        {
            // Percentages above 100 are clamped to 100.
            overload_info.set_overload(110);
            assert!(overload_info.is_overload.load(Ordering::Relaxed));
            assert_eq!(
                overload_info
                    .load_shedding_percentage
                    .load(Ordering::Relaxed),
                100
            );
        }

        {
            overload_info.clear_overload();
            assert!(!overload_info.is_overload.load(Ordering::Relaxed));
            assert_eq!(
                overload_info
                    .load_shedding_percentage
                    .load(Ordering::Relaxed),
                0
            );
        }
    }

    #[test]
    pub fn test_calculate_load_shedding_ratio() {
        assert_eq!(calculate_load_shedding_percentage(95.0, 100.1), 0);
        assert_eq!(calculate_load_shedding_percentage(95.0, 100.0), 2);
        assert_eq!(calculate_load_shedding_percentage(100.0, 100.0), 7);
        assert_eq!(calculate_load_shedding_percentage(110.0, 100.0), 16);
        assert_eq!(calculate_load_shedding_percentage(180.0, 100.0), 49);
        assert_eq!(calculate_load_shedding_percentage(100.0, 0.0), 100);
        assert_eq!(calculate_load_shedding_percentage(0.0, 1.0), 0);
    }

    #[test]
    pub fn test_check_overload_signals() {
        let config = AuthorityOverloadConfig {
            execution_queue_latency_hard_limit: Duration::from_secs(10),
            execution_queue_latency_soft_limit: Duration::from_secs(1),
            max_load_shedding_percentage: 90,
            ..Default::default()
        };

        // Latency below the soft limit: no shedding.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_millis(500), 1000.0, 10.0),
            (false, 0)
        );

        // Above the soft limit, but execution keeps up with the ready rate.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(2), 100.0, 120.0),
            (false, 0)
        );

        // Above the soft limit and execution falls behind: shed the computed gap.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(2), 100.0, 100.0),
            (true, 7)
        );

        // Above the hard limit: the configured minimum shedding applies.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(11), 100.0, 100.0),
            (true, 50)
        );

        // Above the hard limit with a gap larger than the minimum.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(11), 240.0, 100.0),
            (true, 62)
        );

        // Above the hard limit, execution keeps up and the ready rate is low.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(11), 20.0, 100.0),
            (false, 0)
        );

        // Shedding is capped at `max_load_shedding_percentage`.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(11), 100.0, 0.0),
            (true, 90)
        );

        // Additional shedding compounds on the current percentage.
        assert_eq!(
            check_overload_signals(&config, 50, Duration::from_secs(2), 116.0, 100.0),
            (true, 60)
        );

        // No additional shedding but still busy: step down gradually.
        assert_eq!(
            check_overload_signals(&config, 90, Duration::from_secs(2), 200.0, 300.0),
            (true, 80)
        );

        // Compounding above the hard limit.
        assert_eq!(
            check_overload_signals(&config, 50, Duration::from_secs(11), 100.0, 100.0),
            (true, 75)
        );
    }

    #[tokio::test(flavor = "current_thread")]
    pub async fn test_check_authority_overload() {
        telemetry_subscribers::init_for_testing();

        // A zero safe ready rate makes the hard-limit branch apply
        // `min_load_shedding_percentage_above_hard_limit` even without traffic.
        let config = AuthorityOverloadConfig {
            safe_transaction_ready_rate: 0,
            ..Default::default()
        };
        let state = TestAuthorityBuilder::new()
            .with_authority_overload_config(config.clone())
            .build()
            .await;

        // Report a 20s queueing latency, presumably above the default hard limit.
        for _ in 0..1000 {
            state
                .metrics
                .execution_queueing_latency
                .report(Duration::from_secs(20));
        }

        let authority = Arc::downgrade(&state);
        assert!(check_authority_overload(&authority, &config));
        assert!(state.overload_info.is_overload.load(Ordering::Relaxed));
        assert_eq!(
            state
                .overload_info
                .load_shedding_percentage
                .load(Ordering::Relaxed),
            config.min_load_shedding_percentage_above_hard_limit
        );

        // Once the last strong reference is dropped the check reports `false`.
        drop(state);
        assert!(!check_authority_overload(&authority, &config));
    }

    /// Builds a test authority with the default overload config and spawns an
    /// overload monitor for it.
    async fn start_overload_monitor() -> (Arc<AuthorityState>, JoinHandle<()>) {
        let overload_config = AuthorityOverloadConfig::default();
        let state = TestAuthorityBuilder::new()
            .with_authority_overload_config(overload_config.clone())
            .build()
            .await;
        let authority_state = Arc::downgrade(&state);
        let monitor_handle = tokio::spawn(async move {
            overload_monitor(authority_state, overload_config).await;
        });
        (state, monitor_handle)
    }

    /// Decides whether the load generator submits its next request. With load
    /// shedding enabled, the request is dropped with probability equal to the
    /// authority's current load shedding percentage.
    fn should_send_request(
        enable_load_shedding: bool,
        authority: &AuthorityState,
        rng: &mut StdRng,
    ) -> bool {
        if !enable_load_shedding {
            return true;
        }
        let shedding_percentage = authority
            .overload_info
            .load_shedding_percentage
            .load(Ordering::Relaxed);
        !(shedding_percentage > 0 && rng.gen_range(0..100) < shedding_percentage)
    }

    /// Spawns a task that submits requests to `tx` at `steady_rate` per second,
    /// plus bursts of the sizes received on `burst_rx`. Each submitted request
    /// records a ready-rate sample. When `tx` is closed, the task stores its
    /// request and drop totals into the given counters and exits.
    fn start_load_generator(
        steady_rate: f64,
        tx: UnboundedSender<Instant>,
        mut burst_rx: UnboundedReceiver<u32>,
        authority: Arc<AuthorityState>,
        enable_load_shedding: bool,
        total_requests_arc: Arc<AtomicU32>,
        dropped_requests_arc: Arc<AtomicU32>,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs_f64(1.0 / steady_rate));
            let mut rng = StdRng::from_rng(&mut OsRng).unwrap();
            let mut total_requests: u32 = 0;
            let mut total_dropped_requests: u32 = 0;

            loop {
                // Either one steady-rate request or a whole burst; every request
                // in the batch shares the same start time.
                let (start_time, batch_size) = tokio::select! {
                    now = interval.tick() => (now, 1),
                    Some(burst) = burst_rx.recv() => (Instant::now(), burst),
                };
                total_requests += batch_size;

                for _ in 0..batch_size {
                    if !should_send_request(enable_load_shedding, &authority, &mut rng) {
                        total_dropped_requests += 1;
                        continue;
                    }
                    // The executor dropping its receiver marks the end of the run.
                    if tx.send(start_time).is_err() {
                        info!("Load generator stopping. Total requests {:?}, total dropped requests {:?}.", total_requests, total_dropped_requests);
                        total_requests_arc.store(total_requests, Ordering::SeqCst);
                        dropped_requests_arc.store(total_dropped_requests, Ordering::SeqCst);
                        return;
                    }
                    authority.metrics.txn_ready_rate_tracker.lock().record();
                }
            }
        })
    }

    /// Spawns a task that drains `rx`, pacing itself with a
    /// `1 / execution_rate`-second interval. For each item it records an
    /// execution-rate sample and the item's queueing latency, and it stops when
    /// `stop_rx` fires.
    fn start_executor(
        execution_rate: f64,
        mut rx: UnboundedReceiver<Instant>,
        mut stop_rx: oneshot::Receiver<()>,
        authority: Arc<AuthorityState>,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs_f64(1.0 / execution_rate));
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            loop {
                tokio::select! {
                    Some(start_time) = rx.recv() => {
                        authority.metrics.execution_rate_tracker.lock().record();
                        authority.metrics.execution_queueing_latency.report(start_time.elapsed());
                        interval.tick().await;
                    }
                    _ = &mut stop_rx => {
                        info!("Executor stopping");
                        return;
                    }
                }
            }
        })
    }

    /// Logs the authority's overload state and rate metrics once per second for
    /// `seconds` seconds.
    async fn sleep_and_print_stats(state: Arc<AuthorityState>, seconds: u32) {
        for _ in 0..seconds {
            info!(
                "Overload: {:?}. Shedding percentage: {:?}. Queue: {:?}, Ready rate: {:?}. Exec rate: {:?}.",
                state.overload_info.is_overload.load(Ordering::Relaxed),
                state
                    .overload_info
                    .load_shedding_percentage
                    .load(Ordering::Relaxed),
                state.metrics.execution_queueing_latency.latency(),
                state.metrics.txn_ready_rate_tracker.lock().rate(),
                state.metrics.execution_rate_tracker.lock().rate(),
            );
            sleep(Duration::from_secs(1)).await;
        }
    }

    /// Runs a steady workload for 300 seconds and asserts that the fraction of
    /// dropped requests falls within `[min_dropping_rate, max_dropping_rate]`.
    async fn run_consistent_workload_test(
        generator_rate: f64,
        executor_rate: f64,
        min_dropping_rate: f64,
        max_dropping_rate: f64,
    ) {
        let (state, monitor_handle) = start_overload_monitor().await;

        let (tx, rx) = unbounded_channel();
        // Hold the burst sender so `burst_rx.recv()` stays pending instead of
        // returning `None`.
        let (_burst_tx, burst_rx) = unbounded_channel();
        let total_requests = Arc::new(AtomicU32::new(0));
        let dropped_requests = Arc::new(AtomicU32::new(0));
        let load_generator = start_load_generator(
            generator_rate,
            tx,
            burst_rx,
            state.clone(),
            true,
            total_requests.clone(),
            dropped_requests.clone(),
        );

        let (stop_tx, stop_rx) = oneshot::channel();
        let executor = start_executor(executor_rate, rx, stop_rx, state.clone());

        sleep_and_print_stats(state.clone(), 300).await;

        stop_tx.send(()).unwrap();
        let _ = tokio::join!(load_generator, executor);

        let dropped_ratio = dropped_requests.load(Ordering::SeqCst) as f64
            / total_requests.load(Ordering::SeqCst) as f64;
        assert!(
            (min_dropping_rate..=max_dropping_rate).contains(&dropped_ratio),
            "dropped ratio {dropped_ratio} outside [{min_dropping_rate}, {max_dropping_rate}]"
        );

        monitor_handle.abort();
        let _ = monitor_handle.await;
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_workload_consistent_no_overload() {
        telemetry_subscribers::init_for_testing();
        run_consistent_workload_test(900.0, 1000.0, 0.0, 0.0).await;
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_workload_consistent_slightly_overload() {
        telemetry_subscribers::init_for_testing();
        run_consistent_workload_test(1100.0, 1000.0, 0.05, 0.25).await;
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_workload_consistent_overload() {
        telemetry_subscribers::init_for_testing();
        run_consistent_workload_test(3000.0, 1000.0, 0.6, 0.8).await;
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_workload_single_spike() {
        telemetry_subscribers::init_for_testing();
        let (state, monitor_handle) = start_overload_monitor().await;

        let (tx, rx) = unbounded_channel();
        let (burst_tx, burst_rx) = unbounded_channel();
        let total_requests = Arc::new(AtomicU32::new(0));
        let dropped_requests = Arc::new(AtomicU32::new(0));
        let load_generator = start_load_generator(
            10.0,
            tx,
            burst_rx,
            state.clone(),
            true,
            total_requests.clone(),
            dropped_requests.clone(),
        );

        let (stop_tx, stop_rx) = oneshot::channel();
        let executor = start_executor(1000.0, rx, stop_rx, state.clone());

        sleep_and_print_stats(state.clone(), 10).await;
        burst_tx.send(5000).unwrap();
        sleep_and_print_stats(state.clone(), 20).await;

        stop_tx.send(()).unwrap();
        let _ = tokio::join!(load_generator, executor);

        // A single burst is expected to be absorbed without any shedding.
        assert_eq!(dropped_requests.load(Ordering::SeqCst), 0);

        monitor_handle.abort();
        let _ = monitor_handle.await;
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_workload_consistent_short_spike() {
        telemetry_subscribers::init_for_testing();
        let (state, monitor_handle) = start_overload_monitor().await;

        let (tx, rx) = unbounded_channel();
        let (burst_tx, burst_rx) = unbounded_channel();
        let total_requests = Arc::new(AtomicU32::new(0));
        let dropped_requests = Arc::new(AtomicU32::new(0));
        let load_generator = start_load_generator(
            10.0,
            tx,
            burst_rx,
            state.clone(),
            true,
            total_requests.clone(),
            dropped_requests.clone(),
        );

        let (stop_tx, stop_rx) = oneshot::channel();
        let executor = start_executor(1000.0, rx, stop_rx, state.clone());

        sleep_and_print_stats(state.clone(), 15).await;
        // Repeated bursts of 10000 requests every 5 seconds.
        for _ in 0..16 {
            burst_tx.send(10000).unwrap();
            sleep_and_print_stats(state.clone(), 5).await;
        }

        stop_tx.send(()).unwrap();
        let _ = tokio::join!(load_generator, executor);
        let dropped_ratio = dropped_requests.load(Ordering::SeqCst) as f64
            / total_requests.load(Ordering::SeqCst) as f64;

        assert!(0.4 < dropped_ratio, "dropped ratio {dropped_ratio} not above 0.4");
        assert!(dropped_ratio < 0.6, "dropped ratio {dropped_ratio} not below 0.6");

        monitor_handle.abort();
        let _ = monitor_handle.await;
    }

    #[test]
    fn test_txn_rejection_rate() {
        // For every percentage, the observed rejection rate over random digests
        // should stay within 3 percentage points of the target.
        for rejection_percentage in 0..=100 {
            let mut reject_count = 0;
            for _ in 0..10000 {
                let digest = TransactionDigest::random();
                if should_reject_tx(rejection_percentage, digest, 28455473) {
                    reject_count += 1;
                }
            }

            debug!(
                "Rejection percentage: {:?}, reject count: {:?}.",
                rejection_percentage, reject_count
            );
            assert!(rejection_percentage as f32 / 100.0 - 0.03 < reject_count as f32 / 10000.0);
            assert!(reject_count as f32 / 10000.0 < rejection_percentage as f32 / 100.0 + 0.03);
        }
    }

    #[sim_test]
    async fn test_txn_rejection_over_time() {
        let start_time = Instant::now();
        let mut digest = TransactionDigest::random();
        let mut temporal_seed = 1708108277 / SEED_UPDATE_DURATION_SECS;
        let load_shedding_percentage = 50;

        // Find a digest that is rejected under the current seed.
        while !should_reject_tx(load_shedding_percentage, digest, temporal_seed)
            && start_time.elapsed() < Duration::from_secs(30)
        {
            digest = TransactionDigest::random();
        }

        // With the seed unchanged, the decision is stable.
        for _ in 0..100 {
            assert!(should_reject_tx(
                load_shedding_percentage,
                digest,
                temporal_seed
            ));
        }

        // Advancing the seed must eventually let the same digest through.
        temporal_seed += 1;
        while should_reject_tx(load_shedding_percentage, digest, temporal_seed)
            && start_time.elapsed() < Duration::from_secs(30)
        {
            temporal_seed += 1;
        }

        assert!(start_time.elapsed() < Duration::from_secs(30));
    }
}