// sui_core/authority/execution_time_estimator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::HashMap,
6    hash::{Hash, Hasher},
7    num::NonZeroUsize,
8    sync::{Arc, Weak},
9    time::{Duration, SystemTime},
10};
11
12use serde::{Deserialize, Serialize};
13
14use super::authority_per_epoch_store::AuthorityPerEpochStore;
15use super::weighted_moving_average::WeightedMovingAverage;
16use crate::consensus_adapter::SubmitToConsensus;
17use governor::{Quota, RateLimiter, clock::MonotonicClock};
18use itertools::Itertools;
19use lru::LruCache;
20#[cfg(not(msim))]
21use mysten_common::in_antithesis;
22use mysten_common::{assert_reachable, debug_fatal, in_test_configuration};
23use mysten_metrics::{monitored_scope, spawn_monitored_task};
24use rand::{Rng, SeedableRng, random, rngs, thread_rng};
25use simple_moving_average::{SMA, SingleSumSMA};
26use sui_config::node::ExecutionTimeObserverConfig;
27use sui_protocol_config::{ExecutionTimeEstimateParams, PerObjectCongestionControlMode};
28use sui_types::{
29    base_types::ObjectID,
30    committee::Committee,
31    error::SuiErrorKind,
32    execution::{ExecutionTimeObservationKey, ExecutionTiming},
33    messages_consensus::{AuthorityIndex, ConsensusTransaction, ExecutionTimeObservation},
34    transaction::{
35        Command, ProgrammableTransaction, StoredExecutionTimeObservations, TransactionData,
36        TransactionDataAPI, TransactionKind,
37    },
38};
39use tokio::{sync::mpsc, time::Instant};
40use tracing::{debug, info, trace, warn};
41
/// Number of samples kept by each key's simple (unweighted) local moving average.
// TODO: Move this into ExecutionTimeObserverConfig, if we switch to a moving average
// implementation without the window size in the type.
const SMA_LOCAL_OBSERVATION_WINDOW_SIZE: usize = 20;
/// When object-utilization metrics are not labeled with the full object ID, the label is
/// the object ID's last byte modulo this value (formatted as hex).
const OBJECT_UTILIZATION_METRIC_HASH_MODULUS: u8 = 32;
46
47/// Determines whether to inject synthetic execution time in Antithesis environments.
48///
49/// This function checks two conditions:
50/// 1. Whether the code is running in an Antithesis environment
51/// 2. Whether injection is enabled via the `ANTITHESIS_ENABLE_EXECUTION_TIME_INJECTION` env var
52///    (enabled by default)
53#[cfg(not(msim))]
54fn antithesis_enable_injecting_synthetic_execution_time() -> bool {
55    use std::sync::OnceLock;
56    static ENABLE_INJECTION: OnceLock<bool> = OnceLock::new();
57    *ENABLE_INJECTION.get_or_init(|| {
58        if !in_antithesis() {
59            return false;
60        }
61
62        std::env::var("ANTITHESIS_ENABLE_EXECUTION_TIME_INJECTION")
63            .map(|v| v.to_lowercase() == "true" || v == "1")
64            .unwrap_or(true)
65    })
66}
67
68// Collects local execution time estimates to share via consensus.
69pub struct ExecutionTimeObserver {
70    epoch_store: Weak<AuthorityPerEpochStore>,
71    consensus_adapter: Box<dyn SubmitToConsensus>,
72
73    protocol_params: ExecutionTimeEstimateParams,
74    config: ExecutionTimeObserverConfig,
75
76    local_observations: LruCache<ExecutionTimeObservationKey, LocalObservations>,
77
78    // For each object, tracks the amount of time above our utilization target that we spent
79    // executing transactions. This is used to decide which observations should be shared
80    // via consensus.
81    object_utilization_tracker: LruCache<ObjectID, ObjectUtilization>,
82
83    // Sorted list of recently indebted objects, updated by consensus handler.
84    indebted_objects: Vec<ObjectID>,
85
86    sharing_rate_limiter: RateLimiter<
87        governor::state::NotKeyed,
88        governor::state::InMemoryState,
89        governor::clock::MonotonicClock,
90        governor::middleware::NoOpMiddleware<
91            <governor::clock::MonotonicClock as governor::clock::Clock>::Instant,
92        >,
93    >,
94
95    next_generation_number: u64,
96}
97
98#[derive(Debug, Clone)]
99pub struct LocalObservations {
100    moving_average: SingleSumSMA<Duration, u32, SMA_LOCAL_OBSERVATION_WINDOW_SIZE>,
101    weighted_moving_average: WeightedMovingAverage,
102    last_shared: Option<(Duration, Instant)>,
103    config: ExecutionTimeObserverConfig,
104}
105
106impl LocalObservations {
107    fn new(config: ExecutionTimeObserverConfig, default_duration: Duration) -> Self {
108        let window_size = config.weighted_moving_average_window_size();
109        Self {
110            moving_average: SingleSumSMA::from_zero(Duration::ZERO),
111            weighted_moving_average: WeightedMovingAverage::new(
112                default_duration.as_micros() as u64,
113                window_size,
114            ),
115            last_shared: None,
116            config,
117        }
118    }
119
120    fn add_sample(&mut self, duration: Duration, gas_price: u64) {
121        self.moving_average.add_sample(duration);
122        self.weighted_moving_average
123            .add_sample(duration.as_micros() as u64, gas_price);
124    }
125
126    fn get_average(&self) -> Duration {
127        if self.config.enable_gas_price_weighting() {
128            Duration::from_micros(self.weighted_moving_average.get_weighted_average())
129        } else {
130            self.moving_average.get_average()
131        }
132    }
133
134    fn diff_exceeds_threshold(
135        &self,
136        new_average: Duration,
137        threshold: f64,
138        min_interval: Duration,
139    ) -> bool {
140        let Some((last_shared, last_shared_timestamp)) = self.last_shared else {
141            // Diff threshold exceeded by default if we haven't shared anything yet.
142            return true;
143        };
144
145        if last_shared_timestamp.elapsed() < min_interval {
146            return false;
147        }
148
149        if threshold >= 0.0 {
150            // Positive threshold requires upward change.
151            new_average
152                .checked_sub(last_shared)
153                .is_some_and(|diff| diff > last_shared.mul_f64(threshold))
154        } else {
155            // Negative threshold requires downward change.
156            last_shared
157                .checked_sub(new_average)
158                .is_some_and(|diff| diff > last_shared.mul_f64(-threshold))
159        }
160    }
161}
162
163#[derive(Debug, Clone)]
164pub struct ObjectUtilization {
165    excess_execution_time: Duration,
166    last_measured: Option<Instant>,
167    was_overutilized: bool, // true if the object has ever had excess_execution_time
168}
169
170impl ObjectUtilization {
171    pub fn overutilized(&self, config: &ExecutionTimeObserverConfig) -> bool {
172        self.excess_execution_time > config.observation_sharing_object_utilization_threshold()
173    }
174}
175
176// Tracks local execution time observations and shares them via consensus.
177impl ExecutionTimeObserver {
178    pub fn spawn(
179        epoch_store: Arc<AuthorityPerEpochStore>,
180        consensus_adapter: Box<dyn SubmitToConsensus>,
181        config: ExecutionTimeObserverConfig,
182    ) {
183        let PerObjectCongestionControlMode::ExecutionTimeEstimate(protocol_params) = epoch_store
184            .protocol_config()
185            .per_object_congestion_control_mode()
186        else {
187            info!(
188                "ExecutionTimeObserver disabled because per-object congestion control mode is not ExecutionTimeEstimate"
189            );
190            return;
191        };
192
193        let (tx_local_execution_time, mut rx_local_execution_time) =
194            mpsc::channel(config.observation_channel_capacity().into());
195        let (tx_object_debts, mut rx_object_debts) =
196            mpsc::channel(config.object_debt_channel_capacity().into());
197        epoch_store.set_local_execution_time_channels(tx_local_execution_time, tx_object_debts);
198
199        // TODO: pre-populate local observations with stored data from prior epoch.
200        let mut observer = Self {
201            epoch_store: Arc::downgrade(&epoch_store),
202            consensus_adapter,
203            local_observations: LruCache::new(config.observation_cache_size()),
204            object_utilization_tracker: LruCache::new(config.object_utilization_cache_size()),
205            indebted_objects: Vec::new(),
206            sharing_rate_limiter: RateLimiter::direct_with_clock(
207                Quota::per_second(config.observation_sharing_rate_limit())
208                    .allow_burst(config.observation_sharing_burst_limit()),
209                &MonotonicClock,
210            ),
211            protocol_params,
212            config,
213            next_generation_number: SystemTime::now()
214                .duration_since(std::time::UNIX_EPOCH)
215                .expect("Sui did not exist prior to 1970")
216                .as_micros()
217                .try_into()
218                .expect("This build of sui is not supported in the year 500,000"),
219        };
220        spawn_monitored_task!(epoch_store.within_alive_epoch(async move {
221            loop {
222                tokio::select! {
223                    // TODO: add metrics for messages received.
224                    Some(object_debts) = rx_object_debts.recv() => {
225                        observer.update_indebted_objects(object_debts);
226                    }
227                    Some((tx, timings, total_duration, gas_price)) = rx_local_execution_time.recv() => {
228                        observer
229                            .record_local_observations(&tx, &timings, total_duration, gas_price);
230                    }
231                    else => { break }
232                }
233            }
234            info!("shutting down ExecutionTimeObserver");
235        }));
236    }
237
238    #[cfg(test)]
239    fn new_for_testing(
240        epoch_store: Arc<AuthorityPerEpochStore>,
241        consensus_adapter: Box<dyn SubmitToConsensus>,
242        observation_sharing_object_utilization_threshold: Duration,
243        enable_gas_price_weighting: bool,
244    ) -> Self {
245        let PerObjectCongestionControlMode::ExecutionTimeEstimate(protocol_params) = epoch_store
246            .protocol_config()
247            .per_object_congestion_control_mode()
248        else {
249            panic!(
250                "tried to construct test ExecutionTimeObserver when congestion control mode is not ExecutionTimeEstimate"
251            );
252        };
253        Self {
254            epoch_store: Arc::downgrade(&epoch_store),
255            consensus_adapter,
256            protocol_params,
257            config: ExecutionTimeObserverConfig {
258                observation_sharing_object_utilization_threshold: Some(
259                    observation_sharing_object_utilization_threshold,
260                ),
261                enable_gas_price_weighting: Some(enable_gas_price_weighting),
262                ..ExecutionTimeObserverConfig::default()
263            },
264            local_observations: LruCache::new(NonZeroUsize::new(10000).unwrap()),
265            object_utilization_tracker: LruCache::new(NonZeroUsize::new(50000).unwrap()),
266            indebted_objects: Vec::new(),
267            sharing_rate_limiter: RateLimiter::direct_with_clock(
268                Quota::per_hour(std::num::NonZeroU32::MAX),
269                &MonotonicClock,
270            ),
271            next_generation_number: SystemTime::now()
272                .duration_since(std::time::UNIX_EPOCH)
273                .expect("Sui did not exist prior to 1970")
274                .as_micros()
275                .try_into()
276                .expect("This build of sui is not supported in the year 500,000"),
277        }
278    }
279
280    // Used by execution to report observed per-entry-point execution times to the estimator.
281    // Updates moving averages and submits observation to consensus if local observation differs
282    // from consensus median.
283    // TODO: Consider more detailed heuristic to account for overhead outside of commands.
284    fn record_local_observations(
285        &mut self,
286        tx: &ProgrammableTransaction,
287        timings: &[ExecutionTiming],
288        total_duration: Duration,
289        gas_price: u64,
290    ) {
291        let _scope = monitored_scope("ExecutionTimeObserver::record_local_observations");
292
293        // Simulate timing in test contexts to trigger congestion control.
294        #[cfg(msim)]
295        let should_inject = self.config.inject_synthetic_execution_time();
296        #[cfg(not(msim))]
297        let should_inject = antithesis_enable_injecting_synthetic_execution_time();
298
299        if should_inject {
300            let (generated_timings, generated_duration) = self.generate_test_timings(tx, timings);
301            self.record_local_observations_timing(
302                tx,
303                &generated_timings,
304                generated_duration,
305                gas_price,
306            )
307        } else {
308            self.record_local_observations_timing(tx, timings, total_duration, gas_price)
309        }
310    }
311
312    fn record_local_observations_timing(
313        &mut self,
314        tx: &ProgrammableTransaction,
315        timings: &[ExecutionTiming],
316        total_duration: Duration,
317        gas_price: u64,
318    ) {
319        assert!(tx.commands.len() >= timings.len());
320
321        let Some(epoch_store) = self.epoch_store.upgrade() else {
322            debug!("epoch is ending, dropping execution time observation");
323            return;
324        };
325
326        let mut uses_indebted_object = false;
327
328        // Update the accumulated excess execution time for shared object
329        // used for exclusive access in this transaction, and determine the max overage.
330        let max_excess_per_object_execution_time = tx
331            .shared_input_objects()
332            .filter_map(|obj| obj.is_accessed_exclusively().then_some(obj.id))
333            .map(|id| {
334                // Mark if any object used in the tx is indebted.
335                if !uses_indebted_object && self.indebted_objects.binary_search(&id).is_ok() {
336                    uses_indebted_object = true;
337                }
338
339                // For each object:
340                // - add the execution time of the current transaction to the tracker
341                // - subtract the maximum amount of time available for execution according
342                //   to our utilization target since the last report was received
343                //   (clamping to zero)
344                //
345                // What remains is the amount of excess time spent executing transactions on
346                // the object above the intended limit. If this value is greater than zero,
347                // it means the object is overutilized.
348                let now = Instant::now();
349                let utilization =
350                    self.object_utilization_tracker
351                        .get_or_insert_mut(id, || ObjectUtilization {
352                            excess_execution_time: Duration::ZERO,
353                            last_measured: None,
354                            was_overutilized: false,
355                        });
356                let overutilized_at_start = utilization.overutilized(&self.config);
357                utilization.excess_execution_time += total_duration;
358                utilization.excess_execution_time =
359                    utilization.excess_execution_time.saturating_sub(
360                        utilization
361                            .last_measured
362                            .map(|last_measured| {
363                                now.duration_since(last_measured)
364                                    .mul_f64(self.protocol_params.target_utilization as f64 / 100.0)
365                            })
366                            .unwrap_or(Duration::MAX),
367                    );
368                utilization.last_measured = Some(now);
369                if utilization.overutilized(&self.config) {
370                    utilization.was_overutilized = true;
371                }
372
373                // Update overutilized objects metrics.
374                if !overutilized_at_start && utilization.overutilized(&self.config) {
375                    trace!("object {id:?} is overutilized");
376                    epoch_store
377                        .metrics
378                        .epoch_execution_time_observer_overutilized_objects
379                        .inc();
380                } else if overutilized_at_start && !utilization.overutilized(&self.config) {
381                    epoch_store
382                        .metrics
383                        .epoch_execution_time_observer_overutilized_objects
384                        .dec();
385                }
386                if utilization.was_overutilized {
387                    let key = if self.config.report_object_utilization_metric_with_full_id() {
388                        id.to_string()
389                    } else {
390                        let key_lsb = id.into_bytes()[ObjectID::LENGTH - 1];
391                        let hash = key_lsb % OBJECT_UTILIZATION_METRIC_HASH_MODULUS;
392                        format!("{:x}", hash)
393                    };
394
395                    epoch_store
396                        .metrics
397                        .epoch_execution_time_observer_object_utilization
398                        .with_label_values(&[key.as_str()])
399                        .inc_by(total_duration.as_secs_f64());
400                }
401
402                utilization.excess_execution_time
403            })
404            .max()
405            .unwrap_or(Duration::ZERO);
406        epoch_store
407            .metrics
408            .epoch_execution_time_observer_utilization_cache_size
409            .set(self.object_utilization_tracker.len() as i64);
410
411        let total_command_duration: Duration = timings.iter().map(|t| t.duration()).sum();
412        let extra_overhead = total_duration - total_command_duration;
413
414        let mut to_share = Vec::with_capacity(tx.commands.len());
415        for (i, timing) in timings.iter().enumerate() {
416            let command = &tx.commands[i];
417
418            // Special-case handling for Publish command: only use hard-coded default estimate.
419            if matches!(command, Command::Publish(_, _)) {
420                continue;
421            }
422
423            // TODO: Consider using failure/success information in computing estimates.
424            let mut command_duration = timing.duration();
425
426            // Distribute overhead proportionally to each command's measured duration.
427            let overhead_factor = if total_command_duration > Duration::ZERO {
428                command_duration.as_secs_f64() / total_command_duration.as_secs_f64()
429            } else {
430                // divisor here must be >0 or this loop would not be running at all
431                1.0 / (tx.commands.len() as f64)
432            };
433            command_duration += extra_overhead.mul_f64(overhead_factor);
434
435            // For native commands, adjust duration by length of command's inputs/outputs.
436            // This is sort of arbitrary, but hopefully works okay as a heuristic.
437            command_duration = command_duration.div_f64(command_length(command).get() as f64);
438
439            // Update gas-weighted moving-average observation for the command.
440            let key = ExecutionTimeObservationKey::from_command(command);
441            let local_observation = self.local_observations.get_or_insert_mut(key.clone(), || {
442                LocalObservations::new(self.config.clone(), Duration::ZERO)
443            });
444            local_observation.add_sample(command_duration, gas_price);
445
446            // Send a new observation through consensus if:
447            // - our current moving average differs too much from the last one we shared, and
448            // - the tx has at least one mutable shared object with utilization that's too high
449            // TODO: Consider only sharing observations that disagree with consensus estimate.
450            let new_average = local_observation.get_average();
451            let mut should_share = false;
452
453            // Share upward adjustments if an object is overutilized.
454            if max_excess_per_object_execution_time
455                >= self
456                    .config
457                    .observation_sharing_object_utilization_threshold()
458                && local_observation.diff_exceeds_threshold(
459                    new_average,
460                    self.config.observation_sharing_diff_threshold(),
461                    self.config.observation_sharing_min_interval(),
462                )
463            {
464                should_share = true;
465                epoch_store
466                    .metrics
467                    .epoch_execution_time_observations_sharing_reason
468                    .with_label_values(&["utilization"])
469                    .inc();
470            };
471
472            // Share downward adjustments if an object is indebted.
473            if uses_indebted_object
474                && local_observation.diff_exceeds_threshold(
475                    new_average,
476                    -self.config.observation_sharing_diff_threshold(),
477                    self.config.observation_sharing_min_interval(),
478                )
479            {
480                should_share = true;
481                epoch_store
482                    .metrics
483                    .epoch_execution_time_observations_sharing_reason
484                    .with_label_values(&["indebted"])
485                    .inc();
486            }
487
488            if should_share {
489                debug!("sharing new execution time observation for {key:?}: {new_average:?}");
490                to_share.push((key, new_average));
491                local_observation.last_shared = Some((new_average, Instant::now()));
492            }
493        }
494
495        // Share new observations.
496        self.share_observations(to_share);
497    }
498
499    fn generate_test_timings(
500        &self,
501        tx: &ProgrammableTransaction,
502        timings: &[ExecutionTiming],
503    ) -> (Vec<ExecutionTiming>, Duration) {
504        let generated_timings: Vec<_> = tx
505            .commands
506            .iter()
507            .zip(timings.iter())
508            .map(|(command, timing)| {
509                let key = ExecutionTimeObservationKey::from_command(command);
510                let duration = self.get_test_duration(&key);
511                if timing.is_abort() {
512                    ExecutionTiming::Abort(duration)
513                } else {
514                    ExecutionTiming::Success(duration)
515                }
516            })
517            .collect();
518
519        let total_duration = generated_timings
520            .iter()
521            .map(|t| t.duration())
522            .sum::<Duration>()
523            + thread_rng().gen_range(Duration::from_millis(10)..Duration::from_millis(50));
524
525        (generated_timings, total_duration)
526    }
527
528    fn get_test_duration(&self, key: &ExecutionTimeObservationKey) -> Duration {
529        #[cfg(msim)]
530        let should_inject = self.config.inject_synthetic_execution_time();
531        #[cfg(not(msim))]
532        let should_inject = false;
533
534        if !in_test_configuration() && !should_inject {
535            panic!("get_test_duration called in non-test configuration");
536        }
537
538        thread_local! {
539            static PER_TEST_SEED: u64 = random::<u64>();
540        }
541
542        let mut hasher = std::collections::hash_map::DefaultHasher::new();
543
544        let checkpoint_digest_used = self
545            .epoch_store
546            .upgrade()
547            .and_then(|store| {
548                store
549                    .get_lowest_non_genesis_checkpoint_summary()
550                    .ok()
551                    .flatten()
552            })
553            .map(|summary| summary.content_digest.hash(&mut hasher))
554            .is_some();
555
556        if !checkpoint_digest_used {
557            PER_TEST_SEED.with(|seed| seed.hash(&mut hasher));
558        }
559
560        key.hash(&mut hasher);
561        let mut rng = rngs::StdRng::seed_from_u64(hasher.finish());
562        rng.gen_range(Duration::from_millis(100)..Duration::from_millis(600))
563    }
564
565    fn share_observations(&mut self, to_share: Vec<(ExecutionTimeObservationKey, Duration)>) {
566        if to_share.is_empty() {
567            return;
568        }
569        let Some(epoch_store) = self.epoch_store.upgrade() else {
570            debug!("epoch is ending, dropping execution time observation");
571            return;
572        };
573
574        let num_observations = to_share.len() as u64;
575
576        // Enforce global observation-sharing rate limit.
577        if let Err(e) = self.sharing_rate_limiter.check() {
578            epoch_store
579                .metrics
580                .epoch_execution_time_observations_dropped
581                .with_label_values(&["global_rate_limit"])
582                .inc_by(num_observations);
583            debug!("rate limit exceeded, dropping execution time observation; {e:?}");
584            return;
585        }
586
587        let epoch_store = epoch_store.clone();
588        let transaction = ConsensusTransaction::new_execution_time_observation(
589            ExecutionTimeObservation::new(epoch_store.name, self.next_generation_number, to_share),
590        );
591        self.next_generation_number += 1;
592
593        if let Err(e) = self.consensus_adapter.submit_best_effort(
594            &transaction,
595            &epoch_store,
596            Duration::from_secs(5),
597        ) {
598            if !matches!(e.as_inner(), SuiErrorKind::EpochEnded(_)) {
599                epoch_store
600                    .metrics
601                    .epoch_execution_time_observations_dropped
602                    .with_label_values(&["submit_to_consensus"])
603                    .inc_by(num_observations);
604                warn!("failed to submit execution time observation: {e:?}");
605            }
606        } else {
607            // Note: it is not actually guaranteed that the observation has been submitted at this point,
608            // but that is also not true with ConsensusAdapter::submit_to_consensus. The only way to know
609            // for sure is to observe that the message is processed by consensus handler.
610            assert_reachable!("successfully shares execution time observations");
611            epoch_store
612                .metrics
613                .epoch_execution_time_observations_shared
614                .inc_by(num_observations);
615        }
616    }
617
618    fn update_indebted_objects(&mut self, mut object_debts: Vec<ObjectID>) {
619        let _scope = monitored_scope("ExecutionTimeObserver::update_indebted_objects");
620
621        let Some(epoch_store) = self.epoch_store.upgrade() else {
622            debug!("epoch is ending, dropping indebted object update");
623            return;
624        };
625
626        object_debts.sort_unstable();
627        object_debts.dedup();
628        self.indebted_objects = object_debts;
629        epoch_store
630            .metrics
631            .epoch_execution_time_observer_indebted_objects
632            .set(self.indebted_objects.len() as i64);
633    }
634}
635
/// Key under which `StoredExecutionTimeObservations` are saved in the Sui system state
/// object's `extra_fields` Bag.
pub const EXTRA_FIELD_EXECUTION_TIME_ESTIMATES_KEY: u64 = 0;

