// sui_core/consensus_throughput_calculator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use arc_swap::ArcSwap;
5use parking_lot::Mutex;
6use std::collections::{BTreeMap, VecDeque};
7use std::num::NonZeroU64;
8use std::sync::Arc;
9use sui_protocol_config::Chain;
10use sui_types::digests::ChainIdentifier;
11use sui_types::messages_consensus::TimestampMs;
12use tracing::{debug, warn};
13
14use crate::authority::AuthorityMetrics;
15
const DEFAULT_OBSERVATIONS_WINDOW: u64 = 120; // number of observations to use to calculate the past throughput
const DEFAULT_THROUGHPUT_PROFILE_UPDATE_INTERVAL_SECS: u64 = 60; // seconds that need to pass between two consecutive throughput profile updates
const DEFAULT_THROUGHPUT_PROFILE_COOL_DOWN_THRESHOLD: u64 = 10; // percentage (10%) the throughput must drop below a profile's lower bound before downgrading
19
/// A throughput profile: a classification level paired with the inclusive lower bound
/// (in tx/sec) of the throughput range that the level applies to.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd)]
pub struct ThroughputProfile {
    pub level: Level,
    /// The lower range of the throughput that this profile is referring to. For example, if
    /// `throughput = 1_000`, then for values >= 1_000 this throughput profile applies.
    pub throughput: u64,
}
27
/// Coarse throughput classification. Declaration order matters: the derived `Ord`
/// makes `Low < Medium < High`, which the profiler relies on when comparing profiles.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd)]
pub enum Level {
    Low,
    Medium,
    High,
}
34
35impl From<usize> for Level {
36    fn from(value: usize) -> Self {
37        if value == 0 {
38            Level::Low
39        } else if value == 1 {
40            Level::Medium
41        } else {
42            Level::High
43        }
44    }
45}
46
47impl From<Level> for usize {
48    fn from(value: Level) -> Self {
49        match value {
50            Level::Low => 0,
51            Level::Medium => 1,
52            Level::High => 2,
53        }
54    }
55}
56
#[derive(Debug)]
pub struct ThroughputProfileRanges {
    /// The throughput profiles keyed by the inclusive lower bound of the throughput
    /// range each profile covers; a profile applies up to the next larger key.
    profiles: BTreeMap<u64, ThroughputProfile>,
}
62
63impl ThroughputProfileRanges {
64    pub fn from_chain(chain_id: ChainIdentifier) -> ThroughputProfileRanges {
65        let to_profiles = |medium: u64, high: u64| -> Vec<ThroughputProfile> {
66            vec![
67                ThroughputProfile {
68                    level: Level::Low,
69                    throughput: 0,
70                },
71                ThroughputProfile {
72                    level: Level::Medium,
73                    throughput: medium,
74                },
75                ThroughputProfile {
76                    level: Level::High,
77                    throughput: high,
78                },
79            ]
80        };
81
82        match chain_id.chain() {
83            Chain::Mainnet => ThroughputProfileRanges::new(&to_profiles(500, 2_000)),
84            Chain::Testnet => ThroughputProfileRanges::new(&to_profiles(500, 2_000)),
85            Chain::Unknown => ThroughputProfileRanges::new(&to_profiles(1_000, 2_000)),
86        }
87    }
88
89    pub fn new(profiles: &[ThroughputProfile]) -> Self {
90        let mut p: BTreeMap<u64, ThroughputProfile> = BTreeMap::new();
91
92        for profile in profiles {
93            assert!(
94                !p.iter().any(|(_, pr)| pr.level == profile.level),
95                "Attempted to insert profile with same level"
96            );
97            assert!(
98                p.insert(profile.throughput, *profile).is_none(),
99                "Attempted to insert profile with same throughput"
100            );
101        }
102
103        // By default the Low profile should exist with throughput 0
104        assert_eq!(
105            *p.get(&0).unwrap(),
106            ThroughputProfile {
107                level: Level::Low,
108                throughput: 0
109            }
110        );
111
112        Self { profiles: p }
113    }
114
115    pub fn lowest_profile(&self) -> ThroughputProfile {
116        *self
117            .profiles
118            .first_key_value()
119            .expect("Should contain at least one throughput profile")
120            .1
121    }
122
123    pub fn highest_profile(&self) -> ThroughputProfile {
124        *self
125            .profiles
126            .last_key_value()
127            .expect("Should contain at least one throughput profile")
128            .1
129    }
130    /// Resolves the throughput profile that corresponds to the provided throughput.
131    pub fn resolve(&self, current_throughput: u64) -> ThroughputProfile {
132        let mut iter = self.profiles.iter();
133        while let Some((threshold, profile)) = iter.next_back() {
134            if current_throughput >= *threshold {
135                return *profile;
136            }
137        }
138
139        warn!(
140            "Could not resolve throughput profile for throughput {} - we shouldn't end up here. Fallback to lowest profile as default.",
141            current_throughput
142        );
143
144        // If not found, then we should return the lowest possible profile as default to stay on safe side.
145        self.highest_profile()
146    }
147}
148
149impl Default for ThroughputProfileRanges {
150    fn default() -> Self {
151        let profiles = vec![
152            ThroughputProfile {
153                level: Level::Low,
154                throughput: 0,
155            },
156            ThroughputProfile {
157                level: Level::High,
158                throughput: 2_000,
159            },
160        ];
161        ThroughputProfileRanges::new(&profiles)
162    }
163}
164
165pub type TimestampSecs = u64;
166
/// A snapshot of the profiler's most recent decision.
#[derive(Debug, Copy, Clone)]
pub struct ThroughputProfileEntry {
    /// The throughput profile
    profile: ThroughputProfile,
    /// The time when this throughput profile was created
    timestamp: TimestampSecs,
    /// The calculated throughput when this profile was created
    throughput: u64,
}
176
/// Mutable sliding-window state of the throughput calculator.
#[derive(Default)]
struct ConsensusThroughputCalculatorInner {
    /// (timestamp in seconds, transaction count) observations, newest at the front.
    observations: VecDeque<(TimestampSecs, u64)>,
    /// Running sum of the transactions across the entries currently in `observations`.
    total_transactions: u64,
    /// The last timestamp that we considered as oldest to calculate the throughput over the observations window.
    last_oldest_timestamp: Option<TimestampSecs>,
}
184
/// The ConsensusThroughputProfiler is responsible for assigning the right throughput profile by polling
/// the measured consensus throughput. It is important to rely on the ConsensusThroughputCalculator to measure
/// throughput as we need to make sure that validators will see as consistent a view as possible to assign
/// the right profile.
pub struct ConsensusThroughputProfiler {
    /// The throughput profile will be eligible for update every `throughput_profile_update_interval` seconds.
    /// A bucketing approach is followed where the throughput timestamp is used in order to calculate which
    /// seconds bucket it is assigned to. When we detect a change on that bucket then an update is triggered (if a different
    /// profile is calculated). That allows validators to align on the update timing and ensure they will eventually
    /// converge as the consensus timestamps are used.
    throughput_profile_update_interval: TimestampSecs,
    /// When the current calculated throughput (A) is lower than the previous one, and the assessed profile is now
    /// lower than the previous one, we'll change to the lower profile only when
    /// (A) <= (previous_profile.throughput) * (100 - throughput_profile_cool_down_threshold) / 100.
    /// Otherwise we'll stick to the previous profile. We want to do that to avoid any jittery behaviour that alternates between two profiles.
    throughput_profile_cool_down_threshold: u64,
    /// The profile ranges used to classify the measured throughput
    profile_ranges: ThroughputProfileRanges,
    /// The most recently calculated throughput profile
    last_throughput_profile: ArcSwap<ThroughputProfileEntry>,
    metrics: Arc<AuthorityMetrics>,
    /// The throughput calculator to use to derive the current throughput.
    calculator: Arc<ConsensusThroughputCalculator>,
}
208
209impl ConsensusThroughputProfiler {
210    pub fn new(
211        calculator: Arc<ConsensusThroughputCalculator>,
212        throughput_profile_update_interval: Option<TimestampSecs>,
213        throughput_profile_cool_down_threshold: Option<u64>,
214        metrics: Arc<AuthorityMetrics>,
215        profile_ranges: ThroughputProfileRanges,
216    ) -> Self {
217        let throughput_profile_update_interval = throughput_profile_update_interval
218            .unwrap_or(DEFAULT_THROUGHPUT_PROFILE_UPDATE_INTERVAL_SECS);
219        let throughput_profile_cool_down_threshold = throughput_profile_cool_down_threshold
220            .unwrap_or(DEFAULT_THROUGHPUT_PROFILE_COOL_DOWN_THRESHOLD);
221
222        assert!(
223            throughput_profile_update_interval > 0,
224            "throughput_profile_update_interval should be >= 0"
225        );
226
227        assert!(
228            (0..=30).contains(&throughput_profile_cool_down_threshold),
229            "Out of bounds provided cool down threshold offset"
230        );
231
232        debug!("Profile ranges used: {:?}", profile_ranges);
233
234        Self {
235            throughput_profile_update_interval,
236            throughput_profile_cool_down_threshold,
237            last_throughput_profile: ArcSwap::from_pointee(ThroughputProfileEntry {
238                profile: profile_ranges.highest_profile(),
239                timestamp: 0,
240                throughput: 0,
241            }), // assume high throughput so the node is more conservative on bootstrap
242            profile_ranges,
243            metrics,
244            calculator,
245        }
246    }
247
248    // Return the current throughput level and the corresponding throughput when this was last updated.
249    // If that is not set yet then as default the High profile is returned and the throughput will be None.
250    pub fn throughput_level(&self) -> (Level, u64) {
251        // Update throughput profile if necessary time has passed
252        let (throughput, timestamp) = self.calculator.current_throughput();
253        let profile = self.update_and_fetch_throughput_profile(throughput, timestamp);
254
255        (profile.profile.level, profile.throughput)
256    }
257
258    // Calculate and update the throughput profile based on the provided throughput. The throughput profile
259    // will only get updated when a different value has been calculated. For example, if the
260    // `last_throughput_profile` is `Low` , and again we calculate it as `Low` based on input, then we'll
261    // not update the profile or the timestamp. We do care to perform updates only when profiles differ.
262    // To ensure that we are protected against throughput profile change fluctuations, we update a
263    // throughput profile every `throughput_profile_update_interval` seconds based on the provided unix timestamps.
264    // The last throughput profile entry is returned.
265    fn update_and_fetch_throughput_profile(
266        &self,
267        throughput: u64,
268        timestamp: TimestampSecs,
269    ) -> ThroughputProfileEntry {
270        let last_profile = self.last_throughput_profile.load();
271
272        // Skip any processing if provided timestamp is older than the last used one. Also return existing
273        // profile when provided timestamp is 0 - this avoids triggering an immediate update eventually overriding
274        // the default value.
275        if timestamp == 0 || timestamp < last_profile.timestamp {
276            return **last_profile;
277        }
278
279        let profile = self.profile_ranges.resolve(throughput);
280
281        let current_seconds_bucket = timestamp / self.throughput_profile_update_interval;
282        let last_profile_seconds_bucket =
283            last_profile.timestamp / self.throughput_profile_update_interval;
284
285        // Update only when we minimum time has been passed since last update.
286        // We allow the edge case to update on the same bucket when a different profile has been
287        // computed for the exact same timestamp.
288        let should_update_profile = if current_seconds_bucket > last_profile_seconds_bucket
289            || (profile != last_profile.profile && last_profile.timestamp == timestamp)
290        {
291            if profile < last_profile.profile {
292                // If new profile is smaller than previous one, then make sure the cool down threshold is respected.
293                let min_throughput = last_profile
294                    .profile
295                    .throughput
296                    .saturating_mul(100 - self.throughput_profile_cool_down_threshold)
297                    / 100;
298                throughput <= min_throughput
299            } else {
300                true
301            }
302        } else {
303            false
304        };
305
306        if should_update_profile {
307            let p = ThroughputProfileEntry {
308                profile,
309                timestamp,
310                throughput,
311            };
312            debug!("Updating throughput profile to {:?}", p);
313            self.last_throughput_profile.store(Arc::new(p));
314
315            self.metrics
316                .consensus_calculated_throughput_profile
317                .set(usize::from(profile.level) as i64);
318
319            p
320        } else {
321            **last_profile
322        }
323    }
324}
325
/// ConsensusThroughputCalculator is calculating the transaction throughput as this is coming out from
/// consensus. The throughput is calculated using a sliding window approach and leveraging the timestamps
/// provided by consensus.
pub struct ConsensusThroughputCalculator {
    /// The number of transaction throughput observations that should be stored within the observations
    /// vector in the ConsensusThroughputCalculatorInner. Those observations will be used to calculate
    /// the current transactions throughput. We want to select a number that gives us enough observations
    /// so we better calculate the throughput and are protected against spikes. A large enough value though
    /// will make us less reactive to throughput changes.
    observations_window: u64,
    /// Mutable sliding-window state.
    inner: Mutex<ConsensusThroughputCalculatorInner>,
    /// Last computed (throughput in tx/sec, timestamp in seconds), readable without locking `inner`.
    current_throughput: ArcSwap<(u64, TimestampSecs)>,
    metrics: Arc<AuthorityMetrics>,
}
340
341impl ConsensusThroughputCalculator {
342    pub fn new(observations_window: Option<NonZeroU64>, metrics: Arc<AuthorityMetrics>) -> Self {
343        let observations_window = observations_window
344            .unwrap_or(NonZeroU64::new(DEFAULT_OBSERVATIONS_WINDOW).unwrap())
345            .get();
346
347        Self {
348            observations_window,
349            inner: Mutex::new(ConsensusThroughputCalculatorInner::default()),
350            current_throughput: ArcSwap::from_pointee((0, 0)),
351            metrics,
352        }
353    }
354
355    // Adds an observation of the number of transactions that have been sequenced after deduplication
356    // and the corresponding leader timestamp. The observation timestamps should be monotonically
357    // incremented otherwise observation will be ignored.
358    pub fn add_transactions(&self, timestamp_ms: TimestampMs, num_of_transactions: u64) {
359        let mut inner = self.inner.lock();
360        let timestamp_secs: TimestampSecs = timestamp_ms / 1_000; // lowest bucket we care is seconds
361
362        if let Some((front_ts, transactions)) = inner.observations.front_mut() {
363            // First check that the timestamp is monotonically incremented - ignore any observation that is not
364            // later from previous one (it shouldn't really happen).
365            if timestamp_secs < *front_ts {
366                warn!(
367                    "Ignoring observation of transactions:{} as has earlier timestamp than last observation {}s < {}s",
368                    num_of_transactions, timestamp_secs, front_ts
369                );
370                return;
371            }
372
373            // Not very likely, but if transactions refer to same second we add to the last element.
374            if timestamp_secs == *front_ts {
375                *transactions = transactions.saturating_add(num_of_transactions);
376            } else {
377                inner
378                    .observations
379                    .push_front((timestamp_secs, num_of_transactions));
380            }
381        } else {
382            inner
383                .observations
384                .push_front((timestamp_secs, num_of_transactions));
385        }
386
387        // update total number of transactions in the observations list
388        inner.total_transactions = inner.total_transactions.saturating_add(num_of_transactions);
389
390        // If we have more values on our window of max values, remove the last one, and calculate throughput.
391        // If we have the exact same values on our window of max values, then still calculate the throughput to ensure
392        // that we are taking into account the case where the last bucket gets updated because it falls into the same second.
393        if inner.observations.len() as u64 >= self.observations_window {
394            let last_element_ts = if inner.observations.len() as u64 == self.observations_window {
395                if let Some(ts) = inner.last_oldest_timestamp {
396                    ts
397                } else {
398                    warn!(
399                        "Skip calculation - we still don't have enough elements to pop the last observation"
400                    );
401                    return;
402                }
403            } else {
404                let (ts, txes) = inner.observations.pop_back().unwrap();
405                inner.total_transactions = inner.total_transactions.saturating_sub(txes);
406                ts
407            };
408
409            // update the last oldest timestamp
410            inner.last_oldest_timestamp = Some(last_element_ts);
411
412            // get the first element's timestamp to calculate the transaction rate
413            let (first_element_ts, _first_element_transactions) = inner
414                .observations
415                .front()
416                .expect("There should be at least on element in the list");
417
418            let period = first_element_ts.saturating_sub(last_element_ts);
419
420            if period > 0 {
421                let current_throughput = inner.total_transactions / period;
422
423                self.metrics
424                    .consensus_calculated_throughput
425                    .set(current_throughput as i64);
426
427                self.current_throughput
428                    .store(Arc::new((current_throughput, timestamp_secs)));
429            } else {
430                warn!(
431                    "Skip calculating throughput as time period is {}. This is very unlikely to happen, should investigate.",
432                    period
433                );
434            }
435        }
436    }
437
438    // Returns the current (live calculated) throughput and the corresponding timestamp of when this got updated.
439    pub fn current_throughput(&self) -> (u64, TimestampSecs) {
440        *self.current_throughput.load().as_ref()
441    }
442}
443
#[cfg(test)]
mod tests {
    use super::*;
    use crate::consensus_throughput_calculator::Level::{High, Low};
    use prometheus::Registry;

