sui_core/authority/
execution_time_estimator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::HashMap,
6    hash::{Hash, Hasher},
7    num::NonZeroUsize,
8    sync::{Arc, Weak},
9    time::{Duration, SystemTime},
10};
11
12use serde::{Deserialize, Serialize};
13
14use super::authority_per_epoch_store::AuthorityPerEpochStore;
15use super::weighted_moving_average::WeightedMovingAverage;
16use crate::consensus_adapter::SubmitToConsensus;
17use governor::{Quota, RateLimiter, clock::MonotonicClock};
18use itertools::Itertools;
19use lru::LruCache;
20#[cfg(not(msim))]
21use mysten_common::in_antithesis;
22use mysten_common::{assert_reachable, debug_fatal, in_test_configuration};
23use mysten_metrics::{monitored_scope, spawn_monitored_task};
24use rand::{Rng, SeedableRng, random, rngs, thread_rng};
25use simple_moving_average::{SMA, SingleSumSMA};
26use sui_config::node::ExecutionTimeObserverConfig;
27use sui_protocol_config::{ExecutionTimeEstimateParams, PerObjectCongestionControlMode};
28use sui_types::{
29    base_types::ObjectID,
30    committee::Committee,
31    error::SuiErrorKind,
32    execution::{ExecutionTimeObservationKey, ExecutionTiming},
33    messages_consensus::{AuthorityIndex, ConsensusTransaction, ExecutionTimeObservation},
34    transaction::{
35        Command, ProgrammableTransaction, StoredExecutionTimeObservations, TransactionData,
36        TransactionDataAPI, TransactionKind,
37    },
38};
39use tokio::{sync::mpsc, time::Instant};
40use tracing::{debug, info, trace, warn};
41
// TODO: Move this into ExecutionTimeObserverConfig, if we switch to a moving average
// implementation without the window size in the type.
/// Window size (in samples) of the unweighted moving average kept per local observation key.
const SMA_LOCAL_OBSERVATION_WINDOW_SIZE: usize = 20;
/// Bucket count for the per-object utilization metric label when full object IDs are not
/// reported: the label is the last byte of the object ID modulo this value, formatted as hex.
const OBJECT_UTILIZATION_METRIC_HASH_MODULUS: u8 = 32;
46
47/// Determines whether to inject synthetic execution time in Antithesis environments.
48///
49/// This function checks two conditions:
50/// 1. Whether the code is running in an Antithesis environment
51/// 2. Whether injection is enabled via the `ANTITHESIS_ENABLE_EXECUTION_TIME_INJECTION` env var
52///    (enabled by default)
53#[cfg(not(msim))]
54fn antithesis_enable_injecting_synthetic_execution_time() -> bool {
55    use std::sync::OnceLock;
56    static ENABLE_INJECTION: OnceLock<bool> = OnceLock::new();
57    *ENABLE_INJECTION.get_or_init(|| {
58        if !in_antithesis() {
59            return false;
60        }
61
62        std::env::var("ANTITHESIS_ENABLE_EXECUTION_TIME_INJECTION")
63            .map(|v| v.to_lowercase() == "true" || v == "1")
64            .unwrap_or(true)
65    })
66}
67
/// Collects local execution time estimates to share via consensus.
pub struct ExecutionTimeObserver {
    /// Held weakly; every use upgrades it and gives up quietly once the epoch store is gone.
    epoch_store: Weak<AuthorityPerEpochStore>,
    /// Used to submit `ExecutionTimeObservation` consensus transactions (best effort).
    consensus_adapter: Box<dyn SubmitToConsensus>,

    /// Parameters of the `ExecutionTimeEstimate` per-object congestion control mode.
    protocol_params: ExecutionTimeEstimateParams,
    /// Node-local observer configuration (thresholds, cache sizes, rate limits).
    config: ExecutionTimeObserverConfig,

    /// Moving averages of locally measured per-command execution times (bounded LRU).
    local_observations: LruCache<ExecutionTimeObservationKey, LocalObservations>,

    // For each object, tracks the amount of time above our utilization target that we spent
    // executing transactions. This is used to decide which observations should be shared
    // via consensus.
    object_utilization_tracker: LruCache<ObjectID, ObjectUtilization>,

    // Sorted list of recently indebted objects, updated by consensus handler.
    // Kept sorted and deduplicated so membership can be checked with a binary search.
    indebted_objects: Vec<ObjectID>,

    /// Global (unkeyed) limit on how often observation batches are submitted to consensus.
    sharing_rate_limiter: RateLimiter<
        governor::state::NotKeyed,
        governor::state::InMemoryState,
        governor::clock::MonotonicClock,
        governor::middleware::NoOpMiddleware<
            <governor::clock::MonotonicClock as governor::clock::Clock>::Instant,
        >,
    >,