/// Key under which the chunk count for chunked execution time observations is saved.
pub const EXTRA_FIELD_EXECUTION_TIME_ESTIMATES_CHUNK_COUNT_KEY: u64 = 1;
642
643// Tracks global execution time observations provided by validators from consensus
644// and computes deterministic per-command estimates for use in congestion control.
645pub struct ExecutionTimeEstimator {
646    committee: Arc<Committee>,
647    protocol_params: ExecutionTimeEstimateParams,
648
649    consensus_observations: HashMap<ExecutionTimeObservationKey, ConsensusObservations>,
650}
651
652#[derive(Debug, Clone, Serialize, Deserialize)]
653pub struct ConsensusObservations {
654    observations: Vec<(u64 /* generation */, Option<Duration>)>, // keyed by authority index
655    stake_weighted_median: Option<Duration>,                     // cached value
656}
657
658impl ConsensusObservations {
659    fn update_stake_weighted_median(
660        &mut self,
661        committee: &Committee,
662        config: &ExecutionTimeEstimateParams,
663    ) {
664        let mut stake_with_observations = 0;
665        let sorted_observations: Vec<_> = self
666            .observations
667            .iter()
668            .enumerate()
669            .filter_map(|(i, (_, duration))| {
670                duration.map(|duration| {
671                    let authority_index: AuthorityIndex = i.try_into().unwrap();
672                    stake_with_observations += committee.stake_by_index(authority_index).unwrap();
673                    (duration, authority_index)
674                })
675            })
676            .sorted()
677            .collect();
678
679        // Don't use observations until we have received enough.
680        if stake_with_observations < config.stake_weighted_median_threshold {
681            self.stake_weighted_median = None;
682            return;
683        }
684
685        // Compute stake-weighted median.
686        let median_stake = stake_with_observations / 2;
687        let mut running_stake = 0;
688        for (duration, authority_index) in sorted_observations {
689            running_stake += committee.stake_by_index(authority_index).unwrap();
690            if running_stake > median_stake {
691                self.stake_weighted_median = Some(duration);
692                break;
693            }
694        }
695    }
696}
697
698impl ExecutionTimeEstimator {
699    pub fn new(
700        committee: Arc<Committee>,
701        protocol_params: ExecutionTimeEstimateParams,
702        initial_observations: impl Iterator<
703            Item = (
704                AuthorityIndex,
705                Option<u64>,
706                ExecutionTimeObservationKey,
707                Duration,
708            ),
709        >,
710    ) -> Self {
711        let mut estimator = Self {
712            committee,
713            protocol_params,
714            consensus_observations: HashMap::new(),
715        };
716        for (source, generation, key, duration) in initial_observations {
717            estimator.process_observation_from_consensus(
718                source,
719                generation,
720                key.to_owned(),
721                duration,
722                true,
723            );
724        }
725        for observation in estimator.consensus_observations.values_mut() {
726            observation
727                .update_stake_weighted_median(&estimator.committee, &estimator.protocol_params);
728        }
729        estimator
730    }
731
732    #[cfg(test)]
733    pub fn new_for_testing() -> Self {
734        let (committee, _) = Committee::new_simple_test_committee_of_size(1);
735        Self {
736            committee: Arc::new(committee),
737            protocol_params: ExecutionTimeEstimateParams {
738                target_utilization: 100,
739                max_estimate_us: u64::MAX,
740                ..ExecutionTimeEstimateParams::default()
741            },
742            consensus_observations: HashMap::new(),
743        }
744    }
745
746    pub fn process_observations_from_consensus(
747        &mut self,
748        source: AuthorityIndex,
749        generation: Option<u64>,
750        observations: &[(ExecutionTimeObservationKey, Duration)],
751    ) {
752        for (key, duration) in observations {
753            self.process_observation_from_consensus(
754                source,
755                generation,
756                key.to_owned(),
757                *duration,
758                false,
759            );
760        }
761    }
762
763    fn process_observation_from_consensus(
764        &mut self,
765        source: AuthorityIndex,
766        generation: Option<u64>,
767        observation_key: ExecutionTimeObservationKey,
768        duration: Duration,
769        skip_update: bool,
770    ) {
771        if matches!(observation_key, ExecutionTimeObservationKey::Publish) {
772            // Special-case handling for Publish command: only use hard-coded default estimate.
773            warn!(
774                "dropping Publish observation received from possibly-Byzanitine authority {source}"
775            );
776            return;
777        }
778
779        assert_reachable!("receives some valid execution time observations");
780
781        let observations = self
782            .consensus_observations
783            .entry(observation_key)
784            .or_insert_with(|| {
785                let len = self.committee.num_members();
786                let mut empty_observations = Vec::with_capacity(len);
787                empty_observations.resize(len, (0, None));
788                ConsensusObservations {
789                    observations: empty_observations,
790                    stake_weighted_median: if self
791                        .protocol_params
792                        .default_none_duration_for_new_keys
793                    {
794                        None
795                    } else {
796                        Some(Duration::ZERO)
797                    },
798                }
799            });
800
801        let (obs_generation, obs_duration) =
802            &mut observations.observations[TryInto::<usize>::try_into(source).unwrap()];
803        if generation.is_some_and(|generation| *obs_generation >= generation) {
804            // Ignore outdated observation.
805            return;
806        }
807        *obs_generation = generation.unwrap_or(0);
808        *obs_duration = Some(duration);
809        if !skip_update {
810            observations.update_stake_weighted_median(&self.committee, &self.protocol_params);
811        }
812    }
813
814    pub fn get_estimate(&self, tx: &TransactionData) -> Duration {
815        let TransactionKind::ProgrammableTransaction(tx) = tx.kind() else {
816            debug_fatal!("get_estimate called on non-ProgrammableTransaction");
817            return Duration::ZERO;
818        };
819        tx.commands
820            .iter()
821            .map(|command| {
822                let key = ExecutionTimeObservationKey::from_command(command);
823                self.consensus_observations
824                    .get(&key)
825                    .and_then(|obs| obs.stake_weighted_median)
826                    .unwrap_or_else(|| key.default_duration())
827                    // For native commands, adjust duration by length of command's inputs/outputs.
828                    // This is sort of arbitrary, but hopefully works okay as a heuristic.
829                    .mul_f64(command_length(command).get() as f64)
830            })
831            .sum::<Duration>()
832            .min(Duration::from_micros(self.protocol_params.max_estimate_us))
833    }
834
835    pub fn take_observations(&mut self) -> StoredExecutionTimeObservations {
836        StoredExecutionTimeObservations::V1(
837            self.consensus_observations
838                .drain()
839                .map(|(key, observations)| {
840                    let observations = observations
841                        .observations
842                        .into_iter()
843                        .enumerate()
844                        .filter_map(|(idx, (_, duration))| {
845                            duration.map(|d| {
846                                (
847                                    self.committee
848                                        .authority_by_index(idx.try_into().unwrap())
849                                        .cloned()
850                                        .unwrap(),
851                                    d,
852                                )
853                            })
854                        })
855                        .collect();
856                    (key, observations)
857                })
858                .collect(),
859        )
860    }
861
862    pub fn get_observations(&self) -> Vec<(ExecutionTimeObservationKey, ConsensusObservations)> {
863        self.consensus_observations
864            .iter()
865            .map(|(key, observations)| (key.clone(), observations.clone()))
866            .collect()
867    }
868}
869
870fn command_length(command: &Command) -> NonZeroUsize {
871    // Commands with variable-length inputs/outputs are reported as +1
872    // to account for fixed overhead and prevent divide-by-zero.
873    NonZeroUsize::new(match command {
874        Command::MoveCall(_) => 1,
875        Command::TransferObjects(src, _) => src.len() + 1,
876        Command::SplitCoins(_, amts) => amts.len() + 1,
877        Command::MergeCoins(_, src) => src.len() + 1,
878        Command::Publish(_, _) => 1,
879        Command::MakeMoveVec(_, src) => src.len() + 1,
880        Command::Upgrade(_, _, _, _) => 1,
881    })
882    .unwrap()
883}
884
885#[cfg(test)]
886mod tests {
887    use super::*;
888    use crate::authority::test_authority_builder::TestAuthorityBuilder;
889    use crate::checkpoints::CheckpointStore;
890    use crate::consensus_adapter::{
891        ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
892        MockConsensusClient,
893    };
894    use sui_protocol_config::ProtocolConfig;
895    use sui_types::base_types::{ObjectID, SequenceNumber, SuiAddress};
896    use sui_types::transaction::{
897        Argument, CallArg, ObjectArg, ProgrammableMoveCall, SharedObjectMutability,
898    };
899    use {
900        rand::{Rng, SeedableRng},
901        sui_protocol_config::ProtocolVersion,
902        sui_types::supported_protocol_versions::Chain,
903    };
904
    /// Exercises `ExecutionTimeObserver::record_local_observations` for a single-command PTB,
    /// with object-utilization thresholds and gas price weighting both disabled.
    ///
    /// Checks that each observation (including apportioned transaction overhead) updates the
    /// moving average. A new average is shared only when it differs enough from the last
    /// shared value AND enough time has elapsed since that share.
    #[tokio::test]
    async fn test_record_local_observations() {
        telemetry_subscribers::init_for_testing();

        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_per_object_congestion_control_mode_for_testing(
                PerObjectCongestionControlMode::ExecutionTimeEstimate(
                    ExecutionTimeEstimateParams {
                        target_utilization: 100,
                        allowed_txn_cost_overage_burst_limit_us: 0,
                        randomness_scalar: 100,
                        max_estimate_us: u64::MAX,
                        stored_observations_num_included_checkpoints: 10,
                        stored_observations_limit: u64::MAX,
                        stake_weighted_median_threshold: 0,
                        default_none_duration_for_new_keys: true,
                        observations_chunk_size: None,
                    },
                ),
            );
            config
        });

        let mock_consensus_client = MockConsensusClient::new();
        let authority = TestAuthorityBuilder::new().build().await;
        let epoch_store = authority.epoch_store_for_testing();
        let consensus_adapter = Arc::new(ConsensusAdapter::new(
            Arc::new(mock_consensus_client),
            CheckpointStore::new_for_tests(),
            authority.name,
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
            epoch_store.protocol_config().clone(),
        ));
        let mut observer = ExecutionTimeObserver::new_for_testing(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            Duration::ZERO, // disable object utilization thresholds for this test
            false,          // disable gas price weighting for this test
        );

        // Create a simple PTB with one move call
        let package = ObjectID::random();
        let module = "test_module".to_string();
        let function = "test_function".to_string();
        let ptb = ProgrammableTransaction {
            inputs: vec![],
            commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
                package,
                module: module.clone(),
                function: function.clone(),
                type_arguments: vec![],
                arguments: vec![],
            }))],
        };

        // Record an observation
        let timings = vec![ExecutionTiming::Success(Duration::from_millis(100))];
        let total_duration = Duration::from_millis(110);
        observer.record_local_observations(&ptb, &timings, total_duration, 1);

        let key = ExecutionTimeObservationKey::MoveEntryPoint {
            package,
            module: module.clone(),
            function: function.clone(),
            type_arguments: vec![],
        };

        // Check that local observation was recorded and shared
        let local_obs = observer.local_observations.get(&key).unwrap();
        assert_eq!(
            local_obs.get_average(),
            // 10ms overhead should be entirely apportioned to the one command in the PTB
            Duration::from_millis(110)
        );
        assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));

        // Record another observation
        let timings = vec![ExecutionTiming::Success(Duration::from_millis(110))];
        let total_duration = Duration::from_millis(120);
        observer.record_local_observations(&ptb, &timings, total_duration, 1);

        // Check that moving average was updated
        let local_obs = observer.local_observations.get(&key).unwrap();
        assert_eq!(
            local_obs.get_average(),
            // average of 110ms and 120ms observations
            Duration::from_millis(115)
        );
        // new 115ms average should not be shared; it's <5% different from 110ms
        assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));

        // Record another observation
        let timings = vec![ExecutionTiming::Success(Duration::from_millis(120))];
        let total_duration = Duration::from_millis(130);
        observer.record_local_observations(&ptb, &timings, total_duration, 1);

        // Check that moving average was updated
        let local_obs = observer.local_observations.get(&key).unwrap();
        assert_eq!(
            local_obs.get_average(),
            // average of [110ms, 120ms, 130ms]
            Duration::from_millis(120)
        );
        // new 120ms average should not be shared; it's >5% different from 110ms,
        // but not enough time has passed
        assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));

        // Manually update last-shared time to long ago, so the minimum interval between
        // shares no longer blocks the next one
        observer
            .local_observations
            .get_mut(&key)
            .unwrap()
            .last_shared = Some((
            Duration::from_millis(110),
            Instant::now() - Duration::from_secs(60),
        ));

        // Record last observation
        let timings = vec![ExecutionTiming::Success(Duration::from_millis(120))];
        let total_duration = Duration::from_millis(160);
        observer.record_local_observations(&ptb, &timings, total_duration, 1);

        // Verify that the moving average was updated and the new average was shared, as
        // enough time has now elapsed
        let local_obs = observer.local_observations.get(&key).unwrap();
        assert_eq!(
            local_obs.get_average(),
            // average of [110ms, 120ms, 130ms, 160ms]
            Duration::from_millis(130)
        );
        assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(130));
    }
