1use arc_swap::ArcSwap;
5use parking_lot::Mutex;
6use std::collections::{BTreeMap, VecDeque};
7use std::num::NonZeroU64;
8use std::sync::Arc;
9use sui_protocol_config::Chain;
10use sui_types::digests::ChainIdentifier;
11use sui_types::messages_consensus::TimestampMs;
12use tracing::{debug, warn};
13
14use crate::authority::AuthorityMetrics;
15
16const DEFAULT_OBSERVATIONS_WINDOW: u64 = 120; const DEFAULT_THROUGHPUT_PROFILE_UPDATE_INTERVAL_SECS: u64 = 60; const DEFAULT_THROUGHPUT_PROFILE_COOL_DOWN_THRESHOLD: u64 = 10; #[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd)]
21pub struct ThroughputProfile {
22 pub level: Level,
23 pub throughput: u64,
26}
27
/// Coarse classification of the consensus throughput.
///
/// The variants are declared from lowest to highest so that the derived ordering yields
/// `Low < Medium < High`; the profiler compares profiles with `<` to detect a downgrade.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Level {
    Low,
    Medium,
    High,
}
34
35impl From<usize> for Level {
36 fn from(value: usize) -> Self {
37 if value == 0 {
38 Level::Low
39 } else if value == 1 {
40 Level::Medium
41 } else {
42 Level::High
43 }
44 }
45}
46
47impl From<Level> for usize {
48 fn from(value: Level) -> Self {
49 match value {
50 Level::Low => 0,
51 Level::Medium => 1,
52 Level::High => 2,
53 }
54 }
55}
56
57#[derive(Debug)]
58pub struct ThroughputProfileRanges {
59 profiles: BTreeMap<u64, ThroughputProfile>,
61}
62
63impl ThroughputProfileRanges {
64 pub fn from_chain(chain_id: ChainIdentifier) -> ThroughputProfileRanges {
65 let to_profiles = |medium: u64, high: u64| -> Vec<ThroughputProfile> {
66 vec![
67 ThroughputProfile {
68 level: Level::Low,
69 throughput: 0,
70 },
71 ThroughputProfile {
72 level: Level::Medium,
73 throughput: medium,
74 },
75 ThroughputProfile {
76 level: Level::High,
77 throughput: high,
78 },
79 ]
80 };
81
82 match chain_id.chain() {
83 Chain::Mainnet => ThroughputProfileRanges::new(&to_profiles(500, 2_000)),
84 Chain::Testnet => ThroughputProfileRanges::new(&to_profiles(500, 2_000)),
85 Chain::Unknown => ThroughputProfileRanges::new(&to_profiles(1_000, 2_000)),
86 }
87 }
88
89 pub fn new(profiles: &[ThroughputProfile]) -> Self {
90 let mut p: BTreeMap<u64, ThroughputProfile> = BTreeMap::new();
91
92 for profile in profiles {
93 assert!(
94 !p.iter().any(|(_, pr)| pr.level == profile.level),
95 "Attempted to insert profile with same level"
96 );
97 assert!(
98 p.insert(profile.throughput, *profile).is_none(),
99 "Attempted to insert profile with same throughput"
100 );
101 }
102
103 assert_eq!(
105 *p.get(&0).unwrap(),
106 ThroughputProfile {
107 level: Level::Low,
108 throughput: 0
109 }
110 );
111
112 Self { profiles: p }
113 }
114
115 pub fn lowest_profile(&self) -> ThroughputProfile {
116 *self
117 .profiles
118 .first_key_value()
119 .expect("Should contain at least one throughput profile")
120 .1
121 }
122
123 pub fn highest_profile(&self) -> ThroughputProfile {
124 *self
125 .profiles
126 .last_key_value()
127 .expect("Should contain at least one throughput profile")
128 .1
129 }
130 pub fn resolve(&self, current_throughput: u64) -> ThroughputProfile {
132 let mut iter = self.profiles.iter();
133 while let Some((threshold, profile)) = iter.next_back() {
134 if current_throughput >= *threshold {
135 return *profile;
136 }
137 }
138
139 warn!(
140 "Could not resolve throughput profile for throughput {} - we shouldn't end up here. Fallback to lowest profile as default.",
141 current_throughput
142 );
143
144 self.highest_profile()
146 }
147}
148
149impl Default for ThroughputProfileRanges {
150 fn default() -> Self {
151 let profiles = vec![
152 ThroughputProfile {
153 level: Level::Low,
154 throughput: 0,
155 },
156 ThroughputProfile {
157 level: Level::High,
158 throughput: 2_000,
159 },
160 ];
161 ThroughputProfileRanges::new(&profiles)
162 }
163}
164
/// A timestamp expressed in whole seconds. The calculator derives it from a millisecond
/// timestamp by dividing by `1_000`.
pub type TimestampSecs = u64;
166
167#[derive(Debug, Copy, Clone)]
168pub struct ThroughputProfileEntry {
169 profile: ThroughputProfile,
171 timestamp: TimestampSecs,
173 throughput: u64,
175}
176
177#[derive(Default)]
178struct ConsensusThroughputCalculatorInner {
179 observations: VecDeque<(TimestampSecs, u64)>,
180 total_transactions: u64,
181 last_oldest_timestamp: Option<TimestampSecs>,
183}
184
185pub struct ConsensusThroughputProfiler {
190 throughput_profile_update_interval: TimestampSecs,
196 throughput_profile_cool_down_threshold: u64,
200 profile_ranges: ThroughputProfileRanges,
202 last_throughput_profile: ArcSwap<ThroughputProfileEntry>,
204 metrics: Arc<AuthorityMetrics>,
205 calculator: Arc<ConsensusThroughputCalculator>,
207}
208
209impl ConsensusThroughputProfiler {
210 pub fn new(
211 calculator: Arc<ConsensusThroughputCalculator>,
212 throughput_profile_update_interval: Option<TimestampSecs>,
213 throughput_profile_cool_down_threshold: Option<u64>,
214 metrics: Arc<AuthorityMetrics>,
215 profile_ranges: ThroughputProfileRanges,
216 ) -> Self {
217 let throughput_profile_update_interval = throughput_profile_update_interval
218 .unwrap_or(DEFAULT_THROUGHPUT_PROFILE_UPDATE_INTERVAL_SECS);
219 let throughput_profile_cool_down_threshold = throughput_profile_cool_down_threshold
220 .unwrap_or(DEFAULT_THROUGHPUT_PROFILE_COOL_DOWN_THRESHOLD);
221
222 assert!(
223 throughput_profile_update_interval > 0,
224 "throughput_profile_update_interval should be >= 0"
225 );
226
227 assert!(
228 (0..=30).contains(&throughput_profile_cool_down_threshold),
229 "Out of bounds provided cool down threshold offset"
230 );
231
232 debug!("Profile ranges used: {:?}", profile_ranges);
233
234 Self {
235 throughput_profile_update_interval,
236 throughput_profile_cool_down_threshold,
237 last_throughput_profile: ArcSwap::from_pointee(ThroughputProfileEntry {
238 profile: profile_ranges.highest_profile(),
239 timestamp: 0,
240 throughput: 0,
241 }), profile_ranges,
243 metrics,
244 calculator,
245 }
246 }
247
248 pub fn throughput_level(&self) -> (Level, u64) {
251 let (throughput, timestamp) = self.calculator.current_throughput();
253 let profile = self.update_and_fetch_throughput_profile(throughput, timestamp);
254
255 (profile.profile.level, profile.throughput)
256 }
257
258 fn update_and_fetch_throughput_profile(
266 &self,
267 throughput: u64,
268 timestamp: TimestampSecs,
269 ) -> ThroughputProfileEntry {
270 let last_profile = self.last_throughput_profile.load();
271
272 if timestamp == 0 || timestamp < last_profile.timestamp {
276 return **last_profile;
277 }
278
279 let profile = self.profile_ranges.resolve(throughput);
280
281 let current_seconds_bucket = timestamp / self.throughput_profile_update_interval;
282 let last_profile_seconds_bucket =
283 last_profile.timestamp / self.throughput_profile_update_interval;
284
285 let should_update_profile = if current_seconds_bucket > last_profile_seconds_bucket
289 || (profile != last_profile.profile && last_profile.timestamp == timestamp)
290 {
291 if profile < last_profile.profile {
292 let min_throughput = last_profile
294 .profile
295 .throughput
296 .saturating_mul(100 - self.throughput_profile_cool_down_threshold)
297 / 100;
298 throughput <= min_throughput
299 } else {
300 true
301 }
302 } else {
303 false
304 };
305
306 if should_update_profile {
307 let p = ThroughputProfileEntry {
308 profile,
309 timestamp,
310 throughput,
311 };
312 debug!("Updating throughput profile to {:?}", p);
313 self.last_throughput_profile.store(Arc::new(p));
314
315 self.metrics
316 .consensus_calculated_throughput_profile
317 .set(usize::from(profile.level) as i64);
318
319 p
320 } else {
321 **last_profile
322 }
323 }
324}
325
326pub struct ConsensusThroughputCalculator {
330 observations_window: u64,
336 inner: Mutex<ConsensusThroughputCalculatorInner>,
337 current_throughput: ArcSwap<(u64, TimestampSecs)>,
338 metrics: Arc<AuthorityMetrics>,
339}
340
341impl ConsensusThroughputCalculator {
342 pub fn new(observations_window: Option<NonZeroU64>, metrics: Arc<AuthorityMetrics>) -> Self {
343 let observations_window = observations_window
344 .unwrap_or(NonZeroU64::new(DEFAULT_OBSERVATIONS_WINDOW).unwrap())
345 .get();
346
347 Self {
348 observations_window,
349 inner: Mutex::new(ConsensusThroughputCalculatorInner::default()),
350 current_throughput: ArcSwap::from_pointee((0, 0)),
351 metrics,
352 }
353 }
354
355 pub fn add_transactions(&self, timestamp_ms: TimestampMs, num_of_transactions: u64) {
359 let mut inner = self.inner.lock();
360 let timestamp_secs: TimestampSecs = timestamp_ms / 1_000; if let Some((front_ts, transactions)) = inner.observations.front_mut() {
363 if timestamp_secs < *front_ts {
366 warn!(
367 "Ignoring observation of transactions:{} as has earlier timestamp than last observation {}s < {}s",
368 num_of_transactions, timestamp_secs, front_ts
369 );
370 return;
371 }
372
373 if timestamp_secs == *front_ts {
375 *transactions = transactions.saturating_add(num_of_transactions);
376 } else {
377 inner
378 .observations
379 .push_front((timestamp_secs, num_of_transactions));
380 }
381 } else {
382 inner
383 .observations
384 .push_front((timestamp_secs, num_of_transactions));
385 }
386
387 inner.total_transactions = inner.total_transactions.saturating_add(num_of_transactions);
389
390 if inner.observations.len() as u64 >= self.observations_window {
394 let last_element_ts = if inner.observations.len() as u64 == self.observations_window {
395 if let Some(ts) = inner.last_oldest_timestamp {
396 ts
397 } else {
398 warn!(
399 "Skip calculation - we still don't have enough elements to pop the last observation"
400 );
401 return;
402 }
403 } else {
404 let (ts, txes) = inner.observations.pop_back().unwrap();
405 inner.total_transactions = inner.total_transactions.saturating_sub(txes);
406 ts
407 };
408
409 inner.last_oldest_timestamp = Some(last_element_ts);
411
412 let (first_element_ts, _first_element_transactions) = inner
414 .observations
415 .front()
416 .expect("There should be at least on element in the list");
417
418 let period = first_element_ts.saturating_sub(last_element_ts);
419
420 if period > 0 {
421 let current_throughput = inner.total_transactions / period;
422
423 self.metrics
424 .consensus_calculated_throughput
425 .set(current_throughput as i64);
426
427 self.current_throughput
428 .store(Arc::new((current_throughput, timestamp_secs)));
429 } else {
430 warn!(
431 "Skip calculating throughput as time period is {}. This is very unlikely to happen, should investigate.",
432 period
433 );
434 }
435 }
436 }
437
438 pub fn current_throughput(&self) -> (u64, TimestampSecs) {
440 *self.current_throughput.load().as_ref()
441 }
442}
443
#[cfg(test)]
mod tests {
    use super::*;
    use crate::consensus_throughput_calculator::Level::{High, Low};
    use prometheus::Registry;