    /// Generation number for the next shared observation batch. Seeded from wall-clock
    /// microseconds at construction and incremented each time a batch is built.
    next_generation_number: u64,
}
97
/// Moving averages of locally measured execution time for one observation key, plus the
/// value and time of the last observation shared for that key.
#[derive(Debug, Clone)]
pub struct LocalObservations {
    /// Unweighted average over the last `SMA_LOCAL_OBSERVATION_WINDOW_SIZE` samples.
    moving_average: SingleSumSMA<Duration, u32, SMA_LOCAL_OBSERVATION_WINDOW_SIZE>,
    /// Gas-price-weighted average, fed in microseconds; reported instead of
    /// `moving_average` when gas price weighting is enabled in `config`.
    weighted_moving_average: WeightedMovingAverage,
    /// Last shared average and when it was shared; `None` until something is shared.
    last_shared: Option<(Duration, Instant)>,
    /// Observer configuration, consulted to choose which average to report.
    config: ExecutionTimeObserverConfig,
}
105
106impl LocalObservations {
107    fn new(config: ExecutionTimeObserverConfig, default_duration: Duration) -> Self {
108        let window_size = config.weighted_moving_average_window_size();
109        Self {
110            moving_average: SingleSumSMA::from_zero(Duration::ZERO),
111            weighted_moving_average: WeightedMovingAverage::new(
112                default_duration.as_micros() as u64,
113                window_size,
114            ),
115            last_shared: None,
116            config,
117        }
118    }
119
120    fn add_sample(&mut self, duration: Duration, gas_price: u64) {
121        self.moving_average.add_sample(duration);
122        self.weighted_moving_average
123            .add_sample(duration.as_micros() as u64, gas_price);
124    }
125
126    fn get_average(&self) -> Duration {
127        if self.config.enable_gas_price_weighting() {
128            Duration::from_micros(self.weighted_moving_average.get_weighted_average())
129        } else {
130            self.moving_average.get_average()
131        }
132    }
133
134    fn diff_exceeds_threshold(
135        &self,
136        new_average: Duration,
137        threshold: f64,
138        min_interval: Duration,
139    ) -> bool {
140        let Some((last_shared, last_shared_timestamp)) = self.last_shared else {
141            // Diff threshold exceeded by default if we haven't shared anything yet.
142            return true;
143        };
144
145        if last_shared_timestamp.elapsed() < min_interval {
146            return false;
147        }
148
149        if threshold >= 0.0 {
150            // Positive threshold requires upward change.
151            new_average
152                .checked_sub(last_shared)
153                .is_some_and(|diff| diff > last_shared.mul_f64(threshold))
154        } else {
155            // Negative threshold requires downward change.
156            last_shared
157                .checked_sub(new_average)
158                .is_some_and(|diff| diff > last_shared.mul_f64(-threshold))
159        }
160    }
161}
162
/// Per-object record of how much execution time on a shared object has run ahead of the
/// utilization target.
#[derive(Debug, Clone)]
pub struct ObjectUtilization {
    /// Accumulated execution time above the utilization target, clamped at zero.
    excess_execution_time: Duration,
    /// When `excess_execution_time` was last updated; `None` before the first update.
    last_measured: Option<Instant>,
    /// Set once the object has ever been overutilized, i.e. its excess exceeded the
    /// configured `observation_sharing_object_utilization_threshold`; never cleared.
    was_overutilized: bool,
}
169
170impl ObjectUtilization {
171    pub fn overutilized(&self, config: &ExecutionTimeObserverConfig) -> bool {
172        self.excess_execution_time > config.observation_sharing_object_utilization_threshold()
173    }
174}
175
176// Tracks local execution time observations and shares them via consensus.
177impl ExecutionTimeObserver {
178    pub fn spawn(
179        epoch_store: Arc<AuthorityPerEpochStore>,
180        consensus_adapter: Box<dyn SubmitToConsensus>,
181        config: ExecutionTimeObserverConfig,
182    ) {
183        let PerObjectCongestionControlMode::ExecutionTimeEstimate(protocol_params) = epoch_store
184            .protocol_config()
185            .per_object_congestion_control_mode()
186        else {
187            info!(
188                "ExecutionTimeObserver disabled because per-object congestion control mode is not ExecutionTimeEstimate"
189            );
190            return;
191        };
192
193        let (tx_local_execution_time, mut rx_local_execution_time) =
194            mpsc::channel(config.observation_channel_capacity().into());
195        let (tx_object_debts, mut rx_object_debts) =
196            mpsc::channel(config.object_debt_channel_capacity().into());
197        epoch_store.set_local_execution_time_channels(tx_local_execution_time, tx_object_debts);
198
199        // TODO: pre-populate local observations with stored data from prior epoch.
200        let mut observer = Self {
201            epoch_store: Arc::downgrade(&epoch_store),
202            consensus_adapter,
203            local_observations: LruCache::new(config.observation_cache_size()),
204            object_utilization_tracker: LruCache::new(config.object_utilization_cache_size()),
205            indebted_objects: Vec::new(),
206            sharing_rate_limiter: RateLimiter::direct_with_clock(
207                Quota::per_second(config.observation_sharing_rate_limit())
208                    .allow_burst(config.observation_sharing_burst_limit()),
209                &MonotonicClock,
210            ),
211            protocol_params,
212            config,
213            next_generation_number: SystemTime::now()
214                .duration_since(std::time::UNIX_EPOCH)
215                .expect("Sui did not exist prior to 1970")
216                .as_micros()
217                .try_into()
218                .expect("This build of sui is not supported in the year 500,000"),
219        };
220        spawn_monitored_task!(epoch_store.within_alive_epoch(async move {
221            loop {
222                tokio::select! {
223                    // TODO: add metrics for messages received.
224                    Some(object_debts) = rx_object_debts.recv() => {
225                        observer.update_indebted_objects(object_debts);
226                    }
227                    Some((tx, timings, total_duration, gas_price)) = rx_local_execution_time.recv() => {
228                        observer
229                            .record_local_observations(&tx, &timings, total_duration, gas_price);
230                    }
231                    else => { break }
232                }
233            }
234            info!("shutting down ExecutionTimeObserver");
235        }));
236    }
237
238    #[cfg(test)]
239    fn new_for_testing(
240        epoch_store: Arc<AuthorityPerEpochStore>,
241        consensus_adapter: Box<dyn SubmitToConsensus>,
242        observation_sharing_object_utilization_threshold: Duration,
243        enable_gas_price_weighting: bool,
244    ) -> Self {
245        let PerObjectCongestionControlMode::ExecutionTimeEstimate(protocol_params) = epoch_store
246            .protocol_config()
247            .per_object_congestion_control_mode()
248        else {
249            panic!(
250                "tried to construct test ExecutionTimeObserver when congestion control mode is not ExecutionTimeEstimate"
251            );
252        };
253        Self {
254            epoch_store: Arc::downgrade(&epoch_store),
255            consensus_adapter,
256            protocol_params,
257            config: ExecutionTimeObserverConfig {
258                observation_sharing_object_utilization_threshold: Some(
259                    observation_sharing_object_utilization_threshold,
260                ),
261                enable_gas_price_weighting: Some(enable_gas_price_weighting),
262                ..ExecutionTimeObserverConfig::default()
263            },
264            local_observations: LruCache::new(NonZeroUsize::new(10000).unwrap()),
265            object_utilization_tracker: LruCache::new(NonZeroUsize::new(50000).unwrap()),
266            indebted_objects: Vec::new(),
267            sharing_rate_limiter: RateLimiter::direct_with_clock(
268                Quota::per_hour(std::num::NonZeroU32::MAX),
269                &MonotonicClock,
270            ),
271            next_generation_number: SystemTime::now()
272                .duration_since(std::time::UNIX_EPOCH)
273                .expect("Sui did not exist prior to 1970")
274                .as_micros()
275                .try_into()
276                .expect("This build of sui is not supported in the year 500,000"),
277        }
278    }
279
280    // Used by execution to report observed per-entry-point execution times to the estimator.
281    // Updates moving averages and submits observation to consensus if local observation differs
282    // from consensus median.
283    // TODO: Consider more detailed heuristic to account for overhead outside of commands.
284    fn record_local_observations(
285        &mut self,
286        tx: &ProgrammableTransaction,
287        timings: &[ExecutionTiming],
288        total_duration: Duration,
289        gas_price: u64,
290    ) {
291        let _scope = monitored_scope("ExecutionTimeObserver::record_local_observations");
292
293        // Simulate timing in test contexts to trigger congestion control.
294        #[cfg(msim)]
295        let should_inject = self.config.inject_synthetic_execution_time();
296        #[cfg(not(msim))]
297        let should_inject = antithesis_enable_injecting_synthetic_execution_time();
298
299        if should_inject {
300            let (generated_timings, generated_duration) = self.generate_test_timings(tx, timings);
301            self.record_local_observations_timing(
302                tx,
303                &generated_timings,
304                generated_duration,
305                gas_price,
306            )
307        } else {
308            self.record_local_observations_timing(tx, timings, total_duration, gas_price)
309        }
310    }
311
312    fn record_local_observations_timing(
313        &mut self,
314        tx: &ProgrammableTransaction,
315        timings: &[ExecutionTiming],
316        total_duration: Duration,
317        gas_price: u64,
318    ) {
319        let Some(epoch_store) = self.epoch_store.upgrade() else {
320            debug!("epoch is ending, dropping execution time observation");
321            return;
322        };
323        let timings = if timings.len() > tx.commands.len() {
324            warn!(
325                executed_commands = timings.len(),
326                original_commands = tx.commands.len(),
327                "execution produced more timings than the original PTB commands; using the trailing timings for local execution-time observations"
328            );
329            &timings[timings.len() - tx.commands.len()..]
330        } else {
331            timings
332        };
333
334        let mut uses_indebted_object = false;
335
336        // Update the accumulated excess execution time for shared object
337        // used for exclusive access in this transaction, and determine the max overage.
338        let max_excess_per_object_execution_time = tx
339            .shared_input_objects()
340            .filter_map(|obj| obj.is_accessed_exclusively().then_some(obj.id))
341            .map(|id| {
342                // Mark if any object used in the tx is indebted.
343                if !uses_indebted_object && self.indebted_objects.binary_search(&id).is_ok() {
344                    uses_indebted_object = true;
345                }
346
347                // For each object:
348                // - add the execution time of the current transaction to the tracker
349                // - subtract the maximum amount of time available for execution according
350                //   to our utilization target since the last report was received
351                //   (clamping to zero)
352                //
353                // What remains is the amount of excess time spent executing transactions on
354                // the object above the intended limit. If this value is greater than zero,
355                // it means the object is overutilized.
356                let now = Instant::now();
357                let utilization =
358                    self.object_utilization_tracker
359                        .get_or_insert_mut(id, || ObjectUtilization {
360                            excess_execution_time: Duration::ZERO,
361                            last_measured: None,
362                            was_overutilized: false,
363                        });
364                let overutilized_at_start = utilization.overutilized(&self.config);
365                utilization.excess_execution_time += total_duration;
366                utilization.excess_execution_time =
367                    utilization.excess_execution_time.saturating_sub(
368                        utilization
369                            .last_measured
370                            .map(|last_measured| {
371                                now.duration_since(last_measured)
372                                    .mul_f64(self.protocol_params.target_utilization as f64 / 100.0)
373                            })
374                            .unwrap_or(Duration::MAX),
375                    );
376                utilization.last_measured = Some(now);
377                if utilization.overutilized(&self.config) {
378                    utilization.was_overutilized = true;
379                }
380
381                // Update overutilized objects metrics.
382                if !overutilized_at_start && utilization.overutilized(&self.config) {
383                    trace!("object {id:?} is overutilized");
384                    epoch_store
385                        .metrics
386                        .epoch_execution_time_observer_overutilized_objects
387                        .inc();
388                } else if overutilized_at_start && !utilization.overutilized(&self.config) {
389                    epoch_store
390                        .metrics
391                        .epoch_execution_time_observer_overutilized_objects
392                        .dec();
393                }
394                if utilization.was_overutilized {
395                    let key = if self.config.report_object_utilization_metric_with_full_id() {
396                        id.to_string()
397                    } else {
398                        let key_lsb = id.into_bytes()[ObjectID::LENGTH - 1];
399                        let hash = key_lsb % OBJECT_UTILIZATION_METRIC_HASH_MODULUS;
400                        format!("{:x}", hash)
401                    };
402
403                    epoch_store
404                        .metrics
405                        .epoch_execution_time_observer_object_utilization
406                        .with_label_values(&[key.as_str()])
407                        .inc_by(total_duration.as_secs_f64());
408                }
409
410                utilization.excess_execution_time
411            })
412            .max()
413            .unwrap_or(Duration::ZERO);
414        epoch_store
415            .metrics
416            .epoch_execution_time_observer_utilization_cache_size
417            .set(self.object_utilization_tracker.len() as i64);
418
419        let total_command_duration: Duration = timings.iter().map(|t| t.duration()).sum();
420        let extra_overhead = total_duration - total_command_duration;
421
422        let mut to_share = Vec::with_capacity(tx.commands.len());
423        for (i, timing) in timings.iter().enumerate() {
424            let command = &tx.commands[i];
425
426            // Special-case handling for Publish command: only use hard-coded default estimate.
427            if matches!(command, Command::Publish(_, _)) {
428                continue;
429            }
430
431            // TODO: Consider using failure/success information in computing estimates.
432            let mut command_duration = timing.duration();
433
434            // Distribute overhead proportionally to each command's measured duration.
435            let overhead_factor = if total_command_duration > Duration::ZERO {
436                command_duration.as_secs_f64() / total_command_duration.as_secs_f64()
437            } else {
438                // divisor here must be >0 or this loop would not be running at all
439                1.0 / (tx.commands.len() as f64)
440            };
441            command_duration += extra_overhead.mul_f64(overhead_factor);
442
443            // For native commands, adjust duration by length of command's inputs/outputs.
444            // This is sort of arbitrary, but hopefully works okay as a heuristic.
445            command_duration = command_duration.div_f64(command_length(command).get() as f64);
446
447            // Update gas-weighted moving-average observation for the command.
448            let key = ExecutionTimeObservationKey::from_command(command);
449            let local_observation = self.local_observations.get_or_insert_mut(key.clone(), || {
450                LocalObservations::new(self.config.clone(), Duration::ZERO)
451            });
452            local_observation.add_sample(command_duration, gas_price);
453
454            // Send a new observation through consensus if:
455            // - our current moving average differs too much from the last one we shared, and
456            // - the tx has at least one mutable shared object with utilization that's too high
457            // TODO: Consider only sharing observations that disagree with consensus estimate.
458            let new_average = local_observation.get_average();
459            let mut should_share = false;
460
461            // Share upward adjustments if an object is overutilized.
462            if max_excess_per_object_execution_time
463                >= self
464                    .config
465                    .observation_sharing_object_utilization_threshold()
466                && local_observation.diff_exceeds_threshold(
467                    new_average,
468                    self.config.observation_sharing_diff_threshold(),
469                    self.config.observation_sharing_min_interval(),
470                )
471            {
472                should_share = true;
473                epoch_store
474                    .metrics
475                    .epoch_execution_time_observations_sharing_reason
476                    .with_label_values(&["utilization"])
477                    .inc();
478            };
479
480            // Share downward adjustments if an object is indebted.
481            if uses_indebted_object
482                && local_observation.diff_exceeds_threshold(
483                    new_average,
484                    -self.config.observation_sharing_diff_threshold(),
485                    self.config.observation_sharing_min_interval(),
486                )
487            {
488                should_share = true;
489                epoch_store
490                    .metrics
491                    .epoch_execution_time_observations_sharing_reason
492                    .with_label_values(&["indebted"])
493                    .inc();
494            }
495
496            if should_share {
497                debug!("sharing new execution time observation for {key:?}: {new_average:?}");
498                to_share.push((key, new_average));
499                local_observation.last_shared = Some((new_average, Instant::now()));
500            }
501        }
502
503        // Share new observations.
504        self.share_observations(to_share);
505    }
506
507    fn generate_test_timings(
508        &self,
509        tx: &ProgrammableTransaction,
510        timings: &[ExecutionTiming],
511    ) -> (Vec<ExecutionTiming>, Duration) {
512        #[allow(clippy::disallowed_methods)]
513        let generated_timings: Vec<_> = tx
514            .commands
515            .iter()
516            // TODO: migrate to zip_debug_eq once PR #26125 fixes the timings/commands length mismatch
517            .zip(timings.iter())
518            .map(|(command, timing)| {
519                let key = ExecutionTimeObservationKey::from_command(command);
520                let duration = self.get_test_duration(&key);
521                if timing.is_abort() {
522                    ExecutionTiming::Abort(duration)
523                } else {
524                    ExecutionTiming::Success(duration)
525                }
526            })
527            .collect();
528
529        let total_duration = generated_timings
530            .iter()
531            .map(|t| t.duration())
532            .sum::<Duration>()
533            + thread_rng().gen_range(Duration::from_millis(10)..Duration::from_millis(50));
534
535        (generated_timings, total_duration)
536    }
537
538    fn get_test_duration(&self, key: &ExecutionTimeObservationKey) -> Duration {
539        #[cfg(msim)]
540        let should_inject = self.config.inject_synthetic_execution_time();
541        #[cfg(not(msim))]
542        let should_inject = false;
543
544        if !in_test_configuration() && !should_inject {
545            panic!("get_test_duration called in non-test configuration");
546        }
547
548        thread_local! {
549            static PER_TEST_SEED: u64 = random::<u64>();
550        }
551
552        let mut hasher = std::collections::hash_map::DefaultHasher::new();
553
554        let checkpoint_digest_used = self
555            .epoch_store
556            .upgrade()
557            .and_then(|store| {
558                store
559                    .get_lowest_non_genesis_checkpoint_summary()
560                    .ok()
561                    .flatten()
562            })
563            .map(|summary| summary.content_digest.hash(&mut hasher))
564            .is_some();
565
566        if !checkpoint_digest_used {
567            PER_TEST_SEED.with(|seed| seed.hash(&mut hasher));
568        }
569
570        key.hash(&mut hasher);
571        let mut rng = rngs::StdRng::seed_from_u64(hasher.finish());
572        rng.gen_range(Duration::from_millis(100)..Duration::from_millis(600))
573    }
574
575    fn share_observations(&mut self, to_share: Vec<(ExecutionTimeObservationKey, Duration)>) {
576        if to_share.is_empty() {
577            return;
578        }
579        let Some(epoch_store) = self.epoch_store.upgrade() else {
580            debug!("epoch is ending, dropping execution time observation");
581            return;
582        };
583
584        let num_observations = to_share.len() as u64;
585
586        // Enforce global observation-sharing rate limit.
587        if let Err(e) = self.sharing_rate_limiter.check() {
588            epoch_store
589                .metrics
590                .epoch_execution_time_observations_dropped
591                .with_label_values(&["global_rate_limit"])
592                .inc_by(num_observations);
593            debug!("rate limit exceeded, dropping execution time observation; {e:?}");
594            return;
595        }
596
597        let epoch_store = epoch_store.clone();
598        let transaction = ConsensusTransaction::new_execution_time_observation(
599            ExecutionTimeObservation::new(epoch_store.name, self.next_generation_number, to_share),
600        );
601        self.next_generation_number += 1;
602
603        if let Err(e) = self.consensus_adapter.submit_best_effort(
604            &transaction,
605            &epoch_store,
606            Duration::from_secs(5),
607        ) {
608            if !matches!(e.as_inner(), SuiErrorKind::EpochEnded(_)) {
609                epoch_store
610                    .metrics
611                    .epoch_execution_time_observations_dropped
612                    .with_label_values(&["submit_to_consensus"])
613                    .inc_by(num_observations);
614                warn!("failed to submit execution time observation: {e:?}");
615            }
616        } else {
617            // Note: it is not actually guaranteed that the observation has been submitted at this point,
618            // but that is also not true with ConsensusAdapter::submit_to_consensus. The only way to know
619            // for sure is to observe that the message is processed by consensus handler.
620            assert_reachable!("successfully shares execution time observations");
621            epoch_store
622                .metrics
623                .epoch_execution_time_observations_shared
624                .inc_by(num_observations);
625        }
626    }
627
628    fn update_indebted_objects(&mut self, mut object_debts: Vec<ObjectID>) {
629        let _scope = monitored_scope("ExecutionTimeObserver::update_indebted_objects");
630
631        let Some(epoch_store) = self.epoch_store.upgrade() else {
632            debug!("epoch is ending, dropping indebted object update");
633            return;
634        };
635
636        object_debts.sort_unstable();
637        object_debts.dedup();
638        self.indebted_objects = object_debts;
639        epoch_store
640            .metrics
641            .epoch_execution_time_observer_indebted_objects
642            .set(self.indebted_objects.len() as i64);
643    }
644}
645
/// Key used to save `StoredExecutionTimeObservations` in the Sui system state object's
/// `extra_fields` Bag.
pub const EXTRA_FIELD_EXECUTION_TIME_ESTIMATES_KEY: u64 = 0;