1042
    /// Same single-command scenario as `test_record_local_observations`, but with gas price
    /// weighting enabled. The moving average therefore weights each observation by the gas
    /// price passed as the last argument of `record_local_observations`.
    #[tokio::test]
    async fn test_record_local_observations_with_gas_price_weighting() {
        telemetry_subscribers::init_for_testing();

        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_per_object_congestion_control_mode_for_testing(
                PerObjectCongestionControlMode::ExecutionTimeEstimate(
                    ExecutionTimeEstimateParams {
                        target_utilization: 100,
                        allowed_txn_cost_overage_burst_limit_us: 0,
                        randomness_scalar: 100,
                        max_estimate_us: u64::MAX,
                        stored_observations_num_included_checkpoints: 10,
                        stored_observations_limit: u64::MAX,
                        stake_weighted_median_threshold: 0,
                        default_none_duration_for_new_keys: true,
                        observations_chunk_size: None,
                    },
                ),
            );
            config
        });

        let mock_consensus_client = MockConsensusClient::new();
        let authority = TestAuthorityBuilder::new().build().await;
        let epoch_store = authority.epoch_store_for_testing();
        let consensus_adapter = Arc::new(ConsensusAdapter::new(
            Arc::new(mock_consensus_client),
            CheckpointStore::new_for_tests(),
            authority.name,
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
            epoch_store.protocol_config().clone(),
        ));
        let mut observer = ExecutionTimeObserver::new_for_testing(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            Duration::ZERO, // disable object utilization thresholds for this test
            true,           // enable gas price weighting for this test
        );

        // Create a simple PTB with one move call
        let package = ObjectID::random();
        let module = "test_module".to_string();
        let function = "test_function".to_string();
        let ptb = ProgrammableTransaction {
            inputs: vec![],
            commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
                package,
                module: module.clone(),
                function: function.clone(),
                type_arguments: vec![],
                arguments: vec![],
            }))],
        };

        // Record an observation (gas price 1)
        let timings = vec![ExecutionTiming::Success(Duration::from_millis(100))];
        let total_duration = Duration::from_millis(110);
        observer.record_local_observations(&ptb, &timings, total_duration, 1);

        let key = ExecutionTimeObservationKey::MoveEntryPoint {
            package,
            module: module.clone(),
            function: function.clone(),
            type_arguments: vec![],
        };

        // Check that local observation was recorded and shared
        let local_obs = observer.local_observations.get(&key).unwrap();
        assert_eq!(
            local_obs.get_average(),
            // 10ms overhead should be entirely apportioned to the one command in the PTB
            Duration::from_millis(110)
        );
        assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));

        // Record another observation, this time at gas price 2
        let timings = vec![ExecutionTiming::Success(Duration::from_millis(110))];
        let total_duration = Duration::from_millis(120);
        observer.record_local_observations(&ptb, &timings, total_duration, 2);

        // Check that weighted moving average was updated
        let local_obs = observer.local_observations.get(&key).unwrap();
        assert_eq!(
            local_obs.get_average(),
            // Our local observation averages are weighted by gas price:
            // (110ms * 1 + 120ms * 2) / (1 + 2) = 116.666ms
            Duration::from_micros(116_666)
        );
    }