    #[test]
    pub fn test_throughput_profile_ranges() {
        let ranges = ThroughputProfileRanges::default();

        assert_eq!(
            ranges.resolve(0),
            ThroughputProfile {
                level: Low,
                throughput: 0
            }
        );
        assert_eq!(
            ranges.resolve(1_000),
            ThroughputProfile {
                level: Low,
                throughput: 0
            }
        );
        assert_eq!(
            ranges.resolve(2_000),
            ThroughputProfile {
                level: High,
                throughput: 2_000
            }
        );
        assert_eq!(
            ranges.resolve(u64::MAX),
            ThroughputProfile {
                level: High,
                throughput: 2_000
            }
        );
    }

    #[test]
    #[cfg_attr(msim, ignore)]
    pub fn test_consensus_throughput_calculator() {
        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let max_observation_points: NonZeroU64 = NonZeroU64::new(3).unwrap();

        let calculator = ConsensusThroughputCalculator::new(Some(max_observation_points), metrics);

        assert_eq!(calculator.current_throughput(), (0, 0));

        calculator.add_transactions(1000 as TimestampMs, 1_000);
        calculator.add_transactions(2000 as TimestampMs, 1_000);
        calculator.add_transactions(3000 as TimestampMs, 1_000);
        calculator.add_transactions(4000 as TimestampMs, 1_000);

        // We expect to have a rate of 1K tx/sec with last update timestamp the 4th second
        assert_eq!(calculator.current_throughput(), (1000, 4));

        // We are adding more transactions to get over 2K tx/sec
        calculator.add_transactions(5_000 as TimestampMs, 2_500);
        calculator.add_transactions(6_000 as TimestampMs, 2_800);
        assert_eq!(calculator.current_throughput(), (2100, 6));

        // Let's now add 0 transactions after 5 seconds. Since 5 seconds have passed since the last
        // update and now the transactions are 0 we expect the throughput to be calculated as:
        // 2800 + 2500 + 0 = 5300 / (15sec - 4sec) = 5300 / 11sec = 481 tx/sec
        calculator.add_transactions(15_000 as TimestampMs, 0);

        assert_eq!(calculator.current_throughput(), (481, 15));

        // Adding zero transactions for the next 5 seconds will make throughput zero
        calculator.add_transactions(17_000 as TimestampMs, 0);
        assert_eq!(calculator.current_throughput(), (233, 17));

        calculator.add_transactions(19_000 as TimestampMs, 0);
        calculator.add_transactions(20_000 as TimestampMs, 0);
        assert_eq!(calculator.current_throughput(), (0, 20));

        // By adding now a few entries with lots of transactions we increase the throughput again
        calculator.add_transactions(21_000 as TimestampMs, 1_000);
        calculator.add_transactions(22_000 as TimestampMs, 2_000);
        calculator.add_transactions(23_000 as TimestampMs, 3_100);
        assert_eq!(calculator.current_throughput(), (2033, 23));
    }