/// Key used to save the chunk count for chunked execution time observations.
pub const EXTRA_FIELD_EXECUTION_TIME_ESTIMATES_CHUNK_COUNT_KEY: u64 = 1;
652
/// Tracks global execution time observations provided by validators from consensus
/// and computes deterministic per-command estimates for use in congestion control.
pub struct ExecutionTimeEstimator {
    /// Committee whose per-authority stake weights each key's median; its member count also
    /// sizes each key's per-authority observation vector.
    committee: Arc<Committee>,
    /// Parameters of the `ExecutionTimeEstimate` congestion control mode.
    protocol_params: ExecutionTimeEstimateParams,

    /// Observations received via consensus, one entry per observation key.
    consensus_observations: HashMap<ExecutionTimeObservationKey, ConsensusObservations>,
}
661
/// All authorities' observations for a single key, plus the cached stake-weighted median.
/// NOTE(review): this type derives `Serialize`/`Deserialize`, so field order and types are
/// part of its encoding.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsensusObservations {
    // One slot per authority; slots holding `None` are ignored when computing the median.
    observations: Vec<(u64 /* generation */, Option<Duration>)>, // keyed by authority index
    // `None` when too little stake has reported to compute a median.
    stake_weighted_median: Option<Duration>,                     // cached value
}
667
668impl ConsensusObservations {
669    fn update_stake_weighted_median(
670        &mut self,
671        committee: &Committee,
672        config: &ExecutionTimeEstimateParams,
673    ) {
674        let mut stake_with_observations = 0;
675        let sorted_observations: Vec<_> = self
676            .observations
677            .iter()
678            .enumerate()
679            .filter_map(|(i, (_, duration))| {
680                duration.map(|duration| {
681                    let authority_index: AuthorityIndex = i.try_into().unwrap();
682                    stake_with_observations += committee.stake_by_index(authority_index).unwrap();
683                    (duration, authority_index)
684                })
685            })
686            .sorted()
687            .collect();
688
689        // Don't use observations until we have received enough.
690        if stake_with_observations < config.stake_weighted_median_threshold {
691            self.stake_weighted_median = None;
692            return;
693        }
694
695        // Compute stake-weighted median.
696        let median_stake = stake_with_observations / 2;
697        let mut running_stake = 0;
698        for (duration, authority_index) in sorted_observations {
699            running_stake += committee.stake_by_index(authority_index).unwrap();
700            if running_stake > median_stake {
701                self.stake_weighted_median = Some(duration);
702                break;
703            }
704        }
705    }
706}
707
708impl ExecutionTimeEstimator {
709    pub fn new(
710        committee: Arc<Committee>,
711        protocol_params: ExecutionTimeEstimateParams,
712        initial_observations: impl Iterator<
713            Item = (
714                AuthorityIndex,
715                Option<u64>,
716                ExecutionTimeObservationKey,
717                Duration,
718            ),
719        >,
720    ) -> Self {
721        let mut estimator = Self {
722            committee,
723            protocol_params,
724            consensus_observations: HashMap::new(),
725        };
726        for (source, generation, key, duration) in initial_observations {
727            estimator.process_observation_from_consensus(
728                source,
729                generation,
730                key.to_owned(),
731                duration,
732                true,
733            );
734        }
735        for observation in estimator.consensus_observations.values_mut() {
736            observation
737                .update_stake_weighted_median(&estimator.committee, &estimator.protocol_params);
738        }
739        estimator
740    }
741
742    #[cfg(test)]
743    pub fn new_for_testing() -> Self {
744        let (committee, _) = Committee::new_simple_test_committee_of_size(1);
745        Self {
746            committee: Arc::new(committee),
747            protocol_params: ExecutionTimeEstimateParams {
748                target_utilization: 100,
749                max_estimate_us: u64::MAX,
750                ..ExecutionTimeEstimateParams::default()
751            },
752            consensus_observations: HashMap::new(),
753        }
754    }
755
756    pub fn process_observations_from_consensus(
757        &mut self,
758        source: AuthorityIndex,
759        generation: Option<u64>,
760        observations: &[(ExecutionTimeObservationKey, Duration)],
761    ) {
762        for (key, duration) in observations {
763            self.process_observation_from_consensus(
764                source,
765                generation,
766                key.to_owned(),
767                *duration,
768                false,
769            );
770        }
771    }
772
773    fn process_observation_from_consensus(
774        &mut self,
775        source: AuthorityIndex,
776        generation: Option<u64>,
777        observation_key: ExecutionTimeObservationKey,
778        duration: Duration,
779        skip_update: bool,
780    ) {
781        if matches!(observation_key, ExecutionTimeObservationKey::Publish) {
782            // Special-case handling for Publish command: only use hard-coded default estimate.
783            warn!(
784                "dropping Publish observation received from possibly-Byzanitine authority {source}"
785            );
786            return;
787        }
788
789        assert_reachable!("receives some valid execution time observations");
790
791        let observations = self
792            .consensus_observations
793            .entry(observation_key)
794            .or_insert_with(|| {
795                let len = self.committee.num_members();
796                let mut empty_observations = Vec::with_capacity(len);
797                empty_observations.resize(len, (0, None));
798                ConsensusObservations {
799                    observations: empty_observations,
800                    stake_weighted_median: if self
801                        .protocol_params
802                        .default_none_duration_for_new_keys
803                    {
804                        None
805                    } else {
806                        Some(Duration::ZERO)
807                    },
808                }
809            });
810
811        let (obs_generation, obs_duration) =
812            &mut observations.observations[TryInto::<usize>::try_into(source).unwrap()];
813        if generation.is_some_and(|generation| *obs_generation >= generation) {
814            // Ignore outdated observation.
815            return;
816        }
817        *obs_generation = generation.unwrap_or(0);
818        *obs_duration = Some(duration);
819        if !skip_update {
820            observations.update_stake_weighted_median(&self.committee, &self.protocol_params);
821        }
822    }
823
824    pub fn get_estimate(&self, tx: &TransactionData) -> Duration {
825        let TransactionKind::ProgrammableTransaction(tx) = tx.kind() else {
826            debug_fatal!("get_estimate called on non-ProgrammableTransaction");
827            return Duration::ZERO;
828        };
829        tx.commands
830            .iter()
831            .map(|command| {
832                let key = ExecutionTimeObservationKey::from_command(command);
833                self.consensus_observations
834                    .get(&key)
835                    .and_then(|obs| obs.stake_weighted_median)
836                    .unwrap_or_else(|| key.default_duration())
837                    // For native commands, adjust duration by length of command's inputs/outputs.
838                    // This is sort of arbitrary, but hopefully works okay as a heuristic.
839                    .mul_f64(command_length(command).get() as f64)
840            })
841            .sum::<Duration>()
842            .min(Duration::from_micros(self.protocol_params.max_estimate_us))
843    }
844
845    pub fn take_observations(&mut self) -> StoredExecutionTimeObservations {
846        StoredExecutionTimeObservations::V1(
847            self.consensus_observations
848                .drain()
849                .map(|(key, observations)| {
850                    let observations = observations
851                        .observations
852                        .into_iter()
853                        .enumerate()
854                        .filter_map(|(idx, (_, duration))| {
855                            duration.map(|d| {
856                                (
857                                    self.committee
858                                        .authority_by_index(idx.try_into().unwrap())
859                                        .cloned()
860                                        .unwrap(),
861                                    d,
862                                )
863                            })
864                        })
865                        .collect();
866                    (key, observations)
867                })
868                .collect(),
869        )
870    }
871
872    pub fn get_observations(&self) -> Vec<(ExecutionTimeObservationKey, ConsensusObservations)> {
873        self.consensus_observations
874            .iter()
875            .map(|(key, observations)| (key.clone(), observations.clone()))
876            .collect()
877    }
878}
879
880fn command_length(command: &Command) -> NonZeroUsize {
881    // Commands with variable-length inputs/outputs are reported as +1
882    // to account for fixed overhead and prevent divide-by-zero.
883    NonZeroUsize::new(match command {
884        Command::MoveCall(_) => 1,
885        Command::TransferObjects(src, _) => src.len() + 1,
886        Command::SplitCoins(_, amts) => amts.len() + 1,
887        Command::MergeCoins(_, src) => src.len() + 1,
888        Command::Publish(_, _) => 1,
889        Command::MakeMoveVec(_, src) => src.len() + 1,
890        Command::Upgrade(_, _, _, _) => 1,
891    })
892    .unwrap()
893}
894
895#[cfg(test)]
896mod tests {
897    use super::*;
898    use crate::authority::test_authority_builder::TestAuthorityBuilder;
899    use crate::checkpoints::CheckpointStore;
900    use crate::consensus_adapter::{
901        ConsensusAdapter, ConsensusAdapterMetrics, MockConsensusClient,
902    };
903    use sui_protocol_config::ProtocolConfig;
904    use sui_types::base_types::{ObjectID, SequenceNumber, SuiAddress};
905    use sui_types::transaction::{
906        Argument, CallArg, ObjectArg, ProgrammableMoveCall, SharedObjectMutability,
907    };
908    use {
909        rand::{Rng, SeedableRng},
910        sui_protocol_config::ProtocolVersion,
911        sui_types::supported_protocol_versions::Chain,
912    };
913
    /// Records successive local observations for a single Move call and checks two things:
    /// the moving average is updated each time, and a new average is shared only when it has
    /// changed enough and enough time has passed since the last share.
    #[tokio::test]
    async fn test_record_local_observations() {
        telemetry_subscribers::init_for_testing();

        // Enable ExecutionTimeEstimate congestion control via a protocol-config override.
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_per_object_congestion_control_mode_for_testing(
                PerObjectCongestionControlMode::ExecutionTimeEstimate(
                    ExecutionTimeEstimateParams {
                        target_utilization: 100,
                        allowed_txn_cost_overage_burst_limit_us: 0,
                        randomness_scalar: 100,
                        max_estimate_us: u64::MAX,
                        stored_observations_num_included_checkpoints: 10,
                        stored_observations_limit: u64::MAX,
                        stake_weighted_median_threshold: 0,
                        default_none_duration_for_new_keys: true,
                        observations_chunk_size: None,
                    },
                ),
            );
            config
        });

        let mock_consensus_client = MockConsensusClient::new();
        let authority = TestAuthorityBuilder::new().build().await;
        let epoch_store = authority.epoch_store_for_testing();
        let consensus_adapter = Arc::new(ConsensusAdapter::new(
            Arc::new(mock_consensus_client),
            CheckpointStore::new_for_tests(),
            authority.name,
            100_000,
            100_000,
            ConsensusAdapterMetrics::new_test(),
            Arc::new(tokio::sync::Notify::new()),
        ));
        let mut observer = ExecutionTimeObserver::new_for_testing(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            Duration::ZERO, // disable object utilization thresholds for this test
            false,          // disable gas price weighting for this test
        );

        // Create a simple PTB with one move call
        let package = ObjectID::random();
        let module = "test_module".to_string();
        let function = "test_function".to_string();
        let ptb = ProgrammableTransaction {
            inputs: vec![],
            commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
                package,
                module: module.clone(),
                function: function.clone(),
                type_arguments: vec![],
                arguments: vec![],
            }))],
        };

        // Record an observation
        let timings = vec![ExecutionTiming::Success(Duration::from_millis(100))];
        let total_duration = Duration::from_millis(110);
        observer.record_local_observations(&ptb, &timings, total_duration, 1);

        // Key under which the single Move call above is tracked.
        let key = ExecutionTimeObservationKey::MoveEntryPoint {
            package,
            module: module.clone(),
            function: function.clone(),
            type_arguments: vec![],
        };

        // Check that local observation was recorded and shared
        let local_obs = observer.local_observations.get(&key).unwrap();
        assert_eq!(
            local_obs.get_average(),
            // 10ms overhead should be entirely apportioned to the one command in the PTB
            Duration::from_millis(110)
        );
        assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));

        // Record another observation
        let timings = vec![ExecutionTiming::Success(Duration::from_millis(110))];
        let total_duration = Duration::from_millis(120);
        observer.record_local_observations(&ptb, &timings, total_duration, 1);

        // Check that moving average was updated
        let local_obs = observer.local_observations.get(&key).unwrap();
        assert_eq!(
            local_obs.get_average(),
            // average of 110ms and 120ms observations
            Duration::from_millis(115)
        );
        // new 115ms average should not be shared; it's <5% different from 110ms
        assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));

        // Record another observation
        let timings = vec![ExecutionTiming::Success(Duration::from_millis(120))];
        let total_duration = Duration::from_millis(130);
        observer.record_local_observations(&ptb, &timings, total_duration, 1);

        // Check that moving average was updated
        let local_obs = observer.local_observations.get(&key).unwrap();
        assert_eq!(
            local_obs.get_average(),
            // average of [110ms, 120ms, 130ms]
            Duration::from_millis(120)
        );
        // new 120ms average should not be shared; it's >5% different from 110ms,
        // but not enough time has passed
        assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));

        // Manually update last-shared time to long ago
        observer
            .local_observations
            .get_mut(&key)
            .unwrap()
            .last_shared = Some((
            Duration::from_millis(110),
            Instant::now() - Duration::from_secs(60),
        ));

        // Record last observation
        let timings = vec![ExecutionTiming::Success(Duration::from_millis(120))];
        let total_duration = Duration::from_millis(160);
        observer.record_local_observations(&ptb, &timings, total_duration, 1);

        // Verify that moving average is the same and a new observation was shared, as
        // enough time has now elapsed
        let local_obs = observer.local_observations.get(&key).unwrap();
        assert_eq!(
            local_obs.get_average(),
            // average of [110ms, 120ms, 130ms, 160ms]
            Duration::from_millis(130)
        );
        assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(130));
    }