1138
    /// Verifies the multi-command case. Transaction overhead (total duration minus the sum of
    /// per-command timings) is apportioned to each command in proportion to its own timing.
    /// Native commands' adjusted times are then divided by their command length.
    #[tokio::test]
    async fn test_record_local_observations_with_multiple_commands() {
        telemetry_subscribers::init_for_testing();

        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_per_object_congestion_control_mode_for_testing(
                PerObjectCongestionControlMode::ExecutionTimeEstimate(
                    ExecutionTimeEstimateParams {
                        target_utilization: 100,
                        allowed_txn_cost_overage_burst_limit_us: 0,
                        randomness_scalar: 0,
                        max_estimate_us: u64::MAX,
                        stored_observations_num_included_checkpoints: 10,
                        stored_observations_limit: u64::MAX,
                        stake_weighted_median_threshold: 0,
                        default_none_duration_for_new_keys: true,
                        observations_chunk_size: None,
                    },
                ),
            );
            config
        });

        let mock_consensus_client = MockConsensusClient::new();
        let authority = TestAuthorityBuilder::new().build().await;
        let epoch_store = authority.epoch_store_for_testing();
        let consensus_adapter = Arc::new(ConsensusAdapter::new(
            Arc::new(mock_consensus_client),
            CheckpointStore::new_for_tests(),
            authority.name,
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
            epoch_store.protocol_config().clone(),
        ));
        let mut observer = ExecutionTimeObserver::new_for_testing(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            Duration::ZERO, // disable object utilization thresholds for this test
            false,          // disable gas price weighting for this test
        );

        // Create a PTB with multiple commands.
        let package = ObjectID::random();
        let module = "test_module".to_string();
        let function = "test_function".to_string();
        let ptb = ProgrammableTransaction {
            inputs: vec![],
            commands: vec![
                Command::MoveCall(Box::new(ProgrammableMoveCall {
                    package,
                    module: module.clone(),
                    function: function.clone(),
                    type_arguments: vec![],
                    arguments: vec![],
                })),
                Command::TransferObjects(
                    // Inputs don't exist above, but doesn't matter for this test.
                    vec![Argument::Input(1), Argument::Input(2)],
                    Argument::Input(0),
                ),
            ],
        };
        // 100ms + 50ms of command time inside a 180ms transaction leaves 30ms of overhead.
        let timings = vec![
            ExecutionTiming::Success(Duration::from_millis(100)),
            ExecutionTiming::Success(Duration::from_millis(50)),
        ];
        let total_duration = Duration::from_millis(180);
        observer.record_local_observations(&ptb, &timings, total_duration, 1);

        // Check that both commands were recorded
        let move_key = ExecutionTimeObservationKey::MoveEntryPoint {
            package,
            module: module.clone(),
            function: function.clone(),
            type_arguments: vec![],
        };
        let move_obs = observer.local_observations.get(&move_key).unwrap();
        assert_eq!(
            move_obs.get_average(),
            // 100/150 == 2/3 of 30ms overhead distributed to Move command: 100ms + 20ms
            Duration::from_millis(120)
        );

        let transfer_obs = observer
            .local_observations
            .get(&ExecutionTimeObservationKey::TransferObjects)
            .unwrap();
        assert_eq!(
            transfer_obs.get_average(),
            // 50ms time before adjustments
            // 50/150 == 1/3 of 30ms overhead distributed to object xfer
            // 60ms adjusted time / 3 command length (2 objects + 1) == 20ms
            Duration::from_millis(20)
        );
    }