    // The default ranges only know `Low` (from 0) and `High` (from 2_000).
    #[test]
    pub fn test_throughput_profile_ranges() {
        let ranges = ThroughputProfileRanges::default();

        assert_eq!(
            ranges.resolve(0),
            ThroughputProfile {
                level: Low,
                throughput: 0
            }
        );
        // Anything below the `High` threshold still resolves to `Low`.
        assert_eq!(
            ranges.resolve(1_000),
            ThroughputProfile {
                level: Low,
                throughput: 0
            }
        );
        // The threshold itself is inclusive.
        assert_eq!(
            ranges.resolve(2_000),
            ThroughputProfile {
                level: High,
                throughput: 2_000
            }
        );
        assert_eq!(
            ranges.resolve(u64::MAX),
            ThroughputProfile {
                level: High,
                throughput: 2_000
            }
        );
    }

    // Exercises the sliding window calculation with a window of 3 observations.
    #[test]
    #[cfg_attr(msim, ignore)]
    pub fn test_consensus_throughput_calculator() {
        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let max_observation_points: NonZeroU64 = NonZeroU64::new(3).unwrap();

        let calculator = ConsensusThroughputCalculator::new(Some(max_observation_points), metrics);

        // Nothing has been calculated yet.
        assert_eq!(calculator.current_throughput(), (0, 0));

        calculator.add_transactions(1000 as TimestampMs, 1_000);
        calculator.add_transactions(2000 as TimestampMs, 1_000);
        calculator.add_transactions(3000 as TimestampMs, 1_000);
        calculator.add_transactions(4000 as TimestampMs, 1_000);

        // The fourth observation evicts the one at 1s: 3_000 transactions over 4s - 1s = 3s.
        assert_eq!(calculator.current_throughput(), (1000, 4));

        calculator.add_transactions(5_000 as TimestampMs, 2_500);
        calculator.add_transactions(6_000 as TimestampMs, 2_800);
        // Window holds 1_000 + 2_500 + 2_800 = 6_300 transactions over 6s - 3s = 3s.
        assert_eq!(calculator.current_throughput(), (2100, 6));

        calculator.add_transactions(15_000 as TimestampMs, 0);

        // The observation at 4s is evicted: 5_300 transactions over 15s - 4s = 11s.
        assert_eq!(calculator.current_throughput(), (481, 15));

        calculator.add_transactions(17_000 as TimestampMs, 0);
        // The observation at 5s is evicted: 2_800 transactions over 17s - 5s = 12s.
        assert_eq!(calculator.current_throughput(), (233, 17));

        calculator.add_transactions(19_000 as TimestampMs, 0);
        calculator.add_transactions(20_000 as TimestampMs, 0);
        // Only empty observations are left in the window.
        assert_eq!(calculator.current_throughput(), (0, 20));

        calculator.add_transactions(21_000 as TimestampMs, 1_000);
        calculator.add_transactions(22_000 as TimestampMs, 2_000);
        calculator.add_transactions(23_000 as TimestampMs, 3_100);
        // 6_100 transactions over 23s - 20s = 3s.
        assert_eq!(calculator.current_throughput(), (2033, 23));
    }