1048
    /// With gas price weighting enabled, checks that the local moving average weights each
    /// observation by the value passed as the last argument to `record_local_observations`.
    #[tokio::test]
    async fn test_record_local_observations_with_gas_price_weighting() {
        telemetry_subscribers::init_for_testing();

        // Enable ExecutionTimeEstimate congestion control via a protocol-config override.
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_per_object_congestion_control_mode_for_testing(
                PerObjectCongestionControlMode::ExecutionTimeEstimate(
                    ExecutionTimeEstimateParams {
                        target_utilization: 100,
                        allowed_txn_cost_overage_burst_limit_us: 0,
                        randomness_scalar: 100,
                        max_estimate_us: u64::MAX,
                        stored_observations_num_included_checkpoints: 10,
                        stored_observations_limit: u64::MAX,
                        stake_weighted_median_threshold: 0,
                        default_none_duration_for_new_keys: true,
                        observations_chunk_size: None,
                    },
                ),
            );
            config
        });

        let mock_consensus_client = MockConsensusClient::new();
        let authority = TestAuthorityBuilder::new().build().await;
        let epoch_store = authority.epoch_store_for_testing();
        let consensus_adapter = Arc::new(ConsensusAdapter::new(
            Arc::new(mock_consensus_client),
            CheckpointStore::new_for_tests(),
            authority.name,
            100_000,
            100_000,
            ConsensusAdapterMetrics::new_test(),
            Arc::new(tokio::sync::Notify::new()),
        ));
        let mut observer = ExecutionTimeObserver::new_for_testing(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            Duration::ZERO, // disable object utilization thresholds for this test
            true,           // enable gas price weighting for this test
        );

        // Create a simple PTB with one move call
        let package = ObjectID::random();
        let module = "test_module".to_string();
        let function = "test_function".to_string();
        let ptb = ProgrammableTransaction {
            inputs: vec![],
            commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
                package,
                module: module.clone(),
                function: function.clone(),
                type_arguments: vec![],
                arguments: vec![],
            }))],
        };

        // Record an observation
        let timings = vec![ExecutionTiming::Success(Duration::from_millis(100))];
        let total_duration = Duration::from_millis(110);
        observer.record_local_observations(&ptb, &timings, total_duration, 1);

        // Key under which the single Move call above is tracked.
        let key = ExecutionTimeObservationKey::MoveEntryPoint {
            package,
            module: module.clone(),
            function: function.clone(),
            type_arguments: vec![],
        };

        // Check that local observation was recorded and shared
        let local_obs = observer.local_observations.get(&key).unwrap();
        assert_eq!(
            local_obs.get_average(),
            // 10ms overhead should be entirely apportioned to the one command in the PTB
            Duration::from_millis(110)
        );
        assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));

        // Record another observation (120ms total, weight 2)
        let timings = vec![ExecutionTiming::Success(Duration::from_millis(110))];
        let total_duration = Duration::from_millis(120);
        observer.record_local_observations(&ptb, &timings, total_duration, 2);

        // Check that weighted moving average was updated
        let local_obs = observer.local_observations.get(&key).unwrap();
        assert_eq!(
            local_obs.get_average(),
            // Our local observation averages are weighted by gas price:
            // (110ms * 1 + 120ms * 2) / (1 + 2) = 116.666ms
            Duration::from_micros(116_666)
        );
    }
