sui_core/authority/
execution_time_estimator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::HashMap,
6    hash::{Hash, Hasher},
7    num::NonZeroUsize,
8    sync::{Arc, Weak},
9    time::{Duration, SystemTime},
10};
11
12use serde::{Deserialize, Serialize};
13
14use super::authority_per_epoch_store::AuthorityPerEpochStore;
15use super::weighted_moving_average::WeightedMovingAverage;
16use crate::consensus_adapter::SubmitToConsensus;
17use governor::{Quota, RateLimiter, clock::MonotonicClock};
18use itertools::Itertools;
19use lru::LruCache;
20#[cfg(not(msim))]
21use mysten_common::in_antithesis;
22use mysten_common::{assert_reachable, debug_fatal, in_test_configuration};
23use mysten_metrics::{monitored_scope, spawn_monitored_task};
24use rand::{Rng, SeedableRng, random, rngs, thread_rng};
25use simple_moving_average::{SMA, SingleSumSMA};
26use sui_config::node::ExecutionTimeObserverConfig;
27use sui_protocol_config::{ExecutionTimeEstimateParams, PerObjectCongestionControlMode};
28use sui_types::{
29    base_types::ObjectID,
30    committee::Committee,
31    error::SuiErrorKind,
32    execution::{ExecutionTimeObservationKey, ExecutionTiming},
33    messages_consensus::{AuthorityIndex, ConsensusTransaction, ExecutionTimeObservation},
34    transaction::{
35        Command, ProgrammableTransaction, StoredExecutionTimeObservations, TransactionData,
36        TransactionDataAPI, TransactionKind,
37    },
38};
39use tokio::{sync::mpsc, time::Instant};
40use tracing::{debug, info, trace, warn};
41
// Number of samples in the window of the simple moving average kept for each
// local observation key.
// TODO: Move this into ExecutionTimeObserverConfig, if we switch to a moving average
// implementation without the window size in the type.
const SMA_LOCAL_OBSERVATION_WINDOW_SIZE: usize = 20;
// Modulus applied to the last byte of an object ID to bucket the object-utilization
// metric label when full object IDs are not reported.
const OBJECT_UTILIZATION_METRIC_HASH_MODULUS: u8 = 32;
46
47/// Determines whether to inject synthetic execution time in Antithesis environments.
48///
49/// This function checks two conditions:
50/// 1. Whether the code is running in an Antithesis environment
51/// 2. Whether injection is enabled via the `ANTITHESIS_ENABLE_EXECUTION_TIME_INJECTION` env var
52///    (enabled by default)
53#[cfg(not(msim))]
54fn antithesis_enable_injecting_synthetic_execution_time() -> bool {
55    use std::sync::OnceLock;
56    static ENABLE_INJECTION: OnceLock<bool> = OnceLock::new();
57    *ENABLE_INJECTION.get_or_init(|| {
58        if !in_antithesis() {
59            return false;
60        }
61
62        std::env::var("ANTITHESIS_ENABLE_EXECUTION_TIME_INJECTION")
63            .map(|v| v.to_lowercase() == "true" || v == "1")
64            .unwrap_or(true)
65    })
66}
67
// Collects local execution time estimates to share via consensus.
pub struct ExecutionTimeObserver {
    // Weak handle to the epoch store; it is upgraded on each use and work is dropped
    // if the epoch store is no longer alive.
    epoch_store: Weak<AuthorityPerEpochStore>,
    // Used to submit execution time observations to consensus (best effort).
    consensus_adapter: Box<dyn SubmitToConsensus>,

    // Protocol-level parameters (e.g. the per-object target utilization).
    protocol_params: ExecutionTimeEstimateParams,
    // Node-local observer configuration (sharing thresholds, intervals, cache sizes, ...).
    config: ExecutionTimeObserverConfig,

    // Per-command-key moving averages of locally observed execution times.
    local_observations: LruCache<ExecutionTimeObservationKey, LocalObservations>,

    // For each object, tracks the amount of time above our utilization target that we spent
    // executing transactions. This is used to decide which observations should be shared
    // via consensus.
    object_utilization_tracker: LruCache<ObjectID, ObjectUtilization>,

    // Sorted list of recently indebted objects, updated by consensus handler.
    // Kept sorted and deduplicated so membership can be tested with a binary search.
    indebted_objects: Vec<ObjectID>,

    // Global rate limiter applied to each batch of observations before it is submitted.
    sharing_rate_limiter: RateLimiter<
        governor::state::NotKeyed,
        governor::state::InMemoryState,
        governor::clock::MonotonicClock,
        governor::middleware::NoOpMiddleware<
            <governor::clock::MonotonicClock as governor::clock::Clock>::Instant,
        >,
    >,