    #[test]
    #[cfg_attr(msim, ignore)]
    pub fn test_throughput_calculator_same_timestamp_observations() {
        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let max_observation_points: NonZeroU64 = NonZeroU64::new(2).unwrap();

        let calculator = ConsensusThroughputCalculator::new(Some(max_observation_points), metrics);

        // adding one observation
        calculator.add_transactions(1_000, 0);

        // Adding observations with same timestamp should fall under the same bucket and won't lead
        // to throughput update.
        for _ in 0..10 {
            calculator.add_transactions(2_340, 100);
        }
        assert_eq!(calculator.current_throughput(), (0, 0));

        // Adding now one observation on a different second bucket will change throughput
        calculator.add_transactions(5_000, 0);

        assert_eq!(calculator.current_throughput(), (250, 5));

        // Updating further the last bucket with more transactions keeps updating the throughput
        calculator.add_transactions(5_000, 400);
        assert_eq!(calculator.current_throughput(), (350, 5));

        calculator.add_transactions(5_000, 300);
        assert_eq!(calculator.current_throughput(), (425, 5));
    }

    #[test]
    #[cfg_attr(msim, ignore)]
    pub fn test_consensus_throughput_profiler() {
        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput_profile_update_interval: TimestampSecs = 5;
        let max_observation_points: NonZeroU64 = NonZeroU64::new(3).unwrap();
        let throughput_profile_cool_down_threshold: u64 = 10;

        let ranges = ThroughputProfileRanges::default();

        let calculator = Arc::new(ConsensusThroughputCalculator::new(
            Some(max_observation_points),
            metrics.clone(),
        ));
        let profiler = ConsensusThroughputProfiler::new(
            calculator.clone(),
            Some(throughput_profile_update_interval),
            Some(throughput_profile_cool_down_threshold),
            metrics,
            ranges,
        );

        // When no transactions exist, the profiler will return by default "High" to err on the
        // assumption that there is lots of load.
        assert_eq!(profiler.throughput_level(), (High, 0));

        calculator.add_transactions(1000 as TimestampMs, 1_000);
        calculator.add_transactions(2000 as TimestampMs, 1_000);
        calculator.add_transactions(3000 as TimestampMs, 1_000);

        // We expect to have a rate of 1K tx/sec, that's < the 2K limit, so the throughput profile remains "High" - nothing gets updated
        assert_eq!(profiler.throughput_level(), (High, 0));

        // We are adding more transactions to get over 2K tx/sec, so throughput profile should now be categorised
        // as "high"
        calculator.add_transactions(4000 as TimestampMs, 2_500);
        calculator.add_transactions(5000 as TimestampMs, 2_800);
        assert_eq!(profiler.throughput_level(), (High, 2100));

        // Let's now add 0 transactions after at least 5 seconds. Since the update should happen every 5 seconds
        // now the transactions are 0 we expect the throughput to be calculated as:
        // 2800 + 2800 + 0 = 5600 / (15sec - 4sec) = 5600 / 11sec = 509 tx/sec
        calculator.add_transactions(7_000 as TimestampMs, 2_800);
        calculator.add_transactions(15_000 as TimestampMs, 0);

        assert_eq!(profiler.throughput_level(), (Low, 509));

        // Adding zero transactions for the next 5 seconds will make throughput zero.
        // Profile will remain Low and throughput will get updated
        calculator.add_transactions(17_000 as TimestampMs, 0);
        calculator.add_transactions(19_000 as TimestampMs, 0);
        calculator.add_transactions(20_000 as TimestampMs, 0);

        assert_eq!(profiler.throughput_level(), (Low, 0));

        // By adding a few entries with lots of transactions for the exact same last timestamp it will
        // trigger a throughput profile update.
        calculator.add_transactions(20_000 as TimestampMs, 4_000);
        calculator.add_transactions(20_000 as TimestampMs, 4_000);
        calculator.add_transactions(20_000 as TimestampMs, 4_000);
        assert_eq!(profiler.throughput_level(), (High, 2400));

        // no further updates will happen until the next 5sec bucket update.
        calculator.add_transactions(22_000 as TimestampMs, 0);
        calculator.add_transactions(23_000 as TimestampMs, 0);
        assert_eq!(profiler.throughput_level(), (High, 2400));
    }