1141
    /// Checks that, for a PTB with several commands, the overhead beyond the summed
    /// per-command timings is apportioned to each command in proportion to its measured time.
    /// It also checks that native commands are then divided by their command length.
    #[tokio::test]
    async fn test_record_local_observations_with_multiple_commands() {
        telemetry_subscribers::init_for_testing();

        // Enable ExecutionTimeEstimate congestion control via a protocol-config override.
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_per_object_congestion_control_mode_for_testing(
                PerObjectCongestionControlMode::ExecutionTimeEstimate(
                    ExecutionTimeEstimateParams {
                        target_utilization: 100,
                        allowed_txn_cost_overage_burst_limit_us: 0,
                        randomness_scalar: 0,
                        max_estimate_us: u64::MAX,
                        stored_observations_num_included_checkpoints: 10,
                        stored_observations_limit: u64::MAX,
                        stake_weighted_median_threshold: 0,
                        default_none_duration_for_new_keys: true,
                        observations_chunk_size: None,
                    },
                ),
            );
            config
        });

        let mock_consensus_client = MockConsensusClient::new();
        let authority = TestAuthorityBuilder::new().build().await;
        let epoch_store = authority.epoch_store_for_testing();
        let consensus_adapter = Arc::new(ConsensusAdapter::new(
            Arc::new(mock_consensus_client),
            CheckpointStore::new_for_tests(),
            authority.name,
            100_000,
            100_000,
            ConsensusAdapterMetrics::new_test(),
            Arc::new(tokio::sync::Notify::new()),
        ));
        let mut observer = ExecutionTimeObserver::new_for_testing(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            Duration::ZERO, // disable object utilization thresholds for this test
            false,          // disable gas price weighting for this test
        );

        // Create a PTB with multiple commands.
        let package = ObjectID::random();
        let module = "test_module".to_string();
        let function = "test_function".to_string();
        let ptb = ProgrammableTransaction {
            inputs: vec![],
            commands: vec![
                Command::MoveCall(Box::new(ProgrammableMoveCall {
                    package,
                    module: module.clone(),
                    function: function.clone(),
                    type_arguments: vec![],
                    arguments: vec![],
                })),
                Command::TransferObjects(
                    // Inputs don't exist above, but doesn't matter for this test.
                    vec![Argument::Input(1), Argument::Input(2)],
                    Argument::Input(0),
                ),
            ],
        };
        // 100ms + 50ms of command time within 180ms total => 30ms of overhead to apportion.
        let timings = vec![
            ExecutionTiming::Success(Duration::from_millis(100)),
            ExecutionTiming::Success(Duration::from_millis(50)),
        ];
        let total_duration = Duration::from_millis(180);
        observer.record_local_observations(&ptb, &timings, total_duration, 1);

        // Check that both commands were recorded
        let move_key = ExecutionTimeObservationKey::MoveEntryPoint {
            package,
            module: module.clone(),
            function: function.clone(),
            type_arguments: vec![],
        };
        let move_obs = observer.local_observations.get(&move_key).unwrap();
        assert_eq!(
            move_obs.get_average(),
            // 100/150 == 2/3 of 30ms overhead distributed to Move command
            // 100ms + 20ms == 120ms
            Duration::from_millis(120)
        );

        let transfer_obs = observer
            .local_observations
            .get(&ExecutionTimeObservationKey::TransferObjects)
            .unwrap();
        assert_eq!(
            transfer_obs.get_average(),
            // 50ms time before adjustments
            // 50/150 == 1/3 of 30ms overhead distributed to object xfer
            // 60ms adjusted time / 3 command length (2 objects + 1) == 20ms
            Duration::from_millis(20)
        );
    }
1238
    /// With a 500ms object-utilization threshold, checks sharing for a PTB that mutates a
    /// shared object. Observations are not shared while the object is under the threshold.
    /// Once the object is overutilized, upward changes are shared but downward changes are
    /// not.
    #[tokio::test]
    async fn test_record_local_observations_with_object_utilization_threshold() {
        telemetry_subscribers::init_for_testing();

        // Enable ExecutionTimeEstimate congestion control via a protocol-config override.
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_per_object_congestion_control_mode_for_testing(
                PerObjectCongestionControlMode::ExecutionTimeEstimate(
                    ExecutionTimeEstimateParams {
                        target_utilization: 100,
                        allowed_txn_cost_overage_burst_limit_us: 0,
                        randomness_scalar: 0,
                        max_estimate_us: u64::MAX,
                        stored_observations_num_included_checkpoints: 10,
                        stored_observations_limit: u64::MAX,
                        stake_weighted_median_threshold: 0,
                        default_none_duration_for_new_keys: true,
                        observations_chunk_size: None,
                    },
                ),
            );
            config
        });

        let mock_consensus_client = MockConsensusClient::new();
        let authority = TestAuthorityBuilder::new().build().await;
        let epoch_store = authority.epoch_store_for_testing();
        let consensus_adapter = Arc::new(ConsensusAdapter::new(
            Arc::new(mock_consensus_client),
            CheckpointStore::new_for_tests(),
            authority.name,
            100_000,
            100_000,
            ConsensusAdapterMetrics::new_test(),
            Arc::new(tokio::sync::Notify::new()),
        ));
        let mut observer = ExecutionTimeObserver::new_for_testing(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            Duration::from_millis(500), // only share observations with excess utilization >= 500ms
            false,                      // disable gas price weighting for this test
        );

        // Create a simple PTB with one move call and one mutable shared input
        let package = ObjectID::random();
        let module = "test_module".to_string();
        let function = "test_function".to_string();
        let shared_object_id = ObjectID::random();
        let ptb = ProgrammableTransaction {
            inputs: vec![CallArg::Object(ObjectArg::SharedObject {
                id: shared_object_id,
                initial_shared_version: SequenceNumber::new(),
                mutability: SharedObjectMutability::Mutable,
            })],
            commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
                package,
                module: module.clone(),
                function: function.clone(),
                type_arguments: vec![],
                arguments: vec![],
            }))],
        };
        let key = ExecutionTimeObservationKey::MoveEntryPoint {
            package,
            module: module.clone(),
            function: function.clone(),
            type_arguments: vec![],
        };

        // Freeze tokio's clock so elapsed time only changes via `tokio::time::advance`.
        tokio::time::pause();

        // First observation - should not share due to low utilization
        let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
        observer.record_local_observations(&ptb, &timings, Duration::from_secs(2), 1);
        assert!(
            observer
                .local_observations
                .get(&key)
                .unwrap()
                .last_shared
                .is_none()
        );

        // Second observation - no time has passed, so now utilization is high; should share upward change
        let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
        observer.record_local_observations(&ptb, &timings, Duration::from_secs(2), 1);
        assert_eq!(
            observer
                .local_observations
                .get(&key)
                .unwrap()
                .last_shared
                .unwrap()
                .0,
            Duration::from_secs(2)
        );

        // Third execution with significant upward diff and high utilization - should share again
        tokio::time::advance(Duration::from_secs(5)).await;
        let timings = vec![ExecutionTiming::Success(Duration::from_secs(3))];
        observer.record_local_observations(&ptb, &timings, Duration::from_secs(5), 1);
        assert_eq!(
            observer
                .local_observations
                .get(&key)
                .unwrap()
                .last_shared
                .unwrap()
                .0,
            Duration::from_secs(3)
        );

        // Fourth execution with significant downward diff but still overutilized - should NOT share downward change
        // (downward changes are only shared for indebted objects, not overutilized ones)
        tokio::time::advance(Duration::from_millis(150)).await;
        let timings = vec![ExecutionTiming::Success(Duration::from_millis(100))];
        observer.record_local_observations(&ptb, &timings, Duration::from_millis(500), 1);
        assert_eq!(
            observer
                .local_observations
                .get(&key)
                .unwrap()
                .last_shared
                .unwrap()
                .0,
            Duration::from_secs(3) // still the old value, no sharing of downward change
        );

        // Fifth execution after utilization drops - should not share upward diff since not overutilized
        tokio::time::advance(Duration::from_secs(60)).await;
        let timings = vec![ExecutionTiming::Success(Duration::from_secs(11))];
        observer.record_local_observations(&ptb, &timings, Duration::from_secs(11), 1);
        assert_eq!(
            observer
                .local_observations
                .get(&key)
                .unwrap()
                .last_shared
                .unwrap()
                .0,
            Duration::from_secs(3) // still the old value, no sharing when not overutilized
        );
    }