    // Observations that fall into the same second are accumulated into a single entry.
    #[test]
    #[cfg_attr(msim, ignore)]
    pub fn test_throughput_calculator_same_timestamp_observations() {
        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let max_observation_points: NonZeroU64 = NonZeroU64::new(2).unwrap();

        let calculator = ConsensusThroughputCalculator::new(Some(max_observation_points), metrics);

        calculator.add_transactions(1_000, 0);

        // All of these land in the 2s bucket (2_340ms / 1_000), adding up to 1_000 transactions.
        for _ in 0..10 {
            calculator.add_transactions(2_340, 100);
        }
        // The window is full but nothing has been evicted yet, so no throughput is calculated.
        assert_eq!(calculator.current_throughput(), (0, 0));

        calculator.add_transactions(5_000, 0);

        // The observation at 1s is evicted: 1_000 transactions over 5s - 1s = 4s.
        assert_eq!(calculator.current_throughput(), (250, 5));

        // Adding to the same second recalculates over the same period: 1_400 / 4.
        calculator.add_transactions(5_000, 400);
        assert_eq!(calculator.current_throughput(), (350, 5));

        // 1_700 / 4.
        calculator.add_transactions(5_000, 300);
        assert_eq!(calculator.current_throughput(), (425, 5));
    }