1238
    /// Verifies observation sharing when an object-utilization threshold is configured, for a
    /// PTB with one mutable shared-object input.
    ///
    /// Upward changes are shared only while the object is overutilized. A downward change is
    /// not shared just because the object is overutilized.
    #[tokio::test]
    async fn test_record_local_observations_with_object_utilization_threshold() {
        telemetry_subscribers::init_for_testing();

        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_per_object_congestion_control_mode_for_testing(
                PerObjectCongestionControlMode::ExecutionTimeEstimate(
                    ExecutionTimeEstimateParams {
                        target_utilization: 100,
                        allowed_txn_cost_overage_burst_limit_us: 0,
                        randomness_scalar: 0,
                        max_estimate_us: u64::MAX,
                        stored_observations_num_included_checkpoints: 10,
                        stored_observations_limit: u64::MAX,
                        stake_weighted_median_threshold: 0,
                        default_none_duration_for_new_keys: true,
                        observations_chunk_size: None,
                    },
                ),
            );
            config
        });

        let mock_consensus_client = MockConsensusClient::new();
        let authority = TestAuthorityBuilder::new().build().await;
        let epoch_store = authority.epoch_store_for_testing();
        let consensus_adapter = Arc::new(ConsensusAdapter::new(
            Arc::new(mock_consensus_client),
            CheckpointStore::new_for_tests(),
            authority.name,
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
            epoch_store.protocol_config().clone(),
        ));
        let mut observer = ExecutionTimeObserver::new_for_testing(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            Duration::from_millis(500), // only share observations with excess utilization >= 500ms
            false,                      // disable gas price weighting for this test
        );

        // Create a simple PTB with one move call and one mutable shared input
        let package = ObjectID::random();
        let module = "test_module".to_string();
        let function = "test_function".to_string();
        let shared_object_id = ObjectID::random();
        let ptb = ProgrammableTransaction {
            inputs: vec![CallArg::Object(ObjectArg::SharedObject {
                id: shared_object_id,
                initial_shared_version: SequenceNumber::new(),
                mutability: SharedObjectMutability::Mutable,
            })],
            commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
                package,
                module: module.clone(),
                function: function.clone(),
                type_arguments: vec![],
                arguments: vec![],
            }))],
        };
        let key = ExecutionTimeObservationKey::MoveEntryPoint {
            package,
            module: module.clone(),
            function: function.clone(),
            type_arguments: vec![],
        };

        // Pause tokio's clock so time only moves when the test calls `tokio::time::advance`.
        tokio::time::pause();

        // First observation - should not share due to low utilization
        let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
        observer.record_local_observations(&ptb, &timings, Duration::from_secs(2), 1);
        assert!(
            observer
                .local_observations
                .get(&key)
                .unwrap()
                .last_shared
                .is_none()
        );

        // Second observation - no time has passed, so now utilization is high; should share upward change
        let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
        observer.record_local_observations(&ptb, &timings, Duration::from_secs(2), 1);
        assert_eq!(
            observer
                .local_observations
                .get(&key)
                .unwrap()
                .last_shared
                .unwrap()
                .0,
            Duration::from_secs(2)
        );

        // Third execution with significant upward diff and high utilization - should share again
        tokio::time::advance(Duration::from_secs(5)).await;
        let timings = vec![ExecutionTiming::Success(Duration::from_secs(3))];
        observer.record_local_observations(&ptb, &timings, Duration::from_secs(5), 1);
        assert_eq!(
            observer
                .local_observations
                .get(&key)
                .unwrap()
                .last_shared
                .unwrap()
                .0,
            Duration::from_secs(3)
        );

        // Fourth execution with significant downward diff but still overutilized - should NOT share downward change
        // (downward changes are only shared for indebted objects, not overutilized ones)
        tokio::time::advance(Duration::from_millis(150)).await;
        let timings = vec![ExecutionTiming::Success(Duration::from_millis(100))];
        observer.record_local_observations(&ptb, &timings, Duration::from_millis(500), 1);
        assert_eq!(
            observer
                .local_observations
                .get(&key)
                .unwrap()
                .last_shared
                .unwrap()
                .0,
            Duration::from_secs(3) // still the old value, no sharing of downward change
        );

        // Fifth execution after utilization drops - should not share upward diff since not overutilized
        tokio::time::advance(Duration::from_secs(60)).await;
        let timings = vec![ExecutionTiming::Success(Duration::from_secs(11))];
        observer.record_local_observations(&ptb, &timings, Duration::from_secs(11), 1);
        assert_eq!(
            observer
                .local_observations
                .get(&key)
                .unwrap()
                .last_shared
                .unwrap()
                .0,
            Duration::from_secs(3) // still the old value, no sharing when not overutilized
        );
    }