    // Generation number attached to the next submitted observation batch. Seeded from the
    // wall-clock time in microseconds at construction and incremented per submission attempt.
    next_generation_number: u64,
}
97
// Locally measured execution-time statistics for a single observation key.
#[derive(Debug, Clone)]
pub struct LocalObservations {
    // Unweighted simple moving average over the most recent samples.
    moving_average: SingleSumSMA<Duration, u32, SMA_LOCAL_OBSERVATION_WINDOW_SIZE>,
    // Moving average over samples in microseconds, weighted by gas price.
    weighted_moving_average: WeightedMovingAverage,
    // The average most recently selected for sharing and the time it was selected,
    // or `None` if nothing has been shared for this key yet.
    last_shared: Option<(Duration, Instant)>,
    // Observer configuration; selects which of the two averages is reported.
    config: ExecutionTimeObserverConfig,
}
105
106impl LocalObservations {
107    fn new(config: ExecutionTimeObserverConfig, default_duration: Duration) -> Self {
108        let window_size = config.weighted_moving_average_window_size();
109        Self {
110            moving_average: SingleSumSMA::from_zero(Duration::ZERO),
111            weighted_moving_average: WeightedMovingAverage::new(
112                default_duration.as_micros() as u64,
113                window_size,
114            ),
115            last_shared: None,
116            config,
117        }
118    }
119
120    fn add_sample(&mut self, duration: Duration, gas_price: u64) {
121        self.moving_average.add_sample(duration);
122        self.weighted_moving_average
123            .add_sample(duration.as_micros() as u64, gas_price);
124    }
125
126    fn get_average(&self) -> Duration {
127        if self.config.enable_gas_price_weighting() {
128            Duration::from_micros(self.weighted_moving_average.get_weighted_average())
129        } else {
130            self.moving_average.get_average()
131        }
132    }
133
134    fn diff_exceeds_threshold(
135        &self,
136        new_average: Duration,
137        threshold: f64,
138        min_interval: Duration,
139    ) -> bool {
140        let Some((last_shared, last_shared_timestamp)) = self.last_shared else {
141            // Diff threshold exceeded by default if we haven't shared anything yet.
142            return true;
143        };
144
145        if last_shared_timestamp.elapsed() < min_interval {
146            return false;
147        }
148
149        if threshold >= 0.0 {
150            // Positive threshold requires upward change.
151            new_average
152                .checked_sub(last_shared)
153                .is_some_and(|diff| diff > last_shared.mul_f64(threshold))
154        } else {
155            // Negative threshold requires downward change.
156            last_shared
157                .checked_sub(new_average)
158                .is_some_and(|diff| diff > last_shared.mul_f64(-threshold))
159        }
160    }
161}
162
// Per-object bookkeeping of execution time spent above the utilization target.
#[derive(Debug, Clone)]
pub struct ObjectUtilization {
    // Accumulated execution time in excess of the utilization target (never negative).
    excess_execution_time: Duration,
    // When this object's utilization was last updated; `None` before the first update.
    last_measured: Option<Instant>,
    was_overutilized: bool, // true if the object has ever had excess_execution_time
}
169
170impl ObjectUtilization {
171    pub fn overutilized(&self, config: &ExecutionTimeObserverConfig) -> bool {
172        self.excess_execution_time > config.observation_sharing_object_utilization_threshold()
173    }
174}
175
176// Tracks local execution time observations and shares them via consensus.
177impl ExecutionTimeObserver {
178    pub fn spawn(
179        epoch_store: Arc<AuthorityPerEpochStore>,
180        consensus_adapter: Box<dyn SubmitToConsensus>,
181        config: ExecutionTimeObserverConfig,
182    ) {
183        let PerObjectCongestionControlMode::ExecutionTimeEstimate(protocol_params) = epoch_store
184            .protocol_config()
185            .per_object_congestion_control_mode()
186        else {
187            info!(
188                "ExecutionTimeObserver disabled because per-object congestion control mode is not ExecutionTimeEstimate"
189            );
190            return;
191        };
192
193        let (tx_local_execution_time, mut rx_local_execution_time) =
194            mpsc::channel(config.observation_channel_capacity().into());
195        let (tx_object_debts, mut rx_object_debts) =
196            mpsc::channel(config.object_debt_channel_capacity().into());
197        epoch_store.set_local_execution_time_channels(tx_local_execution_time, tx_object_debts);
198
199        // TODO: pre-populate local observations with stored data from prior epoch.
200        let mut observer = Self {
201            epoch_store: Arc::downgrade(&epoch_store),
202            consensus_adapter,
203            local_observations: LruCache::new(config.observation_cache_size()),
204            object_utilization_tracker: LruCache::new(config.object_utilization_cache_size()),
205            indebted_objects: Vec::new(),
206            sharing_rate_limiter: RateLimiter::direct_with_clock(
207                Quota::per_second(config.observation_sharing_rate_limit())
208                    .allow_burst(config.observation_sharing_burst_limit()),
209                &MonotonicClock,
210            ),
211            protocol_params,
212            config,
213            next_generation_number: SystemTime::now()
214                .duration_since(std::time::UNIX_EPOCH)
215                .expect("Sui did not exist prior to 1970")
216                .as_micros()
217                .try_into()
218                .expect("This build of sui is not supported in the year 500,000"),
219        };
220        spawn_monitored_task!(epoch_store.within_alive_epoch(async move {
221            loop {
222                tokio::select! {
223                    // TODO: add metrics for messages received.
224                    Some(object_debts) = rx_object_debts.recv() => {
225                        observer.update_indebted_objects(object_debts);
226                    }
227                    Some((tx, timings, total_duration, gas_price)) = rx_local_execution_time.recv() => {
228                        observer
229                            .record_local_observations(&tx, &timings, total_duration, gas_price);
230                    }
231                    else => { break }
232                }
233            }
234            info!("shutting down ExecutionTimeObserver");
235        }));
236    }
237
238    #[cfg(test)]
239    fn new_for_testing(
240        epoch_store: Arc<AuthorityPerEpochStore>,
241        consensus_adapter: Box<dyn SubmitToConsensus>,
242        observation_sharing_object_utilization_threshold: Duration,
243        enable_gas_price_weighting: bool,
244    ) -> Self {
245        let PerObjectCongestionControlMode::ExecutionTimeEstimate(protocol_params) = epoch_store
246            .protocol_config()
247            .per_object_congestion_control_mode()
248        else {
249            panic!(
250                "tried to construct test ExecutionTimeObserver when congestion control mode is not ExecutionTimeEstimate"
251            );
252        };
253        Self {
254            epoch_store: Arc::downgrade(&epoch_store),
255            consensus_adapter,
256            protocol_params,
257            config: ExecutionTimeObserverConfig {
258                observation_sharing_object_utilization_threshold: Some(
259                    observation_sharing_object_utilization_threshold,
260                ),
261                enable_gas_price_weighting: Some(enable_gas_price_weighting),
262                ..ExecutionTimeObserverConfig::default()
263            },
264            local_observations: LruCache::new(NonZeroUsize::new(10000).unwrap()),
265            object_utilization_tracker: LruCache::new(NonZeroUsize::new(50000).unwrap()),
266            indebted_objects: Vec::new(),
267            sharing_rate_limiter: RateLimiter::direct_with_clock(
268                Quota::per_hour(std::num::NonZeroU32::MAX),
269                &MonotonicClock,
270            ),
271            next_generation_number: SystemTime::now()
272                .duration_since(std::time::UNIX_EPOCH)
273                .expect("Sui did not exist prior to 1970")
274                .as_micros()
275                .try_into()
276                .expect("This build of sui is not supported in the year 500,000"),
277        }
278    }
279
280    // Used by execution to report observed per-entry-point execution times to the estimator.
281    // Updates moving averages and submits observation to consensus if local observation differs
282    // from consensus median.
283    // TODO: Consider more detailed heuristic to account for overhead outside of commands.
284    fn record_local_observations(
285        &mut self,
286        tx: &ProgrammableTransaction,
287        timings: &[ExecutionTiming],
288        total_duration: Duration,
289        gas_price: u64,
290    ) {
291        let _scope = monitored_scope("ExecutionTimeObserver::record_local_observations");
292
293        // Simulate timing in test contexts to trigger congestion control.
294        #[cfg(msim)]
295        let should_inject = self.config.inject_synthetic_execution_time();
296        #[cfg(not(msim))]
297        let should_inject = antithesis_enable_injecting_synthetic_execution_time();
298
299        if should_inject {
300            let (generated_timings, generated_duration) = self.generate_test_timings(tx, timings);
301            self.record_local_observations_timing(
302                tx,
303                &generated_timings,
304                generated_duration,
305                gas_price,
306            )
307        } else {
308            self.record_local_observations_timing(tx, timings, total_duration, gas_price)
309        }
310    }
311
312    fn record_local_observations_timing(
313        &mut self,
314        tx: &ProgrammableTransaction,
315        timings: &[ExecutionTiming],
316        total_duration: Duration,
317        gas_price: u64,
318    ) {
319        assert!(tx.commands.len() >= timings.len());
320
321        let Some(epoch_store) = self.epoch_store.upgrade() else {
322            debug!("epoch is ending, dropping execution time observation");
323            return;
324        };
325
326        let mut uses_indebted_object = false;
327
328        // Update the accumulated excess execution time for shared object
329        // used for exclusive access in this transaction, and determine the max overage.
330        let max_excess_per_object_execution_time = tx
331            .shared_input_objects()
332            .filter_map(|obj| obj.is_accessed_exclusively().then_some(obj.id))
333            .map(|id| {
334                // Mark if any object used in the tx is indebted.
335                if !uses_indebted_object && self.indebted_objects.binary_search(&id).is_ok() {
336                    uses_indebted_object = true;
337                }
338
339                // For each object:
340                // - add the execution time of the current transaction to the tracker
341                // - subtract the maximum amount of time available for execution according
342                //   to our utilization target since the last report was received
343                //   (clamping to zero)
344                //
345                // What remains is the amount of excess time spent executing transactions on
346                // the object above the intended limit. If this value is greater than zero,
347                // it means the object is overutilized.
348                let now = Instant::now();
349                let utilization =
350                    self.object_utilization_tracker
351                        .get_or_insert_mut(id, || ObjectUtilization {
352                            excess_execution_time: Duration::ZERO,
353                            last_measured: None,
354                            was_overutilized: false,
355                        });
356                let overutilized_at_start = utilization.overutilized(&self.config);
357                utilization.excess_execution_time += total_duration;
358                utilization.excess_execution_time =
359                    utilization.excess_execution_time.saturating_sub(
360                        utilization
361                            .last_measured
362                            .map(|last_measured| {
363                                now.duration_since(last_measured)
364                                    .mul_f64(self.protocol_params.target_utilization as f64 / 100.0)
365                            })
366                            .unwrap_or(Duration::MAX),
367                    );
368                utilization.last_measured = Some(now);
369                if utilization.overutilized(&self.config) {
370                    utilization.was_overutilized = true;
371                }
372
373                // Update overutilized objects metrics.
374                if !overutilized_at_start && utilization.overutilized(&self.config) {
375                    trace!("object {id:?} is overutilized");
376                    epoch_store
377                        .metrics
378                        .epoch_execution_time_observer_overutilized_objects
379                        .inc();
380                } else if overutilized_at_start && !utilization.overutilized(&self.config) {
381                    epoch_store
382                        .metrics
383                        .epoch_execution_time_observer_overutilized_objects
384                        .dec();
385                }
386                if utilization.was_overutilized {
387                    let key = if self.config.report_object_utilization_metric_with_full_id() {
388                        id.to_string()
389                    } else {
390                        let key_lsb = id.into_bytes()[ObjectID::LENGTH - 1];
391                        let hash = key_lsb % OBJECT_UTILIZATION_METRIC_HASH_MODULUS;
392                        format!("{:x}", hash)
393                    };
394
395                    epoch_store
396                        .metrics
397                        .epoch_execution_time_observer_object_utilization
398                        .with_label_values(&[key.as_str()])
399                        .inc_by(total_duration.as_secs_f64());
400                }
401
402                utilization.excess_execution_time
403            })
404            .max()
405            .unwrap_or(Duration::ZERO);
406        epoch_store
407            .metrics
408            .epoch_execution_time_observer_utilization_cache_size
409            .set(self.object_utilization_tracker.len() as i64);
410
411        let total_command_duration: Duration = timings.iter().map(|t| t.duration()).sum();
412        let extra_overhead = total_duration - total_command_duration;
413
414        let mut to_share = Vec::with_capacity(tx.commands.len());
415        for (i, timing) in timings.iter().enumerate() {
416            let command = &tx.commands[i];
417
418            // Special-case handling for Publish command: only use hard-coded default estimate.
419            if matches!(command, Command::Publish(_, _)) {
420                continue;
421            }
422
423            // TODO: Consider using failure/success information in computing estimates.
424            let mut command_duration = timing.duration();
425
426            // Distribute overhead proportionally to each command's measured duration.
427            let overhead_factor = if total_command_duration > Duration::ZERO {
428                command_duration.as_secs_f64() / total_command_duration.as_secs_f64()
429            } else {
430                // divisor here must be >0 or this loop would not be running at all
431                1.0 / (tx.commands.len() as f64)
432            };
433            command_duration += extra_overhead.mul_f64(overhead_factor);
434
435            // For native commands, adjust duration by length of command's inputs/outputs.
436            // This is sort of arbitrary, but hopefully works okay as a heuristic.
437            command_duration = command_duration.div_f64(command_length(command).get() as f64);
438
439            // Update gas-weighted moving-average observation for the command.
440            let key = ExecutionTimeObservationKey::from_command(command);
441            let local_observation = self.local_observations.get_or_insert_mut(key.clone(), || {
442                LocalObservations::new(self.config.clone(), Duration::ZERO)
443            });
444            local_observation.add_sample(command_duration, gas_price);
445
446            // Send a new observation through consensus if:
447            // - our current moving average differs too much from the last one we shared, and
448            // - the tx has at least one mutable shared object with utilization that's too high
449            // TODO: Consider only sharing observations that disagree with consensus estimate.
450            let new_average = local_observation.get_average();
451            let mut should_share = false;
452
453            // Share upward adjustments if an object is overutilized.
454            if max_excess_per_object_execution_time
455                >= self
456                    .config
457                    .observation_sharing_object_utilization_threshold()
458                && local_observation.diff_exceeds_threshold(
459                    new_average,
460                    self.config.observation_sharing_diff_threshold(),
461                    self.config.observation_sharing_min_interval(),
462                )
463            {
464                should_share = true;
465                epoch_store
466                    .metrics
467                    .epoch_execution_time_observations_sharing_reason
468                    .with_label_values(&["utilization"])
469                    .inc();
470            };
471
472            // Share downward adjustments if an object is indebted.
473            if uses_indebted_object
474                && local_observation.diff_exceeds_threshold(
475                    new_average,
476                    -self.config.observation_sharing_diff_threshold(),
477                    self.config.observation_sharing_min_interval(),
478                )
479            {
480                should_share = true;
481                epoch_store
482                    .metrics
483                    .epoch_execution_time_observations_sharing_reason
484                    .with_label_values(&["indebted"])
485                    .inc();
486            }
487
488            if should_share {
489                debug!("sharing new execution time observation for {key:?}: {new_average:?}");
490                to_share.push((key, new_average));
491                local_observation.last_shared = Some((new_average, Instant::now()));
492            }
493        }
494
495        // Share new observations.
496        self.share_observations(to_share);
497    }
498
499    fn generate_test_timings(
500        &self,
501        tx: &ProgrammableTransaction,
502        timings: &[ExecutionTiming],
503    ) -> (Vec<ExecutionTiming>, Duration) {
504        let generated_timings: Vec<_> = tx
505            .commands
506            .iter()
507            .zip(timings.iter())
508            .map(|(command, timing)| {
509                let key = ExecutionTimeObservationKey::from_command(command);
510                let duration = self.get_test_duration(&key);
511                if timing.is_abort() {
512                    ExecutionTiming::Abort(duration)
513                } else {
514                    ExecutionTiming::Success(duration)
515                }
516            })
517            .collect();
518
519        let total_duration = generated_timings
520            .iter()
521            .map(|t| t.duration())
522            .sum::<Duration>()
523            + thread_rng().gen_range(Duration::from_millis(10)..Duration::from_millis(50));
524
525        (generated_timings, total_duration)
526    }
527
528    fn get_test_duration(&self, key: &ExecutionTimeObservationKey) -> Duration {
529        #[cfg(msim)]
530        let should_inject = self.config.inject_synthetic_execution_time();
531        #[cfg(not(msim))]
532        let should_inject = false;
533
534        if !in_test_configuration() && !should_inject {
535            panic!("get_test_duration called in non-test configuration");
536        }
537
538        thread_local! {
539            static PER_TEST_SEED: u64 = random::<u64>();
540        }
541
542        let mut hasher = std::collections::hash_map::DefaultHasher::new();
543
544        let checkpoint_digest_used = self
545            .epoch_store
546            .upgrade()
547            .and_then(|store| {
548                store
549                    .get_lowest_non_genesis_checkpoint_summary()
550                    .ok()
551                    .flatten()
552            })
553            .map(|summary| summary.content_digest.hash(&mut hasher))
554            .is_some();
555
556        if !checkpoint_digest_used {
557            PER_TEST_SEED.with(|seed| seed.hash(&mut hasher));
558        }
559
560        key.hash(&mut hasher);
561        let mut rng = rngs::StdRng::seed_from_u64(hasher.finish());
562        rng.gen_range(Duration::from_millis(100)..Duration::from_millis(600))
563    }
564
565    fn share_observations(&mut self, to_share: Vec<(ExecutionTimeObservationKey, Duration)>) {
566        if to_share.is_empty() {
567            return;
568        }
569        let Some(epoch_store) = self.epoch_store.upgrade() else {
570            debug!("epoch is ending, dropping execution time observation");
571            return;
572        };
573
574        let num_observations = to_share.len() as u64;
575
576        // Enforce global observation-sharing rate limit.
577        if let Err(e) = self.sharing_rate_limiter.check() {
578            epoch_store
579                .metrics
580                .epoch_execution_time_observations_dropped
581                .with_label_values(&["global_rate_limit"])
582                .inc_by(num_observations);
583            debug!("rate limit exceeded, dropping execution time observation; {e:?}");
584            return;
585        }
586
587        let epoch_store = epoch_store.clone();
588        let transaction = ConsensusTransaction::new_execution_time_observation(
589            ExecutionTimeObservation::new(epoch_store.name, self.next_generation_number, to_share),
590        );
591        self.next_generation_number += 1;
592
593        if let Err(e) = self.consensus_adapter.submit_best_effort(
594            &transaction,
595            &epoch_store,
596            Duration::from_secs(5),
597        ) {
598            if !matches!(e.as_inner(), SuiErrorKind::EpochEnded(_)) {
599                epoch_store
600                    .metrics
601                    .epoch_execution_time_observations_dropped
602                    .with_label_values(&["submit_to_consensus"])
603                    .inc_by(num_observations);
604                warn!("failed to submit execution time observation: {e:?}");
605            }
606        } else {
607            // Note: it is not actually guaranteed that the observation has been submitted at this point,
608            // but that is also not true with ConsensusAdapter::submit_to_consensus. The only way to know
609            // for sure is to observe that the message is processed by consensus handler.
610            assert_reachable!("successfully shares execution time observations");
611            epoch_store
612                .metrics
613                .epoch_execution_time_observations_shared
614                .inc_by(num_observations);
615        }
616    }
617
618    fn update_indebted_objects(&mut self, mut object_debts: Vec<ObjectID>) {
619        let _scope = monitored_scope("ExecutionTimeObserver::update_indebted_objects");
620
621        let Some(epoch_store) = self.epoch_store.upgrade() else {
622            debug!("epoch is ending, dropping indebted object update");
623            return;
624        };
625
626        object_debts.sort_unstable();
627        object_debts.dedup();
628        self.indebted_objects = object_debts;
629        epoch_store
630            .metrics
631            .epoch_execution_time_observer_indebted_objects
632            .set(self.indebted_objects.len() as i64);
633    }
634}
635
/// Key used to save `StoredExecutionTimeObservations` in the Sui system state object's
/// `extra_fields` Bag.
pub const EXTRA_FIELD_EXECUTION_TIME_ESTIMATES_KEY: u64 = 0;