    // Exercises profile updates with an update interval of 5s and a cool down threshold of 10.
    #[test]
    #[cfg_attr(msim, ignore)]
    pub fn test_consensus_throughput_profiler() {
        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput_profile_update_interval: TimestampSecs = 5;
        let max_observation_points: NonZeroU64 = NonZeroU64::new(3).unwrap();
        let throughput_profile_cool_down_threshold: u64 = 10;

        let ranges = ThroughputProfileRanges::default();

        let calculator = Arc::new(ConsensusThroughputCalculator::new(
            Some(max_observation_points),
            metrics.clone(),
        ));
        let profiler = ConsensusThroughputProfiler::new(
            calculator.clone(),
            Some(throughput_profile_update_interval),
            Some(throughput_profile_cool_down_threshold),
            metrics,
            ranges,
        );

        // The profiler starts from the highest profile with a throughput of 0.
        assert_eq!(profiler.throughput_level(), (High, 0));

        calculator.add_transactions(1000 as TimestampMs, 1_000);
        calculator.add_transactions(2000 as TimestampMs, 1_000);
        calculator.add_transactions(3000 as TimestampMs, 1_000);

        // The calculator has not produced a throughput yet, so the initial profile is kept.
        assert_eq!(profiler.throughput_level(), (High, 0));

        calculator.add_transactions(4000 as TimestampMs, 2_500);
        calculator.add_transactions(5000 as TimestampMs, 2_800);
        // 6_300 transactions over 5s - 2s = 3s; timestamp 5 is in a later bucket than 0.
        assert_eq!(profiler.throughput_level(), (High, 2100));

        calculator.add_transactions(7_000 as TimestampMs, 2_800);
        calculator.add_transactions(15_000 as TimestampMs, 0);

        // 5_600 transactions over 15s - 4s = 11s; 509 is below 90% of 2_000, so `Low` is taken.
        assert_eq!(profiler.throughput_level(), (Low, 509));

        calculator.add_transactions(17_000 as TimestampMs, 0);
        calculator.add_transactions(19_000 as TimestampMs, 0);
        calculator.add_transactions(20_000 as TimestampMs, 0);

        // Timestamp 20 is in a later bucket than 15, so the entry is refreshed.
        assert_eq!(profiler.throughput_level(), (Low, 0));

        // Same second as the stored entry, but the resolved profile differs: 12_000
        // transactions over 20s - 15s = 5s. The profile is updated within the same bucket.
        calculator.add_transactions(20_000 as TimestampMs, 4_000);
        calculator.add_transactions(20_000 as TimestampMs, 4_000);
        calculator.add_transactions(20_000 as TimestampMs, 4_000);
        assert_eq!(profiler.throughput_level(), (High, 2400));

        // Still in the same 5s bucket and still `High`, so the stored entry is returned.
        calculator.add_transactions(22_000 as TimestampMs, 0);
        calculator.add_transactions(23_000 as TimestampMs, 0);
        assert_eq!(profiler.throughput_level(), (High, 2400));
    }