    #[test]
    #[cfg_attr(msim, ignore)]
    pub fn test_consensus_throughput_profiler_update_interval() {
        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput_profile_update_interval: TimestampSecs = 5;
        let max_observation_points: NonZeroU64 = NonZeroU64::new(2).unwrap();

        let ranges = ThroughputProfileRanges::default();

        let calculator = Arc::new(ConsensusThroughputCalculator::new(
            Some(max_observation_points),
            metrics.clone(),
        ));
        let profiler = ConsensusThroughputProfiler::new(
            calculator.clone(),
            Some(throughput_profile_update_interval),
            None,
            metrics,
            ranges,
        );

        // Current setup is `throughput_profile_update_interval` = 5sec, which means that the throughput profile
        // should get updated every 5 seconds (based on the provided unix timestamp).

        calculator.add_transactions(3_000 as TimestampMs, 2_200);
        calculator.add_transactions(4_000 as TimestampMs, 4_200);
        calculator.add_transactions(7_000 as TimestampMs, 4_200);

        assert_eq!(profiler.throughput_level(), (High, 2_100));

        // When adding transactions at timestamp 10s the bucket changes and the profile should get updated
        calculator.add_transactions(10_000 as TimestampMs, 1_000);

        assert_eq!(profiler.throughput_level(), (Low, 866));

        // Now adding transactions at timestamp 16s the bucket changes and profile should get updated
        calculator.add_transactions(16_000 as TimestampMs, 20_000);

        assert_eq!(profiler.throughput_level(), (High, 2333));

        // Keep adding transactions that fall under the same bucket as the previous one; even though
        // traffic should be marked as low it isn't until the bucket of 20s is reached.
        calculator.add_transactions(17_000 as TimestampMs, 0);
        calculator.add_transactions(18_000 as TimestampMs, 0);
        calculator.add_transactions(19_000 as TimestampMs, 0);

        assert_eq!(profiler.throughput_level(), (High, 2333));

        calculator.add_transactions(20_000 as TimestampMs, 0);

        assert_eq!(profiler.throughput_level(), (Low, 0));
    }