1384
    /// Verifies that a significant downward change in the moving average is shared for a
    /// mutable shared object once that object is marked as indebted. This holds even when the
    /// utilization threshold has been raised too high for overutilization-based sharing.
    #[tokio::test]
    async fn test_record_local_observations_with_indebted_objects() {
        telemetry_subscribers::init_for_testing();

        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_per_object_congestion_control_mode_for_testing(
                PerObjectCongestionControlMode::ExecutionTimeEstimate(
                    ExecutionTimeEstimateParams {
                        target_utilization: 100,
                        allowed_txn_cost_overage_burst_limit_us: 0,
                        randomness_scalar: 0,
                        max_estimate_us: u64::MAX,
                        stored_observations_num_included_checkpoints: 10,
                        stored_observations_limit: u64::MAX,
                        stake_weighted_median_threshold: 0,
                        default_none_duration_for_new_keys: true,
                        observations_chunk_size: None,
                    },
                ),
            );
            config
        });

        let mock_consensus_client = MockConsensusClient::new();
        let authority = TestAuthorityBuilder::new().build().await;
        let epoch_store = authority.epoch_store_for_testing();
        let consensus_adapter = Arc::new(ConsensusAdapter::new(
            Arc::new(mock_consensus_client),
            CheckpointStore::new_for_tests(),
            authority.name,
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
            epoch_store.protocol_config().clone(),
        ));
        let mut observer = ExecutionTimeObserver::new_for_testing(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            Duration::from_millis(500), // Low utilization threshold to enable overutilized sharing initially
            false,                      // disable gas price weighting for this test
        );

        // Create a simple PTB with one move call and one mutable shared input
        let package = ObjectID::random();
        let module = "test_module".to_string();
        let function = "test_function".to_string();
        let shared_object_id = ObjectID::random();
        let ptb = ProgrammableTransaction {
            inputs: vec![CallArg::Object(ObjectArg::SharedObject {
                id: shared_object_id,
                initial_shared_version: SequenceNumber::new(),
                mutability: SharedObjectMutability::Mutable,
            })],
            commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
                package,
                module: module.clone(),
                function: function.clone(),
                type_arguments: vec![],
                arguments: vec![],
            }))],
        };
        let key = ExecutionTimeObservationKey::MoveEntryPoint {
            package,
            module: module.clone(),
            function: function.clone(),
            type_arguments: vec![],
        };

        // Pause tokio's clock so time only moves when the test calls `tokio::time::advance`.
        tokio::time::pause();

        // First observation - should not share due to low utilization
        let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
        observer.record_local_observations(&ptb, &timings, Duration::from_secs(1), 1);
        assert!(
            observer
                .local_observations
                .get(&key)
                .unwrap()
                .last_shared
                .is_none()
        );

        // Second observation - no time has passed, so now utilization is high; should share upward change
        let timings = vec![ExecutionTiming::Success(Duration::from_secs(2))];
        observer.record_local_observations(&ptb, &timings, Duration::from_secs(2), 1);
        assert_eq!(
            observer
                .local_observations
                .get(&key)
                .unwrap()
                .last_shared
                .unwrap()
                .0,
            Duration::from_millis(1500) // (1s + 2s) / 2 = 1.5s
        );

        // Mark the shared object as indebted and increase utilization threshold to prevent overutilized sharing
        observer.update_indebted_objects(vec![shared_object_id]);
        observer
            .config
            .observation_sharing_object_utilization_threshold = Some(Duration::from_secs(1000));

        // Wait for min interval and record a significant downward change
        // This should share because the object is indebted
        tokio::time::advance(Duration::from_secs(60)).await;
        let timings = vec![ExecutionTiming::Success(Duration::from_millis(300))];
        observer.record_local_observations(&ptb, &timings, Duration::from_millis(300), 1);

        // Moving average should be (1s + 2s + 0.3s) / 3 = 1.1s
        // This downward change should have been shared for indebted object
        assert_eq!(
            observer
                .local_observations
                .get(&key)
                .unwrap()
                .last_shared
                .unwrap()
                .0,
            Duration::from_millis(1100)
        );
    }