    // Profile changes are only applied when the measurement lands in a later update bucket.
    #[test]
    #[cfg_attr(msim, ignore)]
    pub fn test_consensus_throughput_profiler_update_interval() {
        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput_profile_update_interval: TimestampSecs = 5;
        let max_observation_points: NonZeroU64 = NonZeroU64::new(2).unwrap();

        let ranges = ThroughputProfileRanges::default();

        let calculator = Arc::new(ConsensusThroughputCalculator::new(
            Some(max_observation_points),
            metrics.clone(),
        ));
        let profiler = ConsensusThroughputProfiler::new(
            calculator.clone(),
            Some(throughput_profile_update_interval),
            None,
            metrics,
            ranges,
        );

        calculator.add_transactions(3_000 as TimestampMs, 2_200);
        calculator.add_transactions(4_000 as TimestampMs, 4_200);
        calculator.add_transactions(7_000 as TimestampMs, 4_200);

        // 8_400 transactions over 7s - 3s = 4s.
        assert_eq!(profiler.throughput_level(), (High, 2_100));

        calculator.add_transactions(10_000 as TimestampMs, 1_000);

        // 5_200 transactions over 10s - 4s = 6s; later bucket and below the cool down limit.
        assert_eq!(profiler.throughput_level(), (Low, 866));

        calculator.add_transactions(16_000 as TimestampMs, 20_000);

        // 21_000 transactions over 16s - 7s = 9s; later bucket, so the profile moves up.
        assert_eq!(profiler.throughput_level(), (High, 2333));

        // The throughput drops to 0, but 17s-19s share the bucket of 16s: no update.
        calculator.add_transactions(17_000 as TimestampMs, 0);
        calculator.add_transactions(18_000 as TimestampMs, 0);
        calculator.add_transactions(19_000 as TimestampMs, 0);

        assert_eq!(profiler.throughput_level(), (High, 2333));

        // 20s starts a new bucket, so the drop is now applied.
        calculator.add_transactions(20_000 as TimestampMs, 0);

        assert_eq!(profiler.throughput_level(), (Low, 0));
    }