    #[test]
    #[cfg_attr(msim, ignore)]
    pub fn test_consensus_throughput_profiler_cool_down() {
        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput_profile_update_window: TimestampSecs = 3;
        let max_observation_points: NonZeroU64 = NonZeroU64::new(3).unwrap();
        let throughput_profile_cool_down_threshold: u64 = 10;

        let ranges = ThroughputProfileRanges::default();

        let calculator = Arc::new(ConsensusThroughputCalculator::new(
            Some(max_observation_points),
            metrics.clone(),
        ));
        let profiler = ConsensusThroughputProfiler::new(
            calculator.clone(),
            Some(throughput_profile_update_window),
            Some(throughput_profile_cool_down_threshold),
            metrics,
            ranges,
        );

        // Adding 4 observations of 3_000 tx/sec, so in the end throughput profile should be flagged as high
        for i in 1..=4 {
            calculator.add_transactions(i * 1_000, 3_000);
        }
        assert_eq!(profiler.throughput_level(), (High, 3_000));

        // Now let's add some transactions to bring throughput a little bit below the upper Low threshold (2000 tx/sec)
        // but still above the 10% offset which is 1800 tx/sec.
        calculator.add_transactions(5_000, 1_900);
        calculator.add_transactions(6_000, 1_900);
        calculator.add_transactions(7_000, 1_900);

        assert_eq!(calculator.current_throughput(), (1_900, 7));
        assert_eq!(profiler.throughput_level(), (High, 3_000));

        // Let's bring throughput down more - now the throughput profile should get updated
        calculator.add_transactions(8_000, 1_500);
        calculator.add_transactions(9_000, 1_500);
        calculator.add_transactions(10_000, 1_500);

        assert_eq!(profiler.throughput_level(), (Low, 1500));
    }
}