1381
    /// Checks that once a shared object is marked as indebted, a downward change in the
    /// moving average is shared. This holds even when the utilization threshold has been
    /// raised high enough to prevent overutilization-based sharing.
    #[tokio::test]
    async fn test_record_local_observations_with_indebted_objects() {
        telemetry_subscribers::init_for_testing();

        // Enable ExecutionTimeEstimate congestion control via a protocol-config override.
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_per_object_congestion_control_mode_for_testing(
                PerObjectCongestionControlMode::ExecutionTimeEstimate(
                    ExecutionTimeEstimateParams {
                        target_utilization: 100,
                        allowed_txn_cost_overage_burst_limit_us: 0,
                        randomness_scalar: 0,
                        max_estimate_us: u64::MAX,
                        stored_observations_num_included_checkpoints: 10,
                        stored_observations_limit: u64::MAX,
                        stake_weighted_median_threshold: 0,
                        default_none_duration_for_new_keys: true,
                        observations_chunk_size: None,
                    },
                ),
            );
            config
        });

        let mock_consensus_client = MockConsensusClient::new();
        let authority = TestAuthorityBuilder::new().build().await;
        let epoch_store = authority.epoch_store_for_testing();
        let consensus_adapter = Arc::new(ConsensusAdapter::new(
            Arc::new(mock_consensus_client),
            CheckpointStore::new_for_tests(),
            authority.name,
            100_000,
            100_000,
            ConsensusAdapterMetrics::new_test(),
            Arc::new(tokio::sync::Notify::new()),
        ));
        let mut observer = ExecutionTimeObserver::new_for_testing(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            Duration::from_millis(500), // Low utilization threshold to enable overutilized sharing initially
            false,                      // disable gas price weighting for this test
        );

        // Create a simple PTB with one move call and one mutable shared input
        let package = ObjectID::random();
        let module = "test_module".to_string();
        let function = "test_function".to_string();
        let shared_object_id = ObjectID::random();
        let ptb = ProgrammableTransaction {
            inputs: vec![CallArg::Object(ObjectArg::SharedObject {
                id: shared_object_id,
                initial_shared_version: SequenceNumber::new(),
                mutability: SharedObjectMutability::Mutable,
            })],
            commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
                package,
                module: module.clone(),
                function: function.clone(),
                type_arguments: vec![],
                arguments: vec![],
            }))],
        };
        let key = ExecutionTimeObservationKey::MoveEntryPoint {
            package,
            module: module.clone(),
            function: function.clone(),
            type_arguments: vec![],
        };

        // Freeze tokio's clock so elapsed time only changes via `tokio::time::advance`.
        tokio::time::pause();

        // First observation - should not share due to low utilization
        let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
        observer.record_local_observations(&ptb, &timings, Duration::from_secs(1), 1);
        assert!(
            observer
                .local_observations
                .get(&key)
                .unwrap()
                .last_shared
                .is_none()
        );

        // Second observation - no time has passed, so now utilization is high; should share upward change
        let timings = vec![ExecutionTiming::Success(Duration::from_secs(2))];
        observer.record_local_observations(&ptb, &timings, Duration::from_secs(2), 1);
        assert_eq!(
            observer
                .local_observations
                .get(&key)
                .unwrap()
                .last_shared
                .unwrap()
                .0,
            Duration::from_millis(1500) // (1s + 2s) / 2 = 1.5s
        );

        // Mark the shared object as indebted and increase utilization threshold to prevent overutilized sharing
        observer.update_indebted_objects(vec![shared_object_id]);
        observer
            .config
            .observation_sharing_object_utilization_threshold = Some(Duration::from_secs(1000));

        // Wait for min interval and record a significant downward change
        // This should share because the object is indebted
        tokio::time::advance(Duration::from_secs(60)).await;
        let timings = vec![ExecutionTiming::Success(Duration::from_millis(300))];
        observer.record_local_observations(&ptb, &timings, Duration::from_millis(300), 1);

        // Moving average should be (1s + 2s + 0.3s) / 3 = 1.1s
        // This downward change should have been shared for indebted object
        assert_eq!(
            observer
                .local_observations
                .get(&key)
                .unwrap()
                .last_shared
                .unwrap()
                .0,
            Duration::from_millis(1100)
        );
    }