    // A move to a lower profile requires the throughput to fall below the cool down limit.
    #[test]
    #[cfg_attr(msim, ignore)]
    pub fn test_consensus_throughput_profiler_cool_down() {
        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput_profile_update_window: TimestampSecs = 3;
        let max_observation_points: NonZeroU64 = NonZeroU64::new(3).unwrap();
        let throughput_profile_cool_down_threshold: u64 = 10;

        let ranges = ThroughputProfileRanges::default();

        let calculator = Arc::new(ConsensusThroughputCalculator::new(
            Some(max_observation_points),
            metrics.clone(),
        ));
        let profiler = ConsensusThroughputProfiler::new(
            calculator.clone(),
            Some(throughput_profile_update_window),
            Some(throughput_profile_cool_down_threshold),
            metrics,
            ranges,
        );

        // 3_000 transactions per second for four seconds.
        for i in 1..=4 {
            calculator.add_transactions(i * 1_000, 3_000);
        }
        assert_eq!(profiler.throughput_level(), (High, 3_000));

        // 1_900 resolves to `Low`, but it is above 90% of 2_000 (= 1_800): `High` is kept.
        calculator.add_transactions(5_000, 1_900);
        calculator.add_transactions(6_000, 1_900);
        calculator.add_transactions(7_000, 1_900);

        assert_eq!(calculator.current_throughput(), (1_900, 7));
        assert_eq!(profiler.throughput_level(), (High, 3_000));

        // 1_500 is below 1_800, so the profile now moves down.
        calculator.add_transactions(8_000, 1_500);
        calculator.add_transactions(9_000, 1_500);
        calculator.add_transactions(10_000, 1_500);

        assert_eq!(profiler.throughput_level(), (Low, 1500));
    }
}