/// Key used to save the chunk count for chunked execution time observations.
pub const EXTRA_FIELD_EXECUTION_TIME_ESTIMATES_CHUNK_COUNT_KEY: u64 = 1;
642
// Tracks global execution time observations provided by validators from consensus
// and computes deterministic per-command estimates for use in congestion control.
pub struct ExecutionTimeEstimator {
    // Committee used to look up each authority's stake when computing medians.
    committee: Arc<Committee>,
    // Protocol-level parameters for execution time estimation.
    protocol_params: ExecutionTimeEstimateParams,

    // Observations received via consensus, grouped by observation key.
    consensus_observations: HashMap<ExecutionTimeObservationKey, ConsensusObservations>,
}
651
// Per-key observations from all authorities plus the cached stake-weighted median.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsensusObservations {
    // One slot per committee member; the duration is `None` until that authority has
    // reported an observation for this key.
    observations: Vec<(u64 /* generation */, Option<Duration>)>, // keyed by authority index
    // `None` when the median is unavailable (e.g. not enough stake has reported).
    stake_weighted_median: Option<Duration>,                     // cached value
}
657
658impl ConsensusObservations {
659    fn update_stake_weighted_median(
660        &mut self,
661        committee: &Committee,
662        config: &ExecutionTimeEstimateParams,
663    ) {
664        let mut stake_with_observations = 0;
665        let sorted_observations: Vec<_> = self
666            .observations
667            .iter()
668            .enumerate()
669            .filter_map(|(i, (_, duration))| {
670                duration.map(|duration| {
671                    let authority_index: AuthorityIndex = i.try_into().unwrap();
672                    stake_with_observations += committee.stake_by_index(authority_index).unwrap();
673                    (duration, authority_index)
674                })
675            })
676            .sorted()
677            .collect();
678
679        // Don't use observations until we have received enough.
680        if stake_with_observations < config.stake_weighted_median_threshold {
681            self.stake_weighted_median = None;
682            return;
683        }
684
685        // Compute stake-weighted median.
686        let median_stake = stake_with_observations / 2;
687        let mut running_stake = 0;
688        for (duration, authority_index) in sorted_observations {
689            running_stake += committee.stake_by_index(authority_index).unwrap();
690            if running_stake > median_stake {
691                self.stake_weighted_median = Some(duration);
692                break;
693            }
694        }
695    }
696}
697
698impl ExecutionTimeEstimator {
699    pub fn new(
700        committee: Arc<Committee>,
701        protocol_params: ExecutionTimeEstimateParams,
702        initial_observations: impl Iterator<
703            Item = (
704                AuthorityIndex,
705                Option<u64>,
706                ExecutionTimeObservationKey,
707                Duration,
708            ),
709        >,
710    ) -> Self {
711        let mut estimator = Self {
712            committee,
713            protocol_params,
714            consensus_observations: HashMap::new(),
715        };
716        for (source, generation, key, duration) in initial_observations {
717            estimator.process_observation_from_consensus(
718                source,
719                generation,
720                key.to_owned(),
721                duration,
722                true,
723            );
724        }
725        for observation in estimator.consensus_observations.values_mut() {
726            observation
727                .update_stake_weighted_median(&estimator.committee, &estimator.protocol_params);
728        }
729        estimator
730    }
731
732    #[cfg(test)]
733    pub fn new_for_testing() -> Self {
734        let (committee, _) = Committee::new_simple_test_committee_of_size(1);
735        Self {
736            committee: Arc::new(committee),
737            protocol_params: ExecutionTimeEstimateParams {
738                target_utilization: 100,
739                max_estimate_us: u64::MAX,
740                ..ExecutionTimeEstimateParams::default()
741            },
742            consensus_observations: HashMap::new(),
743        }
744    }
745
746    pub fn process_observations_from_consensus(
747        &mut self,
748        source: AuthorityIndex,
749        generation: Option<u64>,
750        observations: &[(ExecutionTimeObservationKey, Duration)],
751    ) {
752        for (key, duration) in observations {
753            self.process_observation_from_consensus(
754                source,
755                generation,
756                key.to_owned(),
757                *duration,
758                false,
759            );
760        }
761    }
762
763    fn process_observation_from_consensus(
764        &mut self,
765        source: AuthorityIndex,
766        generation: Option<u64>,
767        observation_key: ExecutionTimeObservationKey,
768        duration: Duration,
769        skip_update: bool,
770    ) {
771        if matches!(observation_key, ExecutionTimeObservationKey::Publish) {
772            // Special-case handling for Publish command: only use hard-coded default estimate.
773            warn!(
774                "dropping Publish observation received from possibly-Byzanitine authority {source}"
775            );
776            return;
777        }
778
779        assert_reachable!("receives some valid execution time observations");
780
781        let observations = self
782            .consensus_observations
783            .entry(observation_key)
784            .or_insert_with(|| {
785                let len = self.committee.num_members();
786                let mut empty_observations = Vec::with_capacity(len);
787                empty_observations.resize(len, (0, None));
788                ConsensusObservations {
789                    observations: empty_observations,
790                    stake_weighted_median: if self
791                        .protocol_params
792                        .default_none_duration_for_new_keys
793                    {
794                        None
795                    } else {
796                        Some(Duration::ZERO)
797                    },
798                }
799            });
800
801        let (obs_generation, obs_duration) =
802            &mut observations.observations[TryInto::<usize>::try_into(source).unwrap()];
803        if generation.is_some_and(|generation| *obs_generation >= generation) {
804            // Ignore outdated observation.
805            return;
806        }
807        *obs_generation = generation.unwrap_or(0);
808        *obs_duration = Some(duration);
809        if !skip_update {
810            observations.update_stake_weighted_median(&self.committee, &self.protocol_params);
811        }
812    }
813
814    pub fn get_estimate(&self, tx: &TransactionData) -> Duration {
815        let TransactionKind::ProgrammableTransaction(tx) = tx.kind() else {
816            debug_fatal!("get_estimate called on non-ProgrammableTransaction");
817            return Duration::ZERO;
818        };
819        tx.commands
820            .iter()
821            .map(|command| {
822                let key = ExecutionTimeObservationKey::from_command(command);
823                self.consensus_observations
824                    .get(&key)
825                    .and_then(|obs| obs.stake_weighted_median)
826                    .unwrap_or_else(|| key.default_duration())
827                    // For native commands, adjust duration by length of command's inputs/outputs.
828                    // This is sort of arbitrary, but hopefully works okay as a heuristic.
829                    .mul_f64(command_length(command).get() as f64)
830            })
831            .sum::<Duration>()
832            .min(Duration::from_micros(self.protocol_params.max_estimate_us))
833    }
834
835    pub fn take_observations(&mut self) -> StoredExecutionTimeObservations {
836        StoredExecutionTimeObservations::V1(
837            self.consensus_observations
838                .drain()
839                .map(|(key, observations)| {
840                    let observations = observations
841                        .observations
842                        .into_iter()
843                        .enumerate()
844                        .filter_map(|(idx, (_, duration))| {
845                            duration.map(|d| {
846                                (
847                                    self.committee
848                                        .authority_by_index(idx.try_into().unwrap())
849                                        .cloned()
850                                        .unwrap(),
851                                    d,
852                                )
853                            })
854                        })
855                        .collect();
856                    (key, observations)
857                })
858                .collect(),
859        )
860    }
861
862    pub fn get_observations(&self) -> Vec<(ExecutionTimeObservationKey, ConsensusObservations)> {
863        self.consensus_observations
864            .iter()
865            .map(|(key, observations)| (key.clone(), observations.clone()))
866            .collect()
867    }
868}
869
870fn command_length(command: &Command) -> NonZeroUsize {
871    // Commands with variable-length inputs/outputs are reported as +1
872    // to account for fixed overhead and prevent divide-by-zero.
873    NonZeroUsize::new(match command {
874        Command::MoveCall(_) => 1,
875        Command::TransferObjects(src, _) => src.len() + 1,
876        Command::SplitCoins(_, amts) => amts.len() + 1,
877        Command::MergeCoins(_, src) => src.len() + 1,
878        Command::Publish(_, _) => 1,
879        Command::MakeMoveVec(_, src) => src.len() + 1,
880        Command::Upgrade(_, _, _, _) => 1,
881    })
882    .unwrap()
883}
884
885#[cfg(test)]
886mod tests {
887    use super::*;
888    use crate::authority::test_authority_builder::TestAuthorityBuilder;
889    use crate::checkpoints::CheckpointStore;
890    use crate::consensus_adapter::{
891        ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
892        MockConsensusClient,
893    };
894    use sui_protocol_config::ProtocolConfig;
895    use sui_types::base_types::{ObjectID, SequenceNumber, SuiAddress};
896    use sui_types::transaction::{
897        Argument, CallArg, ObjectArg, ProgrammableMoveCall, SharedObjectMutability,
898    };
899    use {
900        rand::{Rng, SeedableRng},
901        sui_protocol_config::ProtocolVersion,
902        sui_types::supported_protocol_versions::Chain,
903    };
904
905    #[tokio::test]
906    async fn test_record_local_observations() {
907        telemetry_subscribers::init_for_testing();
908
909        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
910            config.set_per_object_congestion_control_mode_for_testing(
911                PerObjectCongestionControlMode::ExecutionTimeEstimate(
912                    ExecutionTimeEstimateParams {
913                        target_utilization: 100,
914                        allowed_txn_cost_overage_burst_limit_us: 0,
915                        randomness_scalar: 100,
916                        max_estimate_us: u64::MAX,
917                        stored_observations_num_included_checkpoints: 10,
918                        stored_observations_limit: u64::MAX,
919                        stake_weighted_median_threshold: 0,
920                        default_none_duration_for_new_keys: true,
921                        observations_chunk_size: None,
922                    },
923                ),
924            );
925            config
926        });
927
928        let mock_consensus_client = MockConsensusClient::new();
929        let authority = TestAuthorityBuilder::new().build().await;
930        let epoch_store = authority.epoch_store_for_testing();
931        let consensus_adapter = Arc::new(ConsensusAdapter::new(
932            Arc::new(mock_consensus_client),
933            CheckpointStore::new_for_tests(),
934            authority.name,
935            Arc::new(ConnectionMonitorStatusForTests {}),
936            100_000,
937            100_000,
938            None,
939            None,
940            ConsensusAdapterMetrics::new_test(),
941            epoch_store.protocol_config().clone(),
942        ));
943        let mut observer = ExecutionTimeObserver::new_for_testing(
944            epoch_store.clone(),
945            Box::new(consensus_adapter.clone()),
946            Duration::ZERO, // disable object utilization thresholds for this test
947            false,          // disable gas price weighting for this test
948        );
949
950        // Create a simple PTB with one move call
951        let package = ObjectID::random();
952        let module = "test_module".to_string();
953        let function = "test_function".to_string();
954        let ptb = ProgrammableTransaction {
955            inputs: vec![],
956            commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
957                package,
958                module: module.clone(),
959                function: function.clone(),
960                type_arguments: vec![],
961                arguments: vec![],
962            }))],
963        };
964
965        // Record an observation
966        let timings = vec![ExecutionTiming::Success(Duration::from_millis(100))];
967        let total_duration = Duration::from_millis(110);
968        observer.record_local_observations(&ptb, &timings, total_duration, 1);
969
970        let key = ExecutionTimeObservationKey::MoveEntryPoint {
971            package,
972            module: module.clone(),
973            function: function.clone(),
974            type_arguments: vec![],
975        };
976
977        // Check that local observation was recorded and shared
978        let local_obs = observer.local_observations.get(&key).unwrap();
979        assert_eq!(
980            local_obs.get_average(),
981            // 10ms overhead should be entirely apportioned to the one command in the PTB
982            Duration::from_millis(110)
983        );
984        assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));
985
986        // Record another observation
987        let timings = vec![ExecutionTiming::Success(Duration::from_millis(110))];
988        let total_duration = Duration::from_millis(120);
989        observer.record_local_observations(&ptb, &timings, total_duration, 1);
990
991        // Check that moving average was updated
992        let local_obs = observer.local_observations.get(&key).unwrap();
993        assert_eq!(
994            local_obs.get_average(),
995            // average of 110ms and 120ms observations
996            Duration::from_millis(115)
997        );
998        // new 115ms average should not be shared; it's <5% different from 110ms
999        assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));
1000
1001        // Record another observation
1002        let timings = vec![ExecutionTiming::Success(Duration::from_millis(120))];
1003        let total_duration = Duration::from_millis(130);
1004        observer.record_local_observations(&ptb, &timings, total_duration, 1);
1005
1006        // Check that moving average was updated
1007        let local_obs = observer.local_observations.get(&key).unwrap();
1008        assert_eq!(
1009            local_obs.get_average(),
1010            // average of [110ms, 120ms, 130ms]
1011            Duration::from_millis(120)
1012        );
1013        // new 120ms average should not be shared; it's >5% different from 110ms,
1014        // but not enough time has passed
1015        assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));
1016
1017        // Manually update last-shared time to long ago
1018        observer
1019            .local_observations
1020            .get_mut(&key)
1021            .unwrap()
1022            .last_shared = Some((
1023            Duration::from_millis(110),
1024            Instant::now() - Duration::from_secs(60),
1025        ));
1026
1027        // Record last observation
1028        let timings = vec![ExecutionTiming::Success(Duration::from_millis(120))];
1029        let total_duration = Duration::from_millis(160);
1030        observer.record_local_observations(&ptb, &timings, total_duration, 1);
1031
1032        // Verify that moving average is the same and a new observation was shared, as
1033        // enough time has now elapsed
1034        let local_obs = observer.local_observations.get(&key).unwrap();
1035        assert_eq!(
1036            local_obs.get_average(),
1037            // average of [110ms, 120ms, 130ms, 160ms]
1038            Duration::from_millis(130)
1039        );
1040        assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(130));
1041    }
1042
1043    #[tokio::test]
1044    async fn test_record_local_observations_with_gas_price_weighting() {
1045        telemetry_subscribers::init_for_testing();
1046
1047        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1048            config.set_per_object_congestion_control_mode_for_testing(
1049                PerObjectCongestionControlMode::ExecutionTimeEstimate(
1050                    ExecutionTimeEstimateParams {
1051                        target_utilization: 100,
1052                        allowed_txn_cost_overage_burst_limit_us: 0,
1053                        randomness_scalar: 100,
1054                        max_estimate_us: u64::MAX,
1055                        stored_observations_num_included_checkpoints: 10,
1056                        stored_observations_limit: u64::MAX,
1057                        stake_weighted_median_threshold: 0,
1058                        default_none_duration_for_new_keys: true,
1059                        observations_chunk_size: None,
1060                    },
1061                ),
1062            );
1063            config
1064        });
1065
1066        let mock_consensus_client = MockConsensusClient::new();
1067        let authority = TestAuthorityBuilder::new().build().await;
1068        let epoch_store = authority.epoch_store_for_testing();
1069        let consensus_adapter = Arc::new(ConsensusAdapter::new(
1070            Arc::new(mock_consensus_client),
1071            CheckpointStore::new_for_tests(),
1072            authority.name,
1073            Arc::new(ConnectionMonitorStatusForTests {}),
1074            100_000,
1075            100_000,
1076            None,
1077            None,
1078            ConsensusAdapterMetrics::new_test(),
1079            epoch_store.protocol_config().clone(),
1080        ));
1081        let mut observer = ExecutionTimeObserver::new_for_testing(
1082            epoch_store.clone(),
1083            Box::new(consensus_adapter.clone()),
1084            Duration::ZERO, // disable object utilization thresholds for this test
1085            true,           // enable gas price weighting for this test
1086        );
1087
1088        // Create a simple PTB with one move call
1089        let package = ObjectID::random();
1090        let module = "test_module".to_string();
1091        let function = "test_function".to_string();
1092        let ptb = ProgrammableTransaction {
1093            inputs: vec![],
1094            commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1095                package,
1096                module: module.clone(),
1097                function: function.clone(),
1098                type_arguments: vec![],
1099                arguments: vec![],
1100            }))],
1101        };
1102
1103        // Record an observation
1104        let timings = vec![ExecutionTiming::Success(Duration::from_millis(100))];
1105        let total_duration = Duration::from_millis(110);
1106        observer.record_local_observations(&ptb, &timings, total_duration, 1);
1107
1108        let key = ExecutionTimeObservationKey::MoveEntryPoint {
1109            package,
1110            module: module.clone(),
1111            function: function.clone(),
1112            type_arguments: vec![],
1113        };
1114
1115        // Check that local observation was recorded and shared
1116        let local_obs = observer.local_observations.get(&key).unwrap();
1117        assert_eq!(
1118            local_obs.get_average(),
1119            // 10ms overhead should be entirely apportioned to the one command in the PTB
1120            Duration::from_millis(110)
1121        );
1122        assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));
1123
1124        // Record another observation
1125        let timings = vec![ExecutionTiming::Success(Duration::from_millis(110))];
1126        let total_duration = Duration::from_millis(120);
1127        observer.record_local_observations(&ptb, &timings, total_duration, 2);
1128
1129        // Check that weighted moving average was updated
1130        let local_obs = observer.local_observations.get(&key).unwrap();
1131        assert_eq!(
1132            local_obs.get_average(),
1133            // Our local observation averages are weighted by gas price:
1134            // 110ms * 1 + 110ms * 2 / (1 + 2) = 116.666ms
1135            Duration::from_micros(116_666)
1136        );
1137    }
1138
1139    #[tokio::test]
1140    async fn test_record_local_observations_with_multiple_commands() {
1141        telemetry_subscribers::init_for_testing();
1142
1143        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1144            config.set_per_object_congestion_control_mode_for_testing(
1145                PerObjectCongestionControlMode::ExecutionTimeEstimate(
1146                    ExecutionTimeEstimateParams {
1147                        target_utilization: 100,
1148                        allowed_txn_cost_overage_burst_limit_us: 0,
1149                        randomness_scalar: 0,
1150                        max_estimate_us: u64::MAX,
1151                        stored_observations_num_included_checkpoints: 10,
1152                        stored_observations_limit: u64::MAX,
1153                        stake_weighted_median_threshold: 0,
1154                        default_none_duration_for_new_keys: true,
1155                        observations_chunk_size: None,
1156                    },
1157                ),
1158            );
1159            config
1160        });
1161
1162        let mock_consensus_client = MockConsensusClient::new();
1163        let authority = TestAuthorityBuilder::new().build().await;
1164        let epoch_store = authority.epoch_store_for_testing();
1165        let consensus_adapter = Arc::new(ConsensusAdapter::new(
1166            Arc::new(mock_consensus_client),
1167            CheckpointStore::new_for_tests(),
1168            authority.name,
1169            Arc::new(ConnectionMonitorStatusForTests {}),
1170            100_000,
1171            100_000,
1172            None,
1173            None,
1174            ConsensusAdapterMetrics::new_test(),
1175            epoch_store.protocol_config().clone(),
1176        ));
1177        let mut observer = ExecutionTimeObserver::new_for_testing(
1178            epoch_store.clone(),
1179            Box::new(consensus_adapter.clone()),
1180            Duration::ZERO, // disable object utilization thresholds for this test
1181            false,          // disable gas price weighting for this test
1182        );
1183
1184        // Create a PTB with multiple commands.
1185        let package = ObjectID::random();
1186        let module = "test_module".to_string();
1187        let function = "test_function".to_string();
1188        let ptb = ProgrammableTransaction {
1189            inputs: vec![],
1190            commands: vec![
1191                Command::MoveCall(Box::new(ProgrammableMoveCall {
1192                    package,
1193                    module: module.clone(),
1194                    function: function.clone(),
1195                    type_arguments: vec![],
1196                    arguments: vec![],
1197                })),
1198                Command::TransferObjects(
1199                    // Inputs don't exist above, but doesn't matter for this test.
1200                    vec![Argument::Input(1), Argument::Input(2)],
1201                    Argument::Input(0),
1202                ),
1203            ],
1204        };
1205        let timings = vec![
1206            ExecutionTiming::Success(Duration::from_millis(100)),
1207            ExecutionTiming::Success(Duration::from_millis(50)),
1208        ];
1209        let total_duration = Duration::from_millis(180);
1210        observer.record_local_observations(&ptb, &timings, total_duration, 1);
1211
1212        // Check that both commands were recorded
1213        let move_key = ExecutionTimeObservationKey::MoveEntryPoint {
1214            package,
1215            module: module.clone(),
1216            function: function.clone(),
1217            type_arguments: vec![],
1218        };
1219        let move_obs = observer.local_observations.get(&move_key).unwrap();
1220        assert_eq!(
1221            move_obs.get_average(),
1222            // 100/150 == 2/3 of 30ms overhead distributed to Move command
1223            Duration::from_millis(120)
1224        );
1225
1226        let transfer_obs = observer
1227            .local_observations
1228            .get(&ExecutionTimeObservationKey::TransferObjects)
1229            .unwrap();
1230        assert_eq!(
1231            transfer_obs.get_average(),
1232            // 50ms time before adjustments
1233            // 50/150 == 1/3 of 30ms overhead distributed to object xfer
1234            // 60ms adjusetd time / 3 command length == 20ms
1235            Duration::from_millis(20)
1236        );
1237    }
1238
1239    #[tokio::test]
1240    async fn test_record_local_observations_with_object_utilization_threshold() {
1241        telemetry_subscribers::init_for_testing();
1242
1243        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1244            config.set_per_object_congestion_control_mode_for_testing(
1245                PerObjectCongestionControlMode::ExecutionTimeEstimate(
1246                    ExecutionTimeEstimateParams {
1247                        target_utilization: 100,
1248                        allowed_txn_cost_overage_burst_limit_us: 0,
1249                        randomness_scalar: 0,
1250                        max_estimate_us: u64::MAX,
1251                        stored_observations_num_included_checkpoints: 10,
1252                        stored_observations_limit: u64::MAX,
1253                        stake_weighted_median_threshold: 0,
1254                        default_none_duration_for_new_keys: true,
1255                        observations_chunk_size: None,
1256                    },
1257                ),
1258            );
1259            config
1260        });
1261
1262        let mock_consensus_client = MockConsensusClient::new();
1263        let authority = TestAuthorityBuilder::new().build().await;
1264        let epoch_store = authority.epoch_store_for_testing();
1265        let consensus_adapter = Arc::new(ConsensusAdapter::new(
1266            Arc::new(mock_consensus_client),
1267            CheckpointStore::new_for_tests(),
1268            authority.name,
1269            Arc::new(ConnectionMonitorStatusForTests {}),
1270            100_000,
1271            100_000,
1272            None,
1273            None,
1274            ConsensusAdapterMetrics::new_test(),
1275            epoch_store.protocol_config().clone(),
1276        ));
1277        let mut observer = ExecutionTimeObserver::new_for_testing(
1278            epoch_store.clone(),
1279            Box::new(consensus_adapter.clone()),
1280            Duration::from_millis(500), // only share observations with excess utilization >= 500ms
1281            false,                      // disable gas price weighting for this test
1282        );
1283
1284        // Create a simple PTB with one move call and one mutable shared input
1285        let package = ObjectID::random();
1286        let module = "test_module".to_string();
1287        let function = "test_function".to_string();
1288        let shared_object_id = ObjectID::random();
1289        let ptb = ProgrammableTransaction {
1290            inputs: vec![CallArg::Object(ObjectArg::SharedObject {
1291                id: shared_object_id,
1292                initial_shared_version: SequenceNumber::new(),
1293                mutability: SharedObjectMutability::Mutable,
1294            })],
1295            commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1296                package,
1297                module: module.clone(),
1298                function: function.clone(),
1299                type_arguments: vec![],
1300                arguments: vec![],
1301            }))],
1302        };
1303        let key = ExecutionTimeObservationKey::MoveEntryPoint {
1304            package,
1305            module: module.clone(),
1306            function: function.clone(),
1307            type_arguments: vec![],
1308        };
1309
1310        tokio::time::pause();
1311
1312        // First observation - should not share due to low utilization
1313        let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
1314        observer.record_local_observations(&ptb, &timings, Duration::from_secs(2), 1);
1315        assert!(
1316            observer
1317                .local_observations
1318                .get(&key)
1319                .unwrap()
1320                .last_shared
1321                .is_none()
1322        );
1323
1324        // Second observation - no time has passed, so now utilization is high; should share upward change
1325        let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
1326        observer.record_local_observations(&ptb, &timings, Duration::from_secs(2), 1);
1327        assert_eq!(
1328            observer
1329                .local_observations
1330                .get(&key)
1331                .unwrap()
1332                .last_shared
1333                .unwrap()
1334                .0,
1335            Duration::from_secs(2)
1336        );
1337
1338        // Third execution with significant upward diff and high utilization - should share again
1339        tokio::time::advance(Duration::from_secs(5)).await;
1340        let timings = vec![ExecutionTiming::Success(Duration::from_secs(3))];
1341        observer.record_local_observations(&ptb, &timings, Duration::from_secs(5), 1);
1342        assert_eq!(
1343            observer
1344                .local_observations
1345                .get(&key)
1346                .unwrap()
1347                .last_shared
1348                .unwrap()
1349                .0,
1350            Duration::from_secs(3)
1351        );
1352
1353        // Fourth execution with significant downward diff but still overutilized - should NOT share downward change
1354        // (downward changes are only shared for indebted objects, not overutilized ones)
1355        tokio::time::advance(Duration::from_millis(150)).await;
1356        let timings = vec![ExecutionTiming::Success(Duration::from_millis(100))];
1357        observer.record_local_observations(&ptb, &timings, Duration::from_millis(500), 1);
1358        assert_eq!(
1359            observer
1360                .local_observations
1361                .get(&key)
1362                .unwrap()
1363                .last_shared
1364                .unwrap()
1365                .0,
1366            Duration::from_secs(3) // still the old value, no sharing of downward change
1367        );
1368
1369        // Fifth execution after utilization drops - should not share upward diff since not overutilized
1370        tokio::time::advance(Duration::from_secs(60)).await;
1371        let timings = vec![ExecutionTiming::Success(Duration::from_secs(11))];
1372        observer.record_local_observations(&ptb, &timings, Duration::from_secs(11), 1);
1373        assert_eq!(
1374            observer
1375                .local_observations
1376                .get(&key)
1377                .unwrap()
1378                .last_shared
1379                .unwrap()
1380                .0,
1381            Duration::from_secs(3) // still the old value, no sharing when not overutilized
1382        );
1383    }
1384
1385    #[tokio::test]
1386    async fn test_record_local_observations_with_indebted_objects() {
1387        telemetry_subscribers::init_for_testing();
1388
1389        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1390            config.set_per_object_congestion_control_mode_for_testing(
1391                PerObjectCongestionControlMode::ExecutionTimeEstimate(
1392                    ExecutionTimeEstimateParams {
1393                        target_utilization: 100,
1394                        allowed_txn_cost_overage_burst_limit_us: 0,
1395                        randomness_scalar: 0,
1396                        max_estimate_us: u64::MAX,
1397                        stored_observations_num_included_checkpoints: 10,
1398                        stored_observations_limit: u64::MAX,
1399                        stake_weighted_median_threshold: 0,
1400                        default_none_duration_for_new_keys: true,
1401                        observations_chunk_size: None,
1402                    },
1403                ),
1404            );
1405            config
1406        });
1407
1408        let mock_consensus_client = MockConsensusClient::new();
1409        let authority = TestAuthorityBuilder::new().build().await;
1410        let epoch_store = authority.epoch_store_for_testing();
1411        let consensus_adapter = Arc::new(ConsensusAdapter::new(
1412            Arc::new(mock_consensus_client),
1413            CheckpointStore::new_for_tests(),
1414            authority.name,
1415            Arc::new(ConnectionMonitorStatusForTests {}),
1416            100_000,
1417            100_000,
1418            None,
1419            None,
1420            ConsensusAdapterMetrics::new_test(),
1421            epoch_store.protocol_config().clone(),
1422        ));
1423        let mut observer = ExecutionTimeObserver::new_for_testing(
1424            epoch_store.clone(),
1425            Box::new(consensus_adapter.clone()),
1426            Duration::from_millis(500), // Low utilization threshold to enable overutilized sharing initially
1427            false,                      // disable gas price weighting for this test
1428        );
1429
1430        // Create a simple PTB with one move call and one mutable shared input
1431        let package = ObjectID::random();
1432        let module = "test_module".to_string();
1433        let function = "test_function".to_string();
1434        let shared_object_id = ObjectID::random();
1435        let ptb = ProgrammableTransaction {
1436            inputs: vec![CallArg::Object(ObjectArg::SharedObject {
1437                id: shared_object_id,
1438                initial_shared_version: SequenceNumber::new(),
1439                mutability: SharedObjectMutability::Mutable,
1440            })],
1441            commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1442                package,
1443                module: module.clone(),
1444                function: function.clone(),
1445                type_arguments: vec![],
1446                arguments: vec![],
1447            }))],
1448        };
1449        let key = ExecutionTimeObservationKey::MoveEntryPoint {
1450            package,
1451            module: module.clone(),
1452            function: function.clone(),
1453            type_arguments: vec![],
1454        };
1455
1456        tokio::time::pause();
1457
1458        // First observation - should not share due to low utilization
1459        let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
1460        observer.record_local_observations(&ptb, &timings, Duration::from_secs(1), 1);
1461        assert!(
1462            observer
1463                .local_observations
1464                .get(&key)
1465                .unwrap()
1466                .last_shared
1467                .is_none()
1468        );
1469
1470        // Second observation - no time has passed, so now utilization is high; should share upward change
1471        let timings = vec![ExecutionTiming::Success(Duration::from_secs(2))];
1472        observer.record_local_observations(&ptb, &timings, Duration::from_secs(2), 1);
1473        assert_eq!(
1474            observer
1475                .local_observations
1476                .get(&key)
1477                .unwrap()
1478                .last_shared
1479                .unwrap()
1480                .0,
1481            Duration::from_millis(1500) // (1s + 2s) / 2 = 1.5s
1482        );
1483
1484        // Mark the shared object as indebted and increase utilization threshold to prevent overutilized sharing
1485        observer.update_indebted_objects(vec![shared_object_id]);
1486        observer
1487            .config
1488            .observation_sharing_object_utilization_threshold = Some(Duration::from_secs(1000));
1489
1490        // Wait for min interval and record a significant downward change
1491        // This should share because the object is indebted
1492        tokio::time::advance(Duration::from_secs(60)).await;
1493        let timings = vec![ExecutionTiming::Success(Duration::from_millis(300))];
1494        observer.record_local_observations(&ptb, &timings, Duration::from_millis(300), 1);
1495
1496        // Moving average should be (1s + 2s + 0.3s) / 3 = 1.1s
1497        // This downward change should have been shared for indebted object
1498        assert_eq!(
1499            observer
1500                .local_observations
1501                .get(&key)
1502                .unwrap()
1503                .last_shared
1504                .unwrap()
1505                .0,
1506            Duration::from_millis(1100)
1507        );
1508    }
1509
1510    #[tokio::test]
1511    // TODO-DNS add tests for min stake amt
1512    async fn test_stake_weighted_median() {
1513        telemetry_subscribers::init_for_testing();
1514
1515        let (committee, _) =
1516            Committee::new_simple_test_committee_with_normalized_voting_power(vec![10, 20, 30, 40]);
1517
1518        let params = ExecutionTimeEstimateParams {
1519            stake_weighted_median_threshold: 0,
1520            ..Default::default()
1521        };
1522
1523        let mut tracker = ConsensusObservations {
1524            observations: vec![
1525                (0, Some(Duration::from_secs(1))), // 10% stake
1526                (0, Some(Duration::from_secs(2))), // 20% stake
1527                (0, Some(Duration::from_secs(3))), // 30% stake
1528                (0, Some(Duration::from_secs(4))), // 40% stake
1529            ],
1530            stake_weighted_median: None,
1531        };
1532        tracker.update_stake_weighted_median(&committee, &params);
1533        // With stake weights [10,20,30,40]:
1534        // - Duration 1 covers 10% of stake
1535        // - Duration 2 covers 30% of stake (10+20)
1536        // - Duration 3 covers 60% of stake (10+20+30)
1537        // - Duration 4 covers 100% of stake
1538        // Median should be 3 since that's where we cross 50% of stake
1539        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(3)));
1540
1541        // Test duration sorting
1542        let mut tracker = ConsensusObservations {
1543            observations: vec![
1544                (0, Some(Duration::from_secs(3))), // 10% stake
1545                (0, Some(Duration::from_secs(4))), // 20% stake
1546                (0, Some(Duration::from_secs(1))), // 30% stake
1547                (0, Some(Duration::from_secs(2))), // 40% stake
1548            ],
1549            stake_weighted_median: None,
1550        };
1551        tracker.update_stake_weighted_median(&committee, &params);
1552        // With sorted stake weights [30,40,10,20]:
1553        // - Duration 1 covers 30% of stake
1554        // - Duration 2 covers 70% of stake (30+40)
1555        // - Duration 3 covers 80% of stake (30+40+10)
1556        // - Duration 4 covers 100% of stake
1557        // Median should be 2 since that's where we cross 50% of stake
1558        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(2)));
1559
1560        // Test with one missing observation
1561        let mut tracker = ConsensusObservations {
1562            observations: vec![
1563                (0, Some(Duration::from_secs(1))), // 10% stake
1564                (0, None),                         // 20% stake (missing)
1565                (0, Some(Duration::from_secs(3))), // 30% stake
1566                (0, Some(Duration::from_secs(4))), // 40% stake
1567            ],
1568            stake_weighted_median: None,
1569        };
1570        tracker.update_stake_weighted_median(&committee, &params);
1571        // With missing observation for 20% stake:
1572        // - Duration 1 covers 10% of stake
1573        // - Duration 3 covers 40% of stake (10+30)
1574        // - Duration 4 covers 80% of stake (10+30+40)
1575        // Median should be 4 since that's where we pass half of available stake (80% / 2 == 40%)
1576        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(4)));
1577
1578        // Test with multiple missing observations
1579        let mut tracker = ConsensusObservations {
1580            observations: vec![
1581                (0, Some(Duration::from_secs(1))), // 10% stake
1582                (0, Some(Duration::from_secs(2))), // 20% stake
1583                (0, None),                         // 30% stake (missing)
1584                (0, None),                         // 40% stake (missing)
1585            ],
1586            stake_weighted_median: None,
1587        };
1588        tracker.update_stake_weighted_median(&committee, &params);
1589        // With missing observations:
1590        // - Duration 1 covers 10% of stake
1591        // - Duration 2 covers 30% of stake (10+20)
1592        // Median should be 2 since that's where we cross half of available stake (40% / 2 == 20%)
1593        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(2)));
1594
1595        // Test with one observation
1596        let mut tracker = ConsensusObservations {
1597            observations: vec![
1598                (0, None),                         // 10% stake
1599                (0, None),                         // 20% stake
1600                (0, Some(Duration::from_secs(3))), // 30% stake
1601                (0, None),                         // 40% stake
1602            ],
1603            stake_weighted_median: None,
1604        };
1605        tracker.update_stake_weighted_median(&committee, &params);
1606        // With only one observation, median should be that observation
1607        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(3)));
1608
1609        // Test with all same durations
1610        let mut tracker = ConsensusObservations {
1611            observations: vec![
1612                (0, Some(Duration::from_secs(5))), // 10% stake
1613                (0, Some(Duration::from_secs(5))), // 20% stake
1614                (0, Some(Duration::from_secs(5))), // 30% stake
1615                (0, Some(Duration::from_secs(5))), // 40% stake
1616            ],
1617            stake_weighted_median: None,
1618        };
1619        tracker.update_stake_weighted_median(&committee, &params);
1620        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(5)));
1621    }
1622
1623    #[tokio::test]
1624    async fn test_stake_weighted_median_threshold() {
1625        telemetry_subscribers::init_for_testing();
1626
1627        let (committee, _) =
1628            Committee::new_simple_test_committee_with_normalized_voting_power(vec![10, 20, 30, 40]);
1629
1630        // Test with threshold requiring at least 50% stake
1631        let params = ExecutionTimeEstimateParams {
1632            stake_weighted_median_threshold: 5000,
1633            ..Default::default()
1634        };
1635
1636        // Test with insufficient stake (only 30% have observations)
1637        let mut tracker = ConsensusObservations {
1638            observations: vec![
1639                (0, Some(Duration::from_secs(1))), // 10% stake
1640                (0, Some(Duration::from_secs(2))), // 20% stake
1641                (0, None),                         // 30% stake (missing)
1642                (0, None),                         // 40% stake (missing)
1643            ],
1644            stake_weighted_median: None,
1645        };
1646        tracker.update_stake_weighted_median(&committee, &params);
1647        // Should not compute median since only 30% stake has observations (< 50% threshold)
1648        assert_eq!(tracker.stake_weighted_median, None);
1649
1650        // Test with sufficient stake (60% have observations)
1651        let mut tracker = ConsensusObservations {
1652            observations: vec![
1653                (0, Some(Duration::from_secs(1))), // 10% stake
1654                (0, Some(Duration::from_secs(2))), // 20% stake
1655                (0, Some(Duration::from_secs(3))), // 30% stake
1656                (0, None),                         // 40% stake (missing)
1657            ],
1658            stake_weighted_median: None,
1659        };
1660        tracker.update_stake_weighted_median(&committee, &params);
1661        // Should compute median since 60% stake has observations (>= 50% threshold)
1662        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(3)));
1663    }
1664
1665    #[tokio::test]
1666    async fn test_execution_time_estimator() {
1667        telemetry_subscribers::init_for_testing();
1668
1669        let (committee, _) =
1670            Committee::new_simple_test_committee_with_normalized_voting_power(vec![10, 20, 30, 40]);
1671        let mut estimator = ExecutionTimeEstimator::new(
1672            Arc::new(committee),
1673            ExecutionTimeEstimateParams {
1674                target_utilization: 50,
1675                max_estimate_us: 1_500_000,
1676
1677                // Not used in this test.
1678                allowed_txn_cost_overage_burst_limit_us: 0,
1679                randomness_scalar: 0,
1680                stored_observations_num_included_checkpoints: 10,
1681                stored_observations_limit: u64::MAX,
1682                stake_weighted_median_threshold: 0,
1683                default_none_duration_for_new_keys: true,
1684                observations_chunk_size: None,
1685            },
1686            std::iter::empty(),
1687        );
1688        // Create test keys
1689        let package = ObjectID::random();
1690        let module = "test_module".to_string();
1691        let function = "test_function".to_string();
1692        let move_key = ExecutionTimeObservationKey::MoveEntryPoint {
1693            package,
1694            module: module.clone(),
1695            function: function.clone(),
1696            type_arguments: vec![],
1697        };
1698        let transfer_key = ExecutionTimeObservationKey::TransferObjects;
1699
1700        // Record observations from different validators
1701        // First record some old observations that should be ignored
1702        estimator.process_observation_from_consensus(
1703            0,
1704            Some(1),
1705            move_key.clone(),
1706            Duration::from_millis(1000),
1707            false,
1708        );
1709        estimator.process_observation_from_consensus(
1710            1,
1711            Some(1),
1712            move_key.clone(),
1713            Duration::from_millis(1000),
1714            false,
1715        );
1716        estimator.process_observation_from_consensus(
1717            2,
1718            Some(1),
1719            move_key.clone(),
1720            Duration::from_millis(1000),
1721            false,
1722        );
1723
1724        estimator.process_observation_from_consensus(
1725            0,
1726            Some(1),
1727            transfer_key.clone(),
1728            Duration::from_millis(500),
1729            false,
1730        );
1731        estimator.process_observation_from_consensus(
1732            1,
1733            Some(1),
1734            transfer_key.clone(),
1735            Duration::from_millis(500),
1736            false,
1737        );
1738        estimator.process_observation_from_consensus(
1739            2,
1740            Some(1),
1741            transfer_key.clone(),
1742            Duration::from_millis(500),
1743            false,
1744        );
1745
1746        // Now record newer observations that should be used
1747        estimator.process_observation_from_consensus(
1748            0,
1749            Some(2),
1750            move_key.clone(),
1751            Duration::from_millis(100),
1752            false,
1753        );
1754        estimator.process_observation_from_consensus(
1755            1,
1756            Some(2),
1757            move_key.clone(),
1758            Duration::from_millis(200),
1759            false,
1760        );
1761        estimator.process_observation_from_consensus(
1762            2,
1763            Some(2),
1764            move_key.clone(),
1765            Duration::from_millis(300),
1766            false,
1767        );
1768
1769        estimator.process_observation_from_consensus(
1770            0,
1771            Some(2),
1772            transfer_key.clone(),
1773            Duration::from_millis(50),
1774            false,
1775        );
1776        estimator.process_observation_from_consensus(
1777            1,
1778            Some(2),
1779            transfer_key.clone(),
1780            Duration::from_millis(60),
1781            false,
1782        );
1783        estimator.process_observation_from_consensus(
1784            2,
1785            Some(2),
1786            transfer_key.clone(),
1787            Duration::from_millis(70),
1788            false,
1789        );
1790
1791        // Try to record old observations again - these should be ignored
1792        estimator.process_observation_from_consensus(
1793            0,
1794            Some(1),
1795            move_key.clone(),
1796            Duration::from_millis(1000),
1797            false,
1798        );
1799        estimator.process_observation_from_consensus(
1800            1,
1801            Some(1),
1802            transfer_key.clone(),
1803            Duration::from_millis(500),
1804            false,
1805        );
1806        estimator.process_observation_from_consensus(
1807            2,
1808            Some(1),
1809            move_key.clone(),
1810            Duration::from_millis(1000),
1811            false,
1812        );
1813
1814        // Test single command transaction
1815        let single_move_tx = TransactionData::new_programmable(
1816            SuiAddress::ZERO,
1817            vec![],
1818            ProgrammableTransaction {
1819                inputs: vec![],
1820                commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1821                    package,
1822                    module: module.clone(),
1823                    function: function.clone(),
1824                    type_arguments: vec![],
1825                    arguments: vec![],
1826                }))],
1827            },
1828            100,
1829            100,
1830        );
1831
1832        // Should return median of move call observations (300ms)
1833        assert_eq!(
1834            estimator.get_estimate(&single_move_tx),
1835            Duration::from_millis(300)
1836        );
1837
1838        // Test multi-command transaction
1839        let multi_command_tx = TransactionData::new_programmable(
1840            SuiAddress::ZERO,
1841            vec![],
1842            ProgrammableTransaction {
1843                inputs: vec![],
1844                commands: vec![
1845                    Command::MoveCall(Box::new(ProgrammableMoveCall {
1846                        package,
1847                        module: module.clone(),
1848                        function: function.clone(),
1849                        type_arguments: vec![],
1850                        arguments: vec![],
1851                    })),
1852                    Command::TransferObjects(
1853                        vec![Argument::Input(1), Argument::Input(2)],
1854                        Argument::Input(0),
1855                    ),
1856                ],
1857            },
1858            100,
1859            100,
1860        );
1861
1862        // Should return sum of median move call (300ms)
1863        // plus the median transfer (70ms) * command length (3)
1864        assert_eq!(
1865            estimator.get_estimate(&multi_command_tx),
1866            Duration::from_millis(510)
1867        );
1868    }
1869
    // Payload serialized by `snapshot_tests` through `insta::assert_yaml_snapshot!`, one
    // snapshot per protocol version under test.
    //
    // NOTE(review): field names and order presumably appear verbatim in the stored snapshots,
    // so changing them would likely invalidate existing snapshot files - confirm before editing.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct ExecutionTimeObserverSnapshot {
        // Protocol version the snapshot was generated for.
        protocol_version: u64,
        // Observations reported by the estimator after all test inputs were processed;
        // `snapshot_tests` sorts them by the key's string form before storing them here.
        consensus_observations: Vec<(ExecutionTimeObservationKey, ConsensusObservations)>,
        transaction_estimates: Vec<(String, Duration)>, // (transaction_description, estimated_duration)
    }