1503
1504    #[tokio::test]
1505    async fn test_stake_weighted_median() {
1506        telemetry_subscribers::init_for_testing();
1507
1508        let (committee, _) =
1509            Committee::new_simple_test_committee_with_normalized_voting_power(vec![10, 20, 30, 40]);
1510
1511        let params = ExecutionTimeEstimateParams {
1512            stake_weighted_median_threshold: 0,
1513            ..Default::default()
1514        };
1515
1516        let mut tracker = ConsensusObservations {
1517            observations: vec![
1518                (0, Some(Duration::from_secs(1))), // 10% stake
1519                (0, Some(Duration::from_secs(2))), // 20% stake
1520                (0, Some(Duration::from_secs(3))), // 30% stake
1521                (0, Some(Duration::from_secs(4))), // 40% stake
1522            ],
1523            stake_weighted_median: None,
1524        };
1525        tracker.update_stake_weighted_median(&committee, &params);
1526        // With stake weights [10,20,30,40]:
1527        // - Duration 1 covers 10% of stake
1528        // - Duration 2 covers 30% of stake (10+20)
1529        // - Duration 3 covers 60% of stake (10+20+30)
1530        // - Duration 4 covers 100% of stake
1531        // Median should be 3 since that's where we cross 50% of stake
1532        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(3)));
1533
1534        // Test duration sorting
1535        let mut tracker = ConsensusObservations {
1536            observations: vec![
1537                (0, Some(Duration::from_secs(3))), // 10% stake
1538                (0, Some(Duration::from_secs(4))), // 20% stake
1539                (0, Some(Duration::from_secs(1))), // 30% stake
1540                (0, Some(Duration::from_secs(2))), // 40% stake
1541            ],
1542            stake_weighted_median: None,
1543        };
1544        tracker.update_stake_weighted_median(&committee, &params);
1545        // With sorted stake weights [30,40,10,20]:
1546        // - Duration 1 covers 30% of stake
1547        // - Duration 2 covers 70% of stake (30+40)
1548        // - Duration 3 covers 80% of stake (30+40+10)
1549        // - Duration 4 covers 100% of stake
1550        // Median should be 2 since that's where we cross 50% of stake
1551        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(2)));
1552
1553        // Test with one missing observation
1554        let mut tracker = ConsensusObservations {
1555            observations: vec![
1556                (0, Some(Duration::from_secs(1))), // 10% stake
1557                (0, None),                         // 20% stake (missing)
1558                (0, Some(Duration::from_secs(3))), // 30% stake
1559                (0, Some(Duration::from_secs(4))), // 40% stake
1560            ],
1561            stake_weighted_median: None,
1562        };
1563        tracker.update_stake_weighted_median(&committee, &params);
1564        // With missing observation for 20% stake:
1565        // - Duration 1 covers 10% of stake
1566        // - Duration 3 covers 40% of stake (10+30)
1567        // - Duration 4 covers 80% of stake (10+30+40)
1568        // Median should be 4 since that's where we pass half of available stake (80% / 2 == 40%)
1569        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(4)));
1570
1571        // Test with multiple missing observations
1572        let mut tracker = ConsensusObservations {
1573            observations: vec![
1574                (0, Some(Duration::from_secs(1))), // 10% stake
1575                (0, Some(Duration::from_secs(2))), // 20% stake
1576                (0, None),                         // 30% stake (missing)
1577                (0, None),                         // 40% stake (missing)
1578            ],
1579            stake_weighted_median: None,
1580        };
1581        tracker.update_stake_weighted_median(&committee, &params);
1582        // With missing observations:
1583        // - Duration 1 covers 10% of stake
1584        // - Duration 2 covers 30% of stake (10+20)
1585        // Median should be 2 since that's where we cross half of available stake (40% / 2 == 20%)
1586        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(2)));
1587
1588        // Test with one observation
1589        let mut tracker = ConsensusObservations {
1590            observations: vec![
1591                (0, None),                         // 10% stake
1592                (0, None),                         // 20% stake
1593                (0, Some(Duration::from_secs(3))), // 30% stake
1594                (0, None),                         // 40% stake
1595            ],
1596            stake_weighted_median: None,
1597        };
1598        tracker.update_stake_weighted_median(&committee, &params);
1599        // With only one observation, median should be that observation
1600        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(3)));
1601
1602        // Test with all same durations
1603        let mut tracker = ConsensusObservations {
1604            observations: vec![
1605                (0, Some(Duration::from_secs(5))), // 10% stake
1606                (0, Some(Duration::from_secs(5))), // 20% stake
1607                (0, Some(Duration::from_secs(5))), // 30% stake
1608                (0, Some(Duration::from_secs(5))), // 40% stake
1609            ],
1610            stake_weighted_median: None,
1611        };
1612        tracker.update_stake_weighted_median(&committee, &params);
1613        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(5)));
1614    }
1615
1616    #[tokio::test]
1617    async fn test_stake_weighted_median_threshold() {
1618        telemetry_subscribers::init_for_testing();
1619
1620        let (committee, _) =
1621            Committee::new_simple_test_committee_with_normalized_voting_power(vec![10, 20, 30, 40]);
1622
1623        // Test with threshold requiring at least 50% stake
1624        let params = ExecutionTimeEstimateParams {
1625            stake_weighted_median_threshold: 5000,
1626            ..Default::default()
1627        };
1628
1629        // Test with insufficient stake (only 30% have observations)
1630        let mut tracker = ConsensusObservations {
1631            observations: vec![
1632                (0, Some(Duration::from_secs(1))), // 10% stake
1633                (0, Some(Duration::from_secs(2))), // 20% stake
1634                (0, None),                         // 30% stake (missing)
1635                (0, None),                         // 40% stake (missing)
1636            ],
1637            stake_weighted_median: None,
1638        };
1639        tracker.update_stake_weighted_median(&committee, &params);
1640        // Should not compute median since only 30% stake has observations (< 50% threshold)
1641        assert_eq!(tracker.stake_weighted_median, None);
1642
1643        // Test with sufficient stake (60% have observations)
1644        let mut tracker = ConsensusObservations {
1645            observations: vec![
1646                (0, Some(Duration::from_secs(1))), // 10% stake
1647                (0, Some(Duration::from_secs(2))), // 20% stake
1648                (0, Some(Duration::from_secs(3))), // 30% stake
1649                (0, None),                         // 40% stake (missing)
1650            ],
1651            stake_weighted_median: None,
1652        };
1653        tracker.update_stake_weighted_median(&committee, &params);
1654        // Should compute median since 60% stake has observations (>= 50% threshold)
1655        assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(3)));
1656    }
1657
1658    #[tokio::test]
1659    async fn test_execution_time_estimator() {
1660        telemetry_subscribers::init_for_testing();
1661
1662        let (committee, _) =
1663            Committee::new_simple_test_committee_with_normalized_voting_power(vec![10, 20, 30, 40]);
1664        let mut estimator = ExecutionTimeEstimator::new(
1665            Arc::new(committee),
1666            ExecutionTimeEstimateParams {
1667                target_utilization: 50,
1668                max_estimate_us: 1_500_000,
1669
1670                // Not used in this test.
1671                allowed_txn_cost_overage_burst_limit_us: 0,
1672                randomness_scalar: 0,
1673                stored_observations_num_included_checkpoints: 10,
1674                stored_observations_limit: u64::MAX,
1675                stake_weighted_median_threshold: 0,
1676                default_none_duration_for_new_keys: true,
1677                observations_chunk_size: None,
1678            },
1679            std::iter::empty(),
1680        );
1681        // Create test keys
1682        let package = ObjectID::random();
1683        let module = "test_module".to_string();
1684        let function = "test_function".to_string();
1685        let move_key = ExecutionTimeObservationKey::MoveEntryPoint {
1686            package,
1687            module: module.clone(),
1688            function: function.clone(),
1689            type_arguments: vec![],
1690        };
1691        let transfer_key = ExecutionTimeObservationKey::TransferObjects;
1692
1693        // Record observations from different validators
1694        // First record some old observations that should be ignored
1695        estimator.process_observation_from_consensus(
1696            0,
1697            Some(1),
1698            move_key.clone(),
1699            Duration::from_millis(1000),
1700            false,
1701        );
1702        estimator.process_observation_from_consensus(
1703            1,
1704            Some(1),
1705            move_key.clone(),
1706            Duration::from_millis(1000),
1707            false,
1708        );
1709        estimator.process_observation_from_consensus(
1710            2,
1711            Some(1),
1712            move_key.clone(),
1713            Duration::from_millis(1000),
1714            false,
1715        );
1716
1717        estimator.process_observation_from_consensus(
1718            0,
1719            Some(1),
1720            transfer_key.clone(),
1721            Duration::from_millis(500),
1722            false,
1723        );
1724        estimator.process_observation_from_consensus(
1725            1,
1726            Some(1),
1727            transfer_key.clone(),
1728            Duration::from_millis(500),
1729            false,
1730        );
1731        estimator.process_observation_from_consensus(
1732            2,
1733            Some(1),
1734            transfer_key.clone(),
1735            Duration::from_millis(500),
1736            false,
1737        );
1738
1739        // Now record newer observations that should be used
1740        estimator.process_observation_from_consensus(
1741            0,
1742            Some(2),
1743            move_key.clone(),
1744            Duration::from_millis(100),
1745            false,
1746        );
1747        estimator.process_observation_from_consensus(
1748            1,
1749            Some(2),
1750            move_key.clone(),
1751            Duration::from_millis(200),
1752            false,
1753        );
1754        estimator.process_observation_from_consensus(
1755            2,
1756            Some(2),
1757            move_key.clone(),
1758            Duration::from_millis(300),
1759            false,
1760        );
1761
1762        estimator.process_observation_from_consensus(
1763            0,
1764            Some(2),
1765            transfer_key.clone(),
1766            Duration::from_millis(50),
1767            false,
1768        );
1769        estimator.process_observation_from_consensus(
1770            1,
1771            Some(2),
1772            transfer_key.clone(),
1773            Duration::from_millis(60),
1774            false,
1775        );
1776        estimator.process_observation_from_consensus(
1777            2,
1778            Some(2),
1779            transfer_key.clone(),
1780            Duration::from_millis(70),
1781            false,
1782        );
1783
1784        // Try to record old observations again - these should be ignored
1785        estimator.process_observation_from_consensus(
1786            0,
1787            Some(1),
1788            move_key.clone(),
1789            Duration::from_millis(1000),
1790            false,
1791        );
1792        estimator.process_observation_from_consensus(
1793            1,
1794            Some(1),
1795            transfer_key.clone(),
1796            Duration::from_millis(500),
1797            false,
1798        );
1799        estimator.process_observation_from_consensus(
1800            2,
1801            Some(1),
1802            move_key.clone(),
1803            Duration::from_millis(1000),
1804            false,
1805        );
1806
1807        // Test single command transaction
1808        let single_move_tx = TransactionData::new_programmable(
1809            SuiAddress::ZERO,
1810            vec![],
1811            ProgrammableTransaction {
1812                inputs: vec![],
1813                commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1814                    package,
1815                    module: module.clone(),
1816                    function: function.clone(),
1817                    type_arguments: vec![],
1818                    arguments: vec![],
1819                }))],
1820            },
1821            100,
1822            100,
1823        );
1824
1825        // Should return median of move call observations (300ms)
1826        assert_eq!(
1827            estimator.get_estimate(&single_move_tx),
1828            Duration::from_millis(300)
1829        );
1830
1831        // Test multi-command transaction
1832        let multi_command_tx = TransactionData::new_programmable(
1833            SuiAddress::ZERO,
1834            vec![],
1835            ProgrammableTransaction {
1836                inputs: vec![],
1837                commands: vec![
1838                    Command::MoveCall(Box::new(ProgrammableMoveCall {
1839                        package,
1840                        module: module.clone(),
1841                        function: function.clone(),
1842                        type_arguments: vec![],
1843                        arguments: vec![],
1844                    })),
1845                    Command::TransferObjects(
1846                        vec![Argument::Input(1), Argument::Input(2)],
1847                        Argument::Input(0),
1848                    ),
1849                ],
1850            },
1851            100,
1852            100,
1853        );
1854
1855        // Should return sum of median move call (300ms)
1856        // plus the median transfer (70ms) * command length (3)
1857        assert_eq!(
1858            estimator.get_estimate(&multi_command_tx),
1859            Duration::from_millis(510)
1860        );
1861    }
1862
    /// One protocol version's worth of data recorded by `snapshot_tests` and compared against a
    /// stored insta YAML snapshot.
    ///
    /// Serde serializes fields in declaration order. Renaming or reordering fields therefore
    /// changes the snapshot layout and invalidates every recorded snapshot.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct ExecutionTimeObserverSnapshot {
        /// Protocol version the snapshot was computed for.
        protocol_version: u64,
        /// Estimator observations after replaying the generated inputs, sorted by each key's
        /// string form in `snapshot_tests`.
        consensus_observations: Vec<(ExecutionTimeObservationKey, ConsensusObservations)>,
        /// (transaction_description, estimated_duration) for each generated test transaction.
        transaction_estimates: Vec<(String, Duration)>,
    }