1509
1510    #[tokio::test]
1511    async fn test_stake_weighted_median() {
1512        telemetry_subscribers::init_for_testing();
1513
1514        let (committee, _) =
1515            Committee::new_simple_test_committee_with_normalized_voting_power(vec![10, 20, 30, 40]);
1516
1517        let params = ExecutionTimeEstimateParams {
1518            stake_weighted_median_threshold: 0,
1519            ..Default::default()
1520        };
1521
1522        let mut tracker = ConsensusObservations {
1523            observations: vec![
1524                (0, Some(Duration::from_secs(1))), // 10% stake
1525                (0, Some(Duration::from_secs(2))), // 20% stake
1526                (0, Some(Duration::from_secs(3))), // 30% stake
1527                (0, Some(Duration::from_secs(4))), // 40% stake
1528            ],
1529            stake_weighted_median: None,
1530        };
1531        tracker.update_stake_weighted_median(&committee, &params);
1532        // With stake weights [10,20,30,40]:
1533        // - Duration 1 covers 10% of stake
1534        // - Duration 2 covers 30% of stake (10+20)
1535        // - Duration 3 covers 60% of stake (10+20+30)
1536        // - Duration 4 covers 100% of stake
1537        // Median should be 3 since that's where we cross 50% of stake
1538        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(3)));
1539
1540        // Test duration sorting
1541        let mut tracker = ConsensusObservations {
1542            observations: vec![
1543                (0, Some(Duration::from_secs(3))), // 10% stake
1544                (0, Some(Duration::from_secs(4))), // 20% stake
1545                (0, Some(Duration::from_secs(1))), // 30% stake
1546                (0, Some(Duration::from_secs(2))), // 40% stake
1547            ],
1548            stake_weighted_median: None,
1549        };
1550        tracker.update_stake_weighted_median(&committee, &params);
1551        // With sorted stake weights [30,40,10,20]:
1552        // - Duration 1 covers 30% of stake
1553        // - Duration 2 covers 70% of stake (30+40)
1554        // - Duration 3 covers 80% of stake (30+40+10)
1555        // - Duration 4 covers 100% of stake
1556        // Median should be 2 since that's where we cross 50% of stake
1557        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(2)));
1558
1559        // Test with one missing observation
1560        let mut tracker = ConsensusObservations {
1561            observations: vec![
1562                (0, Some(Duration::from_secs(1))), // 10% stake
1563                (0, None),                         // 20% stake (missing)
1564                (0, Some(Duration::from_secs(3))), // 30% stake
1565                (0, Some(Duration::from_secs(4))), // 40% stake
1566            ],
1567            stake_weighted_median: None,
1568        };
1569        tracker.update_stake_weighted_median(&committee, &params);
1570        // With missing observation for 20% stake:
1571        // - Duration 1 covers 10% of stake
1572        // - Duration 3 covers 40% of stake (10+30)
1573        // - Duration 4 covers 80% of stake (10+30+40)
1574        // Median should be 4 since that's where we pass half of available stake (80% / 2 == 40%)
1575        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(4)));
1576
1577        // Test with multiple missing observations
1578        let mut tracker = ConsensusObservations {
1579            observations: vec![
1580                (0, Some(Duration::from_secs(1))), // 10% stake
1581                (0, Some(Duration::from_secs(2))), // 20% stake
1582                (0, None),                         // 30% stake (missing)
1583                (0, None),                         // 40% stake (missing)
1584            ],
1585            stake_weighted_median: None,
1586        };
1587        tracker.update_stake_weighted_median(&committee, &params);
1588        // With missing observations:
1589        // - Duration 1 covers 10% of stake
1590        // - Duration 2 covers 30% of stake (10+20)
1591        // Median should be 2 since that's where we cross half of available stake (40% / 2 == 20%)
1592        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(2)));
1593
1594        // Test with one observation
1595        let mut tracker = ConsensusObservations {
1596            observations: vec![
1597                (0, None),                         // 10% stake
1598                (0, None),                         // 20% stake
1599                (0, Some(Duration::from_secs(3))), // 30% stake
1600                (0, None),                         // 40% stake
1601            ],
1602            stake_weighted_median: None,
1603        };
1604        tracker.update_stake_weighted_median(&committee, &params);
1605        // With only one observation, median should be that observation
1606        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(3)));
1607
1608        // Test with all same durations
1609        let mut tracker = ConsensusObservations {
1610            observations: vec![
1611                (0, Some(Duration::from_secs(5))), // 10% stake
1612                (0, Some(Duration::from_secs(5))), // 20% stake
1613                (0, Some(Duration::from_secs(5))), // 30% stake
1614                (0, Some(Duration::from_secs(5))), // 40% stake
1615            ],
1616            stake_weighted_median: None,
1617        };
1618        tracker.update_stake_weighted_median(&committee, &params);
1619        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(5)));
1620    }
1621
1622    #[tokio::test]
1623    async fn test_stake_weighted_median_threshold() {
1624        telemetry_subscribers::init_for_testing();
1625
1626        let (committee, _) =
1627            Committee::new_simple_test_committee_with_normalized_voting_power(vec![10, 20, 30, 40]);
1628
1629        // Test with threshold requiring at least 50% stake
1630        let params = ExecutionTimeEstimateParams {
1631            stake_weighted_median_threshold: 5000,
1632            ..Default::default()
1633        };
1634
1635        // Test with insufficient stake (only 30% have observations)
1636        let mut tracker = ConsensusObservations {
1637            observations: vec![
1638                (0, Some(Duration::from_secs(1))), // 10% stake
1639                (0, Some(Duration::from_secs(2))), // 20% stake
1640                (0, None),                         // 30% stake (missing)
1641                (0, None),                         // 40% stake (missing)
1642            ],
1643            stake_weighted_median: None,
1644        };
1645        tracker.update_stake_weighted_median(&committee, &params);
1646        // Should not compute median since only 30% stake has observations (< 50% threshold)
1647        assert_eq!(tracker.stake_weighted_median, None);
1648
1649        // Test with sufficient stake (60% have observations)
1650        let mut tracker = ConsensusObservations {
1651            observations: vec![
1652                (0, Some(Duration::from_secs(1))), // 10% stake
1653                (0, Some(Duration::from_secs(2))), // 20% stake
1654                (0, Some(Duration::from_secs(3))), // 30% stake
1655                (0, None),                         // 40% stake (missing)
1656            ],
1657            stake_weighted_median: None,
1658        };
1659        tracker.update_stake_weighted_median(&committee, &params);
1660        // Should compute median since 60% stake has observations (>= 50% threshold)
1661        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(3)));
1662    }
1663
1664    #[tokio::test]
1665    async fn test_execution_time_estimator() {
1666        telemetry_subscribers::init_for_testing();
1667
1668        let (committee, _) =
1669            Committee::new_simple_test_committee_with_normalized_voting_power(vec![10, 20, 30, 40]);
1670        let mut estimator = ExecutionTimeEstimator::new(
1671            Arc::new(committee),
1672            ExecutionTimeEstimateParams {
1673                target_utilization: 50,
1674                max_estimate_us: 1_500_000,
1675
1676                // Not used in this test.
1677                allowed_txn_cost_overage_burst_limit_us: 0,
1678                randomness_scalar: 0,
1679                stored_observations_num_included_checkpoints: 10,
1680                stored_observations_limit: u64::MAX,
1681                stake_weighted_median_threshold: 0,
1682                default_none_duration_for_new_keys: true,
1683                observations_chunk_size: None,
1684            },
1685            std::iter::empty(),
1686        );
1687        // Create test keys
1688        let package = ObjectID::random();
1689        let module = "test_module".to_string();
1690        let function = "test_function".to_string();
1691        let move_key = ExecutionTimeObservationKey::MoveEntryPoint {
1692            package,
1693            module: module.clone(),
1694            function: function.clone(),
1695            type_arguments: vec![],
1696        };
1697        let transfer_key = ExecutionTimeObservationKey::TransferObjects;
1698
1699        // Record observations from different validators
1700        // First record some old observations that should be ignored
1701        estimator.process_observation_from_consensus(
1702            0,
1703            Some(1),
1704            move_key.clone(),
1705            Duration::from_millis(1000),
1706            false,
1707        );
1708        estimator.process_observation_from_consensus(
1709            1,
1710            Some(1),
1711            move_key.clone(),
1712            Duration::from_millis(1000),
1713            false,
1714        );
1715        estimator.process_observation_from_consensus(
1716            2,
1717            Some(1),
1718            move_key.clone(),
1719            Duration::from_millis(1000),
1720            false,
1721        );
1722
1723        estimator.process_observation_from_consensus(
1724            0,
1725            Some(1),
1726            transfer_key.clone(),
1727            Duration::from_millis(500),
1728            false,
1729        );
1730        estimator.process_observation_from_consensus(
1731            1,
1732            Some(1),
1733            transfer_key.clone(),
1734            Duration::from_millis(500),
1735            false,
1736        );
1737        estimator.process_observation_from_consensus(
1738            2,
1739            Some(1),
1740            transfer_key.clone(),
1741            Duration::from_millis(500),
1742            false,
1743        );
1744
1745        // Now record newer observations that should be used
1746        estimator.process_observation_from_consensus(
1747            0,
1748            Some(2),
1749            move_key.clone(),
1750            Duration::from_millis(100),
1751            false,
1752        );
1753        estimator.process_observation_from_consensus(
1754            1,
1755            Some(2),
1756            move_key.clone(),
1757            Duration::from_millis(200),
1758            false,
1759        );
1760        estimator.process_observation_from_consensus(
1761            2,
1762            Some(2),
1763            move_key.clone(),
1764            Duration::from_millis(300),
1765            false,
1766        );
1767
1768        estimator.process_observation_from_consensus(
1769            0,
1770            Some(2),
1771            transfer_key.clone(),
1772            Duration::from_millis(50),
1773            false,
1774        );
1775        estimator.process_observation_from_consensus(
1776            1,
1777            Some(2),
1778            transfer_key.clone(),
1779            Duration::from_millis(60),
1780            false,
1781        );
1782        estimator.process_observation_from_consensus(
1783            2,
1784            Some(2),
1785            transfer_key.clone(),
1786            Duration::from_millis(70),
1787            false,
1788        );
1789
1790        // Try to record old observations again - these should be ignored
1791        estimator.process_observation_from_consensus(
1792            0,
1793            Some(1),
1794            move_key.clone(),
1795            Duration::from_millis(1000),
1796            false,
1797        );
1798        estimator.process_observation_from_consensus(
1799            1,
1800            Some(1),
1801            transfer_key.clone(),
1802            Duration::from_millis(500),
1803            false,
1804        );
1805        estimator.process_observation_from_consensus(
1806            2,
1807            Some(1),
1808            move_key.clone(),
1809            Duration::from_millis(1000),
1810            false,
1811        );
1812
1813        // Test single command transaction
1814        let single_move_tx = TransactionData::new_programmable(
1815            SuiAddress::ZERO,
1816            vec![],
1817            ProgrammableTransaction {
1818                inputs: vec![],
1819                commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1820                    package,
1821                    module: module.clone(),
1822                    function: function.clone(),
1823                    type_arguments: vec![],
1824                    arguments: vec![],
1825                }))],
1826            },
1827            100,
1828            100,
1829        );
1830
1831        // Should return median of move call observations (300ms)
1832        assert_eq!(
1833            estimator.get_estimate(&single_move_tx),
1834            Duration::from_millis(300)
1835        );
1836
1837        // Test multi-command transaction
1838        let multi_command_tx = TransactionData::new_programmable(
1839            SuiAddress::ZERO,
1840            vec![],
1841            ProgrammableTransaction {
1842                inputs: vec![],
1843                commands: vec![
1844                    Command::MoveCall(Box::new(ProgrammableMoveCall {
1845                        package,
1846                        module: module.clone(),
1847                        function: function.clone(),
1848                        type_arguments: vec![],
1849                        arguments: vec![],
1850                    })),
1851                    Command::TransferObjects(
1852                        vec![Argument::Input(1), Argument::Input(2)],
1853                        Argument::Input(0),
1854                    ),
1855                ],
1856            },
1857            100,
1858            100,
1859        );
1860
1861        // Should return sum of median move call (300ms)
1862        // plus the median transfer (70ms) * command length (3)
1863        assert_eq!(
1864            estimator.get_estimate(&multi_command_tx),
1865            Duration::from_millis(510)
1866        );
1867    }
1868
    /// Estimator state recorded for one protocol version and compared, as YAML, against the
    /// checked-in `insta` snapshots by `snapshot_tests`.
    ///
    /// Serde serializes fields in declaration order, so reordering or renaming fields would
    /// change the YAML and invalidate every existing snapshot.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct ExecutionTimeObserverSnapshot {
        /// Protocol version the snapshot was computed at.
        protocol_version: u64,
        /// Per-key consensus observations held by the estimator after replaying the test inputs.
        consensus_observations: Vec<(ExecutionTimeObservationKey, ConsensusObservations)>,
        transaction_estimates: Vec<(String, Duration)>, // (transaction_description, estimated_duration)
    }
1875
    /// Generates 25 pseudo-random consensus observations for the snapshot test.
    ///
    /// All values come from a `StdRng` seeded with `seed`, so the output is a pure function of
    /// the arguments. Recorded snapshots depend on this output: the order, ranges, and integer
    /// types of the RNG draws below must not change, and the key list must not be extended,
    /// or existing snapshots will no longer match.
    ///
    /// * `seed` - seed for the RNG.
    /// * `num_validators` - authority indices are drawn from `0..num_validators`.
    /// * `generation_override` - if `Some(g)`, every observation uses generation `g` and no RNG
    ///   draw is made for it. If `None`, each generation is drawn from `1..=10`. There is no way
    ///   to request observations whose generation is itself `None`.
    ///
    /// Returns `(authority_index, generation, key, duration)` tuples; `generation` is always
    /// `Some`.
    fn generate_test_inputs(
        seed: u64,
        num_validators: usize,
        generation_override: Option<u64>,
    ) -> Vec<(
        AuthorityIndex,
        Option<u64>,
        ExecutionTimeObservationKey,
        Duration,
    )> {
        let mut rng = rand::rngs::StdRng::seed_from_u64(seed);

        // Keys that may be drawn. `Publish` is deliberately absent (see the match below).
        let observation_keys = vec![
            ExecutionTimeObservationKey::MoveEntryPoint {
                package: ObjectID::from_hex_literal("0x1").unwrap(),
                module: "coin".to_string(),
                function: "transfer".to_string(),
                type_arguments: vec![],
            },
            ExecutionTimeObservationKey::MoveEntryPoint {
                package: ObjectID::from_hex_literal("0x2").unwrap(),
                module: "nft".to_string(),
                function: "mint".to_string(),
                type_arguments: vec![],
            },
            ExecutionTimeObservationKey::TransferObjects,
            ExecutionTimeObservationKey::SplitCoins,
            ExecutionTimeObservationKey::MergeCoins,
            ExecutionTimeObservationKey::MakeMoveVec,
            ExecutionTimeObservationKey::Upgrade,
        ];

        let mut inputs = Vec::new();
        let target_samples = 25;

        // Per sample, RNG draws happen in this order: key, authority, generation (only without
        // an override), zero-duration coin flip, then duration (only when not zero).
        for _ in 0..target_samples {
            let key = observation_keys[rng.gen_range(0..observation_keys.len())].clone();
            let authority_index =
                AuthorityIndex::try_from(rng.gen_range(0..num_validators)).unwrap();

            // Use realistic range where newer generations might replace older ones
            let generation = generation_override.unwrap_or_else(|| rng.gen_range(1..=10));

            // Generate duration based on key type with realistic variance
            // Sometimes generate zero values to test corner cases with byzantine validators
            let base_duration = if rng.gen_ratio(1, 20) {
                // 5% chance of zero duration to test corner cases
                0
            } else {
                // Durations are in milliseconds (see `Duration::from_millis` below).
                match &key {
                    ExecutionTimeObservationKey::MoveEntryPoint { .. } => rng.gen_range(50..=500),
                    ExecutionTimeObservationKey::TransferObjects => rng.gen_range(10..=100),
                    ExecutionTimeObservationKey::SplitCoins => rng.gen_range(20..=80),
                    ExecutionTimeObservationKey::MergeCoins => rng.gen_range(15..=70),
                    ExecutionTimeObservationKey::MakeMoveVec => rng.gen_range(5..=30),
                    ExecutionTimeObservationKey::Upgrade => rng.gen_range(100..=1000),
                    // Never taken: `Publish` is not in `observation_keys`.
                    ExecutionTimeObservationKey::Publish => rng.gen_range(200..=2000),
                }
            };

            let duration = Duration::from_millis(base_duration);

            inputs.push((authority_index, Some(generation), key, duration));
        }

        inputs
    }