1876
1877    fn generate_test_inputs(
1878        seed: u64,
1879        num_validators: usize,
1880        generation_override: Option<u64>,
1881    ) -> Vec<(
1882        AuthorityIndex,
1883        Option<u64>,
1884        ExecutionTimeObservationKey,
1885        Duration,
1886    )> {
1887        let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
1888
1889        let observation_keys = vec![
1890            ExecutionTimeObservationKey::MoveEntryPoint {
1891                package: ObjectID::from_hex_literal("0x1").unwrap(),
1892                module: "coin".to_string(),
1893                function: "transfer".to_string(),
1894                type_arguments: vec![],
1895            },
1896            ExecutionTimeObservationKey::MoveEntryPoint {
1897                package: ObjectID::from_hex_literal("0x2").unwrap(),
1898                module: "nft".to_string(),
1899                function: "mint".to_string(),
1900                type_arguments: vec![],
1901            },
1902            ExecutionTimeObservationKey::TransferObjects,
1903            ExecutionTimeObservationKey::SplitCoins,
1904            ExecutionTimeObservationKey::MergeCoins,
1905            ExecutionTimeObservationKey::MakeMoveVec,
1906            ExecutionTimeObservationKey::Upgrade,
1907        ];
1908
1909        let mut inputs = Vec::new();
1910        let target_samples = 25;
1911
1912        for _ in 0..target_samples {
1913            let key = observation_keys[rng.gen_range(0..observation_keys.len())].clone();
1914            let authority_index =
1915                AuthorityIndex::try_from(rng.gen_range(0..num_validators)).unwrap();
1916
1917            // Use realistic range where newer generations might replace older ones
1918            let generation = generation_override.unwrap_or_else(|| rng.gen_range(1..=10));
1919
1920            // Generate duration based on key type with realistic variance
1921            // Sometimes generate zero values to test corner cases with byzantine validators
1922            let base_duration = if rng.gen_ratio(1, 20) {
1923                // 5% chance of zero duration to test corner cases
1924                0
1925            } else {
1926                match &key {
1927                    ExecutionTimeObservationKey::MoveEntryPoint { .. } => rng.gen_range(50..=500),
1928                    ExecutionTimeObservationKey::TransferObjects => rng.gen_range(10..=100),
1929                    ExecutionTimeObservationKey::SplitCoins => rng.gen_range(20..=80),
1930                    ExecutionTimeObservationKey::MergeCoins => rng.gen_range(15..=70),
1931                    ExecutionTimeObservationKey::MakeMoveVec => rng.gen_range(5..=30),
1932                    ExecutionTimeObservationKey::Upgrade => rng.gen_range(100..=1000),
1933                    ExecutionTimeObservationKey::Publish => rng.gen_range(200..=2000),
1934                }
1935            };
1936
1937            let duration = Duration::from_millis(base_duration);
1938
1939            inputs.push((authority_index, Some(generation), key, duration));
1940        }
1941
1942        inputs
1943    }
1944
1945    fn generate_test_transactions(seed: u64) -> Vec<(String, TransactionData)> {
1946        let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
1947        let mut transactions = Vec::new();
1948
1949        let package3 = ObjectID::from_hex_literal("0x3").unwrap();
1950        transactions.push((
1951            "coin_transfer_call".to_string(),
1952            TransactionData::new_programmable(
1953                SuiAddress::ZERO,
1954                vec![],
1955                ProgrammableTransaction {
1956                    inputs: vec![],
1957                    commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1958                        package: ObjectID::from_hex_literal("0x1").unwrap(),
1959                        module: "coin".to_string(),
1960                        function: "transfer".to_string(),
1961                        type_arguments: vec![],
1962                        arguments: vec![],
1963                    }))],
1964                },
1965                rng.gen_range(100..1000),
1966                rng.gen_range(100..1000),
1967            ),
1968        ));
1969
1970        transactions.push((
1971            "mixed_move_calls".to_string(),
1972            TransactionData::new_programmable(
1973                SuiAddress::ZERO,
1974                vec![],
1975                ProgrammableTransaction {
1976                    inputs: vec![],
1977                    commands: vec![
1978                        Command::MoveCall(Box::new(ProgrammableMoveCall {
1979                            package: ObjectID::from_hex_literal("0x1").unwrap(),
1980                            module: "coin".to_string(),
1981                            function: "transfer".to_string(),
1982                            type_arguments: vec![],
1983                            arguments: vec![],
1984                        })),
1985                        Command::MoveCall(Box::new(ProgrammableMoveCall {
1986                            package: ObjectID::from_hex_literal("0x2").unwrap(),
1987                            module: "nft".to_string(),
1988                            function: "mint".to_string(),
1989                            type_arguments: vec![],
1990                            arguments: vec![],
1991                        })),
1992                    ],
1993                },
1994                rng.gen_range(100..1000),
1995                rng.gen_range(100..1000),
1996            ),
1997        ));
1998
1999        transactions.push((
2000            "native_commands_with_observations".to_string(),
2001            TransactionData::new_programmable(
2002                SuiAddress::ZERO,
2003                vec![],
2004                ProgrammableTransaction {
2005                    inputs: vec![],
2006                    commands: vec![
2007                        Command::TransferObjects(vec![Argument::Input(0)], Argument::Input(1)),
2008                        Command::SplitCoins(Argument::Input(2), vec![Argument::Input(3)]),
2009                        Command::MergeCoins(Argument::Input(4), vec![Argument::Input(5)]),
2010                        Command::MakeMoveVec(None, vec![Argument::Input(6)]),
2011                    ],
2012                },
2013                rng.gen_range(100..1000),
2014                rng.gen_range(100..1000),
2015            ),
2016        ));
2017
2018        let num_objects = rng.gen_range(1..=5);
2019        transactions.push((
2020            format!("transfer_objects_{}_items", num_objects),
2021            TransactionData::new_programmable(
2022                SuiAddress::ZERO,
2023                vec![],
2024                ProgrammableTransaction {
2025                    inputs: vec![],
2026                    commands: vec![Command::TransferObjects(
2027                        (0..num_objects).map(Argument::Input).collect(),
2028                        Argument::Input(num_objects),
2029                    )],
2030                },
2031                rng.gen_range(100..1000),
2032                rng.gen_range(100..1000),
2033            ),
2034        ));
2035
2036        let num_amounts = rng.gen_range(1..=4);
2037        transactions.push((
2038            format!("split_coins_{}_amounts", num_amounts),
2039            TransactionData::new_programmable(
2040                SuiAddress::ZERO,
2041                vec![],
2042                ProgrammableTransaction {
2043                    inputs: vec![],
2044                    commands: vec![Command::SplitCoins(
2045                        Argument::Input(0),
2046                        (1..=num_amounts).map(Argument::Input).collect(),
2047                    )],
2048                },
2049                rng.gen_range(100..1000),
2050                rng.gen_range(100..1000),
2051            ),
2052        ));
2053
2054        let num_sources = rng.gen_range(1..=3);
2055        transactions.push((
2056            format!("merge_coins_{}_sources", num_sources),
2057            TransactionData::new_programmable(
2058                SuiAddress::ZERO,
2059                vec![],
2060                ProgrammableTransaction {
2061                    inputs: vec![],
2062                    commands: vec![Command::MergeCoins(
2063                        Argument::Input(0),
2064                        (1..=num_sources).map(Argument::Input).collect(),
2065                    )],
2066                },
2067                rng.gen_range(100..1000),
2068                rng.gen_range(100..1000),
2069            ),
2070        ));
2071
2072        let num_elements = rng.gen_range(0..=6);
2073        transactions.push((
2074            format!("make_move_vec_{}_elements", num_elements),
2075            TransactionData::new_programmable(
2076                SuiAddress::ZERO,
2077                vec![],
2078                ProgrammableTransaction {
2079                    inputs: vec![],
2080                    commands: vec![Command::MakeMoveVec(
2081                        None,
2082                        (0..num_elements).map(Argument::Input).collect(),
2083                    )],
2084                },
2085                rng.gen_range(100..1000),
2086                rng.gen_range(100..1000),
2087            ),
2088        ));
2089
2090        transactions.push((
2091            "mixed_commands".to_string(),
2092            TransactionData::new_programmable(
2093                SuiAddress::ZERO,
2094                vec![],
2095                ProgrammableTransaction {
2096                    inputs: vec![],
2097                    commands: vec![
2098                        Command::MoveCall(Box::new(ProgrammableMoveCall {
2099                            package: package3,
2100                            module: "game".to_string(),
2101                            function: "play".to_string(),
2102                            type_arguments: vec![],
2103                            arguments: vec![],
2104                        })),
2105                        Command::TransferObjects(
2106                            vec![Argument::Input(1), Argument::Input(2)],
2107                            Argument::Input(0),
2108                        ),
2109                        Command::SplitCoins(Argument::Input(3), vec![Argument::Input(4)]),
2110                    ],
2111                },
2112                rng.gen_range(100..1000),
2113                rng.gen_range(100..1000),
2114            ),
2115        ));
2116
2117        transactions.push((
2118            "upgrade_package".to_string(),
2119            TransactionData::new_programmable(
2120                SuiAddress::ZERO,
2121                vec![],
2122                ProgrammableTransaction {
2123                    inputs: vec![],
2124                    commands: vec![Command::Upgrade(
2125                        vec![],
2126                        vec![],
2127                        package3,
2128                        Argument::Input(0),
2129                    )],
2130                },
2131                rng.gen_range(100..1000),
2132                rng.gen_range(100..1000),
2133            ),
2134        ));
2135
2136        transactions
2137    }
2138
2139    // Safeguard against forking because of changes to the execution time estimator.
2140    //
2141    // Within an epoch, each estimator must reach the same conclusion about the observations and
2142    // stake_weighted_median from the observations shared by other validators, as this is used
2143    // for transaction ordering.
2144    //
2145    // Therefore; any change in the calculation of the observations or stake_weighted_median
2146    // not accompanied by a protocol version change may fork.
2147    //
2148    // This test uses snapshots of computed stake weighted median at particular protocol versions
2149    // to attempt to discover regressions that might fork.
2150    #[test]
2151    fn snapshot_tests() {
2152        println!("\n============================================================================");
2153        println!("!                                                                          !");
2154        println!("! IMPORTANT: never update snapshots from this test. only add new versions! !");
2155        println!("!                                                                          !");
2156        println!("============================================================================\n");
2157
2158        let max_version = ProtocolVersion::MAX.as_u64();
2159
2160        let test_versions: Vec<u64> = (max_version.saturating_sub(9)..=max_version).collect();
2161
2162        for version in test_versions {
2163            let protocol_version = ProtocolVersion::new(version);
2164            let protocol_config = ProtocolConfig::get_for_version(protocol_version, Chain::Unknown);
2165            let (committee, _) = Committee::new_simple_test_committee_of_size(4);
2166            let committee = Arc::new(committee);
2167
2168            let initial_generation =
2169                if let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) =
2170                    protocol_config.per_object_congestion_control_mode()
2171                {
2172                    if params.default_none_duration_for_new_keys {
2173                        None
2174                    } else {
2175                        Some(0)
2176                    }
2177                } else {
2178                    Some(0) // fallback for versions without execution time estimate mode
2179                };
2180
2181            let initial_observations =
2182                generate_test_inputs(0, committee.num_members(), initial_generation);
2183            let mut estimator = ExecutionTimeEstimator::new(
2184                committee.clone(),
2185                ExecutionTimeEstimateParams {
2186                    max_estimate_us: u64::MAX, // Allow unlimited estimates for testing
2187                    ..ExecutionTimeEstimateParams::default()
2188                },
2189                initial_observations.into_iter(),
2190            );
2191
2192            let test_inputs = generate_test_inputs(version, committee.num_members(), None);
2193
2194            for (source, generation, observation_key, duration) in test_inputs {
2195                estimator.process_observation_from_consensus(
2196                    source,
2197                    generation,
2198                    observation_key,
2199                    duration,
2200                    false,
2201                );
2202            }
2203
2204            let mut final_observations = estimator.get_observations();
2205            final_observations.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
2206
2207            let test_transactions = generate_test_transactions(version);
2208            let mut transaction_estimates = Vec::new();
2209            for (description, tx_data) in test_transactions {
2210                let estimate = estimator.get_estimate(&tx_data);
2211                transaction_estimates.push((description, estimate));
2212            }
2213
2214            let snapshot_data = ExecutionTimeObserverSnapshot {
2215                protocol_version: version,
2216                consensus_observations: final_observations.clone(),
2217                transaction_estimates,
2218            };
2219            insta::assert_yaml_snapshot!(
2220                format!("execution_time_observer_v{}", version),
2221                snapshot_data
2222            );
2223        }
2224    }
2225}