1869
    /// Deterministically generates 25 pseudo-random consensus observations for snapshot tests.
    ///
    /// Each entry is `(source authority, Some(generation), key, duration)`.
    ///
    /// - `seed`: seeds a `StdRng`, so identical arguments always produce identical output.
    /// - `num_validators`: authorities are drawn uniformly from `0..num_validators`.
    /// - `generation_override`: `Some(g)` forces every generation to `g` and skips the
    ///   generation RNG draw. `None` draws each generation from `1..=10`. The returned
    ///   generation is always `Some(..)`; this function never yields a `None` generation.
    ///
    /// The number and order of RNG draws below determines the recorded snapshots. Do not
    /// reorder, add or remove draws without adding a new snapshot version.
    fn generate_test_inputs(
        seed: u64,
        num_validators: usize,
        generation_override: Option<u64>,
    ) -> Vec<(
        AuthorityIndex,
        Option<u64>,
        ExecutionTimeObservationKey,
        Duration,
    )> {
        let mut rng = rand::rngs::StdRng::seed_from_u64(seed);

        // Keys to sample from. `Publish` is not listed, so its match arm below is never taken
        // (it exists only to keep the match exhaustive).
        let observation_keys = [
            ExecutionTimeObservationKey::MoveEntryPoint {
                package: ObjectID::from_hex_literal("0x1").unwrap(),
                module: "coin".to_string(),
                function: "transfer".to_string(),
                type_arguments: vec![],
            },
            ExecutionTimeObservationKey::MoveEntryPoint {
                package: ObjectID::from_hex_literal("0x2").unwrap(),
                module: "nft".to_string(),
                function: "mint".to_string(),
                type_arguments: vec![],
            },
            ExecutionTimeObservationKey::TransferObjects,
            ExecutionTimeObservationKey::SplitCoins,
            ExecutionTimeObservationKey::MergeCoins,
            ExecutionTimeObservationKey::MakeMoveVec,
            ExecutionTimeObservationKey::Upgrade,
        ];

        let mut inputs = Vec::new();
        let target_samples = 25;

        // Per sample, RNG draws happen in this order: key, authority, generation (only when not
        // overridden), zero-duration coin flip, duration (only when the flip fails).
        for _ in 0..target_samples {
            let key = observation_keys[rng.gen_range(0..observation_keys.len())].clone();
            let authority_index =
                AuthorityIndex::try_from(rng.gen_range(0..num_validators)).unwrap();

            // Use realistic range where newer generations might replace older ones
            let generation = generation_override.unwrap_or_else(|| rng.gen_range(1..=10));

            // Generate duration based on key type with realistic variance
            // Sometimes generate zero values to test corner cases with byzantine validators
            let base_duration = if rng.gen_ratio(1, 20) {
                // 5% chance of zero duration to test corner cases
                0
            } else {
                // Durations are in milliseconds (see `Duration::from_millis` below).
                match &key {
                    ExecutionTimeObservationKey::MoveEntryPoint { .. } => rng.gen_range(50..=500),
                    ExecutionTimeObservationKey::TransferObjects => rng.gen_range(10..=100),
                    ExecutionTimeObservationKey::SplitCoins => rng.gen_range(20..=80),
                    ExecutionTimeObservationKey::MergeCoins => rng.gen_range(15..=70),
                    ExecutionTimeObservationKey::MakeMoveVec => rng.gen_range(5..=30),
                    ExecutionTimeObservationKey::Upgrade => rng.gen_range(100..=1000),
                    // Not reachable with `observation_keys` above; kept for exhaustiveness.
                    ExecutionTimeObservationKey::Publish => rng.gen_range(200..=2000),
                }
            };

            let duration = Duration::from_millis(base_duration);

            inputs.push((authority_index, Some(generation), key, duration));
        }

        inputs
    }
1937
    /// Builds a fixed list of labeled sample transactions whose estimates are recorded in the
    /// snapshot tests.
    ///
    /// The set and order of transactions is fixed. The `seed`-ed RNG only chooses:
    /// - the argument counts embedded in some labels;
    /// - the two trailing numeric arguments of every `TransactionData::new_programmable` call
    ///   (presumably gas budget and gas price; TODO confirm).
    ///
    /// The order of RNG draws is part of the snapshot contract and must not change.
    fn generate_test_transactions(seed: u64) -> Vec<(String, TransactionData)> {
        let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
        let mut transactions = Vec::new();

        // Package with no generated observations (`generate_test_inputs` only uses 0x1 and 0x2).
        let package3 = ObjectID::from_hex_literal("0x3").unwrap();

        // Single Move call whose key (0x1::coin::transfer) appears in the generated inputs.
        transactions.push((
            "coin_transfer_call".to_string(),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
                        package: ObjectID::from_hex_literal("0x1").unwrap(),
                        module: "coin".to_string(),
                        function: "transfer".to_string(),
                        type_arguments: vec![],
                        arguments: vec![],
                    }))],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        // Two Move calls to different entry points, both present in the generated inputs.
        transactions.push((
            "mixed_move_calls".to_string(),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![
                        Command::MoveCall(Box::new(ProgrammableMoveCall {
                            package: ObjectID::from_hex_literal("0x1").unwrap(),
                            module: "coin".to_string(),
                            function: "transfer".to_string(),
                            type_arguments: vec![],
                            arguments: vec![],
                        })),
                        Command::MoveCall(Box::new(ProgrammableMoveCall {
                            package: ObjectID::from_hex_literal("0x2").unwrap(),
                            module: "nft".to_string(),
                            function: "mint".to_string(),
                            type_arguments: vec![],
                            arguments: vec![],
                        })),
                    ],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        // One of each native command kind that has generated observations.
        transactions.push((
            "native_commands_with_observations".to_string(),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![
                        Command::TransferObjects(vec![Argument::Input(0)], Argument::Input(1)),
                        Command::SplitCoins(Argument::Input(2), vec![Argument::Input(3)]),
                        Command::MergeCoins(Argument::Input(4), vec![Argument::Input(5)]),
                        Command::MakeMoveVec(None, vec![Argument::Input(6)]),
                    ],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        // The following native commands use an RNG-chosen argument count, recorded in the label.
        let num_objects = rng.gen_range(1..=5);
        transactions.push((
            format!("transfer_objects_{}_items", num_objects),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![Command::TransferObjects(
                        (0..num_objects).map(Argument::Input).collect(),
                        Argument::Input(num_objects),
                    )],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        let num_amounts = rng.gen_range(1..=4);
        transactions.push((
            format!("split_coins_{}_amounts", num_amounts),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![Command::SplitCoins(
                        Argument::Input(0),
                        (1..=num_amounts).map(Argument::Input).collect(),
                    )],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        let num_sources = rng.gen_range(1..=3);
        transactions.push((
            format!("merge_coins_{}_sources", num_sources),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![Command::MergeCoins(
                        Argument::Input(0),
                        (1..=num_sources).map(Argument::Input).collect(),
                    )],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        // May be 0, exercising an empty MakeMoveVec.
        let num_elements = rng.gen_range(0..=6);
        transactions.push((
            format!("make_move_vec_{}_elements", num_elements),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![Command::MakeMoveVec(
                        None,
                        (0..num_elements).map(Argument::Input).collect(),
                    )],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        // Move call to 0x3::game::play (no generated observations) mixed with native commands.
        transactions.push((
            "mixed_commands".to_string(),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![
                        Command::MoveCall(Box::new(ProgrammableMoveCall {
                            package: package3,
                            module: "game".to_string(),
                            function: "play".to_string(),
                            type_arguments: vec![],
                            arguments: vec![],
                        })),
                        Command::TransferObjects(
                            vec![Argument::Input(1), Argument::Input(2)],
                            Argument::Input(0),
                        ),
                        Command::SplitCoins(Argument::Input(3), vec![Argument::Input(4)]),
                    ],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        // Upgrade command (the `Upgrade` key is among the generated observations).
        transactions.push((
            "upgrade_package".to_string(),
            TransactionData::new_programmable(
                SuiAddress::ZERO,
                vec![],
                ProgrammableTransaction {
                    inputs: vec![],
                    commands: vec![Command::Upgrade(
                        vec![],
                        vec![],
                        package3,
                        Argument::Input(0),
                    )],
                },
                rng.gen_range(100..1000),
                rng.gen_range(100..1000),
            ),
        ));

        transactions
    }
2131
2132    // Safeguard against forking because of changes to the execution time estimator.
2133    //
2134    // Within an epoch, each estimator must reach the same conclusion about the observations and
2135    // stake_weighted_median from the observations shared by other validators, as this is used
2136    // for transaction ordering.
2137    //
2138    // Therefore; any change in the calculation of the observations or stake_weighted_median
2139    // not accompanied by a protocol version change may fork.
2140    //
2141    // This test uses snapshots of computed stake weighted median at particular protocol versions
2142    // to attempt to discover regressions that might fork.
2143    #[test]
2144    fn snapshot_tests() {
2145        println!("\n============================================================================");
2146        println!("!                                                                          !");
2147        println!("! IMPORTANT: never update snapshots from this test. only add new versions! !");
2148        println!("!                                                                          !");
2149        println!("============================================================================\n");
2150
2151        let max_version = ProtocolVersion::MAX.as_u64();
2152
2153        let test_versions: Vec<u64> = (max_version.saturating_sub(9)..=max_version).collect();
2154
2155        for version in test_versions {
2156            let protocol_version = ProtocolVersion::new(version);
2157            let protocol_config = ProtocolConfig::get_for_version(protocol_version, Chain::Unknown);
2158            let (committee, _) = Committee::new_simple_test_committee_of_size(4);
2159            let committee = Arc::new(committee);
2160
2161            let initial_generation =
2162                if let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) =
2163                    protocol_config.per_object_congestion_control_mode()
2164                {
2165                    if params.default_none_duration_for_new_keys {
2166                        None
2167                    } else {
2168                        Some(0)
2169                    }
2170                } else {
2171                    Some(0) // fallback for versions without execution time estimate mode
2172                };
2173
2174            let initial_observations =
2175                generate_test_inputs(0, committee.num_members(), initial_generation);
2176            let mut estimator = ExecutionTimeEstimator::new(
2177                committee.clone(),
2178                ExecutionTimeEstimateParams {
2179                    max_estimate_us: u64::MAX, // Allow unlimited estimates for testing
2180                    ..ExecutionTimeEstimateParams::default()
2181                },
2182                initial_observations.into_iter(),
2183            );
2184
2185            let test_inputs = generate_test_inputs(version, committee.num_members(), None);
2186
2187            for (source, generation, observation_key, duration) in test_inputs {
2188                estimator.process_observation_from_consensus(
2189                    source,
2190                    generation,
2191                    observation_key,
2192                    duration,
2193                    false,
2194                );
2195            }
2196
2197            let mut final_observations = estimator.get_observations();
2198            final_observations.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
2199
2200            let test_transactions = generate_test_transactions(version);
2201            let mut transaction_estimates = Vec::new();
2202            for (description, tx_data) in test_transactions {
2203                let estimate = estimator.get_estimate(&tx_data);
2204                transaction_estimates.push((description, estimate));
2205            }
2206
2207            let snapshot_data = ExecutionTimeObserverSnapshot {
2208                protocol_version: version,
2209                consensus_observations: final_observations.clone(),
2210                transaction_estimates,
2211            };
2212            insta::assert_yaml_snapshot!(
2213                format!("execution_time_observer_v{}", version),
2214                snapshot_data
2215            );
2216        }
2217    }
2218}