1943
    /// Builds a fixed, named list of sample programmable transactions for the snapshot test.
    ///
    /// The set and order of transactions never change. `seed` only drives a `StdRng` that picks
    /// the argument counts of the TransferObjects/SplitCoins/MergeCoins/MakeMoveVec cases (which
    /// also appear in their names) and the two trailing `u64` arguments of each
    /// `TransactionData::new_programmable` call (presumably gas budget and gas price — confirm
    /// against its signature). Recorded snapshots depend on this output, so neither the
    /// transaction order nor the order of RNG draws may change.
    ///
    /// Every transaction has `inputs: vec![]`, so the `Argument::Input(i)` references point at
    /// inputs that do not exist. NOTE(review): these transactions appear to be used only for
    /// estimation, never executed — confirm before reusing them elsewhere.
    ///
    /// Returns `(description, transaction)` pairs.
    fn generate_test_transactions(seed: u64) -> Vec<(String, TransactionData)> {
        let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
        let mut transactions = Vec::new();

        // Package used by the `mixed_commands` and `upgrade_package` transactions.
        let package3 = ObjectID::from_hex_literal("0x3").unwrap();
        // Single Move call whose key matches one generated by `generate_test_inputs`.
        transactions.push((
            "coin_transfer_call".to_string(),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
                        package: ObjectID::from_hex_literal("0x1").unwrap(),
                        module: "coin".to_string(),
                        function: "transfer".to_string(),
                        type_arguments: vec![],
                        arguments: vec![],
                    }))],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        // Two Move calls, both with keys used by `generate_test_inputs`.
        transactions.push((
            "mixed_move_calls".to_string(),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![
                        Command::MoveCall(Box::new(ProgrammableMoveCall {
                            package: ObjectID::from_hex_literal("0x1").unwrap(),
                            module: "coin".to_string(),
                            function: "transfer".to_string(),
                            type_arguments: vec![],
                            arguments: vec![],
                        })),
                        Command::MoveCall(Box::new(ProgrammableMoveCall {
                            package: ObjectID::from_hex_literal("0x2").unwrap(),
                            module: "nft".to_string(),
                            function: "mint".to_string(),
                            type_arguments: vec![],
                            arguments: vec![],
                        })),
                    ],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        transactions.push((
            "native_commands_with_observations".to_string(),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![
                        Command::TransferObjects(vec![Argument::Input(0)], Argument::Input(1)),
                        Command::SplitCoins(Argument::Input(2), vec![Argument::Input(3)]),
                        Command::MergeCoins(Argument::Input(4), vec![Argument::Input(5)]),
                        Command::MakeMoveVec(None, vec![Argument::Input(6)]),
                    ],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        // Each count below is drawn before the transaction's own trailing RNG draws.
        let num_objects = rng.gen_range(1..=5);
        transactions.push((
            format!("transfer_objects_{}_items", num_objects),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![Command::TransferObjects(
                        (0..num_objects).map(Argument::Input).collect(),
                        Argument::Input(num_objects),
                    )],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        let num_amounts = rng.gen_range(1..=4);
        transactions.push((
            format!("split_coins_{}_amounts", num_amounts),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![Command::SplitCoins(
                        Argument::Input(0),
                        (1..=num_amounts).map(Argument::Input).collect(),
                    )],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        let num_sources = rng.gen_range(1..=3);
        transactions.push((
            format!("merge_coins_{}_sources", num_sources),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![Command::MergeCoins(
                        Argument::Input(0),
                        (1..=num_sources).map(Argument::Input).collect(),
                    )],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        // May be 0, producing a MakeMoveVec with no elements.
        let num_elements = rng.gen_range(0..=6);
        transactions.push((
            format!("make_move_vec_{}_elements", num_elements),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![Command::MakeMoveVec(
                        None,
                        (0..num_elements).map(Argument::Input).collect(),
                    )],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        // Move call on a package (0x3) that `generate_test_inputs` never produces keys for,
        // mixed with native commands.
        transactions.push((
            "mixed_commands".to_string(),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![
                        Command::MoveCall(Box::new(ProgrammableMoveCall {
                            package: package3,
                            module: "game".to_string(),
                            function: "play".to_string(),
                            type_arguments: vec![],
                            arguments: vec![],
                        })),
                        Command::TransferObjects(
                            vec![Argument::Input(1), Argument::Input(2)],
                            Argument::Input(0),
                        ),
                        Command::SplitCoins(Argument::Input(3), vec![Argument::Input(4)]),
                    ],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        transactions.push((
            "upgrade_package".to_string(),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![Command::Upgrade(
                        vec![],
                        vec![],
                        package3,
                        Argument::Input(0),
                    )],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        transactions
    }
2137
2138    // Safeguard against forking because of changes to the execution time estimator.
2139    //
2140    // Within an epoch, each estimator must reach the same conclusion about the observations and
2141    // stake_weighted_median from the observations shared by other validators, as this is used
2142    // for transaction ordering.
2143    //
2144    // Therefore; any change in the calculation of the observations or stake_weighted_median
2145    // not accompanied by a protocol version change may fork.
2146    //
2147    // This test uses snapshots of computed stake weighted median at particular protocol versions
2148    // to attempt to discover regressions that might fork.
2149    #[test]
2150    fn snapshot_tests() {
2151        println!("\n============================================================================");
2152        println!("!                                                                          !");
2153        println!("! IMPORTANT: never update snapshots from this test. only add new versions! !");
2154        println!("!                                                                          !");
2155        println!("============================================================================\n");
2156
2157        let max_version = ProtocolVersion::MAX.as_u64();
2158
2159        let test_versions: Vec<u64> = (max_version.saturating_sub(9)..=max_version).collect();
2160
2161        for version in test_versions {
2162            let protocol_version = ProtocolVersion::new(version);
2163            let protocol_config = ProtocolConfig::get_for_version(protocol_version, Chain::Unknown);
2164            let (committee, _) = Committee::new_simple_test_committee_of_size(4);
2165            let committee = Arc::new(committee);
2166
2167            let initial_generation =
2168                if let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) =
2169                    protocol_config.per_object_congestion_control_mode()
2170                {
2171                    if params.default_none_duration_for_new_keys {
2172                        None
2173                    } else {
2174                        Some(0)
2175                    }
2176                } else {
2177                    Some(0) // fallback for versions without execution time estimate mode
2178                };
2179
2180            let initial_observations =
2181                generate_test_inputs(0, committee.num_members(), initial_generation);
2182            let mut estimator = ExecutionTimeEstimator::new(
2183                committee.clone(),
2184                ExecutionTimeEstimateParams {
2185                    max_estimate_us: u64::MAX, // Allow unlimited estimates for testing
2186                    ..ExecutionTimeEstimateParams::default()
2187                },
2188                initial_observations.into_iter(),
2189            );
2190
2191            let test_inputs = generate_test_inputs(version, committee.num_members(), None);
2192
2193            for (source, generation, observation_key, duration) in test_inputs {
2194                estimator.process_observation_from_consensus(
2195                    source,
2196                    generation,
2197                    observation_key,
2198                    duration,
2199                    false,
2200                );
2201            }
2202
2203            let mut final_observations = estimator.get_observations();
2204            final_observations.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
2205
2206            let test_transactions = generate_test_transactions(version);
2207            let mut transaction_estimates = Vec::new();
2208            for (description, tx_data) in test_transactions {
2209                let estimate = estimator.get_estimate(&tx_data);
2210                transaction_estimates.push((description, estimate));
2211            }
2212
2213            let snapshot_data = ExecutionTimeObserverSnapshot {
2214                protocol_version: version,
2215                consensus_observations: final_observations.clone(),
2216                transaction_estimates,
2217            };
2218            insta::assert_yaml_snapshot!(
2219                format!("execution_time_observer_v{}", version),
2220                snapshot_data
2221            );
2222        }
2223    }
2224}