1use std::{
5 collections::HashMap,
6 hash::{Hash, Hasher},
7 num::NonZeroUsize,
8 sync::{Arc, Weak},
9 time::{Duration, SystemTime},
10};
11
12use serde::{Deserialize, Serialize};
13
14use super::authority_per_epoch_store::AuthorityPerEpochStore;
15use super::weighted_moving_average::WeightedMovingAverage;
16use crate::consensus_adapter::SubmitToConsensus;
17use governor::{Quota, RateLimiter, clock::MonotonicClock};
18use itertools::Itertools;
19use lru::LruCache;
20#[cfg(not(msim))]
21use mysten_common::in_antithesis;
22use mysten_common::{assert_reachable, debug_fatal, in_test_configuration};
23use mysten_metrics::{monitored_scope, spawn_monitored_task};
24use rand::{Rng, SeedableRng, random, rngs, thread_rng};
25use simple_moving_average::{SMA, SingleSumSMA};
26use sui_config::node::ExecutionTimeObserverConfig;
27use sui_protocol_config::{ExecutionTimeEstimateParams, PerObjectCongestionControlMode};
28use sui_types::{
29 base_types::ObjectID,
30 committee::Committee,
31 error::SuiErrorKind,
32 execution::{ExecutionTimeObservationKey, ExecutionTiming},
33 messages_consensus::{AuthorityIndex, ConsensusTransaction, ExecutionTimeObservation},
34 transaction::{
35 Command, ProgrammableTransaction, StoredExecutionTimeObservations, TransactionData,
36 TransactionDataAPI, TransactionKind,
37 },
38};
39use tokio::{sync::mpsc, time::Instant};
40use tracing::{debug, info, trace, warn};
41
42const SMA_LOCAL_OBSERVATION_WINDOW_SIZE: usize = 20;
45const OBJECT_UTILIZATION_METRIC_HASH_MODULUS: u8 = 32;
46
47#[cfg(not(msim))]
54fn antithesis_enable_injecting_synthetic_execution_time() -> bool {
55 use std::sync::OnceLock;
56 static ENABLE_INJECTION: OnceLock<bool> = OnceLock::new();
57 *ENABLE_INJECTION.get_or_init(|| {
58 if !in_antithesis() {
59 return false;
60 }
61
62 std::env::var("ANTITHESIS_ENABLE_EXECUTION_TIME_INJECTION")
63 .map(|v| v.to_lowercase() == "true" || v == "1")
64 .unwrap_or(true)
65 })
66}
67
68pub struct ExecutionTimeObserver {
70 epoch_store: Weak<AuthorityPerEpochStore>,
71 consensus_adapter: Box<dyn SubmitToConsensus>,
72
73 protocol_params: ExecutionTimeEstimateParams,
74 config: ExecutionTimeObserverConfig,
75
76 local_observations: LruCache<ExecutionTimeObservationKey, LocalObservations>,
77
78 object_utilization_tracker: LruCache<ObjectID, ObjectUtilization>,
82
83 indebted_objects: Vec<ObjectID>,
85
86 sharing_rate_limiter: RateLimiter<
87 governor::state::NotKeyed,
88 governor::state::InMemoryState,
89 governor::clock::MonotonicClock,
90 governor::middleware::NoOpMiddleware<
91 <governor::clock::MonotonicClock as governor::clock::Clock>::Instant,
92 >,
93 >,
94
95 next_generation_number: u64,
96}
97
98#[derive(Debug, Clone)]
99pub struct LocalObservations {
100 moving_average: SingleSumSMA<Duration, u32, SMA_LOCAL_OBSERVATION_WINDOW_SIZE>,
101 weighted_moving_average: WeightedMovingAverage,
102 last_shared: Option<(Duration, Instant)>,
103 config: ExecutionTimeObserverConfig,
104}
105
106impl LocalObservations {
107 fn new(config: ExecutionTimeObserverConfig, default_duration: Duration) -> Self {
108 let window_size = config.weighted_moving_average_window_size();
109 Self {
110 moving_average: SingleSumSMA::from_zero(Duration::ZERO),
111 weighted_moving_average: WeightedMovingAverage::new(
112 default_duration.as_micros() as u64,
113 window_size,
114 ),
115 last_shared: None,
116 config,
117 }
118 }
119
120 fn add_sample(&mut self, duration: Duration, gas_price: u64) {
121 self.moving_average.add_sample(duration);
122 self.weighted_moving_average
123 .add_sample(duration.as_micros() as u64, gas_price);
124 }
125
126 fn get_average(&self) -> Duration {
127 if self.config.enable_gas_price_weighting() {
128 Duration::from_micros(self.weighted_moving_average.get_weighted_average())
129 } else {
130 self.moving_average.get_average()
131 }
132 }
133
134 fn diff_exceeds_threshold(
135 &self,
136 new_average: Duration,
137 threshold: f64,
138 min_interval: Duration,
139 ) -> bool {
140 let Some((last_shared, last_shared_timestamp)) = self.last_shared else {
141 return true;
143 };
144
145 if last_shared_timestamp.elapsed() < min_interval {
146 return false;
147 }
148
149 if threshold >= 0.0 {
150 new_average
152 .checked_sub(last_shared)
153 .is_some_and(|diff| diff > last_shared.mul_f64(threshold))
154 } else {
155 last_shared
157 .checked_sub(new_average)
158 .is_some_and(|diff| diff > last_shared.mul_f64(-threshold))
159 }
160 }
161}
162
163#[derive(Debug, Clone)]
164pub struct ObjectUtilization {
165 excess_execution_time: Duration,
166 last_measured: Option<Instant>,
167 was_overutilized: bool, }
169
170impl ObjectUtilization {
171 pub fn overutilized(&self, config: &ExecutionTimeObserverConfig) -> bool {
172 self.excess_execution_time > config.observation_sharing_object_utilization_threshold()
173 }
174}
175
176impl ExecutionTimeObserver {
178 pub fn spawn(
179 epoch_store: Arc<AuthorityPerEpochStore>,
180 consensus_adapter: Box<dyn SubmitToConsensus>,
181 config: ExecutionTimeObserverConfig,
182 ) {
183 let PerObjectCongestionControlMode::ExecutionTimeEstimate(protocol_params) = epoch_store
184 .protocol_config()
185 .per_object_congestion_control_mode()
186 else {
187 info!(
188 "ExecutionTimeObserver disabled because per-object congestion control mode is not ExecutionTimeEstimate"
189 );
190 return;
191 };
192
193 let (tx_local_execution_time, mut rx_local_execution_time) =
194 mpsc::channel(config.observation_channel_capacity().into());
195 let (tx_object_debts, mut rx_object_debts) =
196 mpsc::channel(config.object_debt_channel_capacity().into());
197 epoch_store.set_local_execution_time_channels(tx_local_execution_time, tx_object_debts);
198
199 let mut observer = Self {
201 epoch_store: Arc::downgrade(&epoch_store),
202 consensus_adapter,
203 local_observations: LruCache::new(config.observation_cache_size()),
204 object_utilization_tracker: LruCache::new(config.object_utilization_cache_size()),
205 indebted_objects: Vec::new(),
206 sharing_rate_limiter: RateLimiter::direct_with_clock(
207 Quota::per_second(config.observation_sharing_rate_limit())
208 .allow_burst(config.observation_sharing_burst_limit()),
209 &MonotonicClock,
210 ),
211 protocol_params,
212 config,
213 next_generation_number: SystemTime::now()
214 .duration_since(std::time::UNIX_EPOCH)
215 .expect("Sui did not exist prior to 1970")
216 .as_micros()
217 .try_into()
218 .expect("This build of sui is not supported in the year 500,000"),
219 };
220 spawn_monitored_task!(epoch_store.within_alive_epoch(async move {
221 loop {
222 tokio::select! {
223 Some(object_debts) = rx_object_debts.recv() => {
225 observer.update_indebted_objects(object_debts);
226 }
227 Some((tx, timings, total_duration, gas_price)) = rx_local_execution_time.recv() => {
228 observer
229 .record_local_observations(&tx, &timings, total_duration, gas_price);
230 }
231 else => { break }
232 }
233 }
234 info!("shutting down ExecutionTimeObserver");
235 }));
236 }
237
238 #[cfg(test)]
239 fn new_for_testing(
240 epoch_store: Arc<AuthorityPerEpochStore>,
241 consensus_adapter: Box<dyn SubmitToConsensus>,
242 observation_sharing_object_utilization_threshold: Duration,
243 enable_gas_price_weighting: bool,
244 ) -> Self {
245 let PerObjectCongestionControlMode::ExecutionTimeEstimate(protocol_params) = epoch_store
246 .protocol_config()
247 .per_object_congestion_control_mode()
248 else {
249 panic!(
250 "tried to construct test ExecutionTimeObserver when congestion control mode is not ExecutionTimeEstimate"
251 );
252 };
253 Self {
254 epoch_store: Arc::downgrade(&epoch_store),
255 consensus_adapter,
256 protocol_params,
257 config: ExecutionTimeObserverConfig {
258 observation_sharing_object_utilization_threshold: Some(
259 observation_sharing_object_utilization_threshold,
260 ),
261 enable_gas_price_weighting: Some(enable_gas_price_weighting),
262 ..ExecutionTimeObserverConfig::default()
263 },
264 local_observations: LruCache::new(NonZeroUsize::new(10000).unwrap()),
265 object_utilization_tracker: LruCache::new(NonZeroUsize::new(50000).unwrap()),
266 indebted_objects: Vec::new(),
267 sharing_rate_limiter: RateLimiter::direct_with_clock(
268 Quota::per_hour(std::num::NonZeroU32::MAX),
269 &MonotonicClock,
270 ),
271 next_generation_number: SystemTime::now()
272 .duration_since(std::time::UNIX_EPOCH)
273 .expect("Sui did not exist prior to 1970")
274 .as_micros()
275 .try_into()
276 .expect("This build of sui is not supported in the year 500,000"),
277 }
278 }
279
280 fn record_local_observations(
285 &mut self,
286 tx: &ProgrammableTransaction,
287 timings: &[ExecutionTiming],
288 total_duration: Duration,
289 gas_price: u64,
290 ) {
291 let _scope = monitored_scope("ExecutionTimeObserver::record_local_observations");
292
293 #[cfg(msim)]
295 let should_inject = self.config.inject_synthetic_execution_time();
296 #[cfg(not(msim))]
297 let should_inject = antithesis_enable_injecting_synthetic_execution_time();
298
299 if should_inject {
300 let (generated_timings, generated_duration) = self.generate_test_timings(tx, timings);
301 self.record_local_observations_timing(
302 tx,
303 &generated_timings,
304 generated_duration,
305 gas_price,
306 )
307 } else {
308 self.record_local_observations_timing(tx, timings, total_duration, gas_price)
309 }
310 }
311
312 fn record_local_observations_timing(
313 &mut self,
314 tx: &ProgrammableTransaction,
315 timings: &[ExecutionTiming],
316 total_duration: Duration,
317 gas_price: u64,
318 ) {
319 let Some(epoch_store) = self.epoch_store.upgrade() else {
320 debug!("epoch is ending, dropping execution time observation");
321 return;
322 };
323 let timings = if timings.len() > tx.commands.len() {
324 warn!(
325 executed_commands = timings.len(),
326 original_commands = tx.commands.len(),
327 "execution produced more timings than the original PTB commands; using the trailing timings for local execution-time observations"
328 );
329 &timings[timings.len() - tx.commands.len()..]
330 } else {
331 timings
332 };
333
334 let mut uses_indebted_object = false;
335
336 let max_excess_per_object_execution_time = tx
339 .shared_input_objects()
340 .filter_map(|obj| obj.is_accessed_exclusively().then_some(obj.id))
341 .map(|id| {
342 if !uses_indebted_object && self.indebted_objects.binary_search(&id).is_ok() {
344 uses_indebted_object = true;
345 }
346
347 let now = Instant::now();
357 let utilization =
358 self.object_utilization_tracker
359 .get_or_insert_mut(id, || ObjectUtilization {
360 excess_execution_time: Duration::ZERO,
361 last_measured: None,
362 was_overutilized: false,
363 });
364 let overutilized_at_start = utilization.overutilized(&self.config);
365 utilization.excess_execution_time += total_duration;
366 utilization.excess_execution_time =
367 utilization.excess_execution_time.saturating_sub(
368 utilization
369 .last_measured
370 .map(|last_measured| {
371 now.duration_since(last_measured)
372 .mul_f64(self.protocol_params.target_utilization as f64 / 100.0)
373 })
374 .unwrap_or(Duration::MAX),
375 );
376 utilization.last_measured = Some(now);
377 if utilization.overutilized(&self.config) {
378 utilization.was_overutilized = true;
379 }
380
381 if !overutilized_at_start && utilization.overutilized(&self.config) {
383 trace!("object {id:?} is overutilized");
384 epoch_store
385 .metrics
386 .epoch_execution_time_observer_overutilized_objects
387 .inc();
388 } else if overutilized_at_start && !utilization.overutilized(&self.config) {
389 epoch_store
390 .metrics
391 .epoch_execution_time_observer_overutilized_objects
392 .dec();
393 }
394 if utilization.was_overutilized {
395 let key = if self.config.report_object_utilization_metric_with_full_id() {
396 id.to_string()
397 } else {
398 let key_lsb = id.into_bytes()[ObjectID::LENGTH - 1];
399 let hash = key_lsb % OBJECT_UTILIZATION_METRIC_HASH_MODULUS;
400 format!("{:x}", hash)
401 };
402
403 epoch_store
404 .metrics
405 .epoch_execution_time_observer_object_utilization
406 .with_label_values(&[key.as_str()])
407 .inc_by(total_duration.as_secs_f64());
408 }
409
410 utilization.excess_execution_time
411 })
412 .max()
413 .unwrap_or(Duration::ZERO);
414 epoch_store
415 .metrics
416 .epoch_execution_time_observer_utilization_cache_size
417 .set(self.object_utilization_tracker.len() as i64);
418
419 let total_command_duration: Duration = timings.iter().map(|t| t.duration()).sum();
420 let extra_overhead = total_duration - total_command_duration;
421
422 let mut to_share = Vec::with_capacity(tx.commands.len());
423 for (i, timing) in timings.iter().enumerate() {
424 let command = &tx.commands[i];
425
426 if matches!(command, Command::Publish(_, _)) {
428 continue;
429 }
430
431 let mut command_duration = timing.duration();
433
434 let overhead_factor = if total_command_duration > Duration::ZERO {
436 command_duration.as_secs_f64() / total_command_duration.as_secs_f64()
437 } else {
438 1.0 / (tx.commands.len() as f64)
440 };
441 command_duration += extra_overhead.mul_f64(overhead_factor);
442
443 command_duration = command_duration.div_f64(command_length(command).get() as f64);
446
447 let key = ExecutionTimeObservationKey::from_command(command);
449 let local_observation = self.local_observations.get_or_insert_mut(key.clone(), || {
450 LocalObservations::new(self.config.clone(), Duration::ZERO)
451 });
452 local_observation.add_sample(command_duration, gas_price);
453
454 let new_average = local_observation.get_average();
459 let mut should_share = false;
460
461 if max_excess_per_object_execution_time
463 >= self
464 .config
465 .observation_sharing_object_utilization_threshold()
466 && local_observation.diff_exceeds_threshold(
467 new_average,
468 self.config.observation_sharing_diff_threshold(),
469 self.config.observation_sharing_min_interval(),
470 )
471 {
472 should_share = true;
473 epoch_store
474 .metrics
475 .epoch_execution_time_observations_sharing_reason
476 .with_label_values(&["utilization"])
477 .inc();
478 };
479
480 if uses_indebted_object
482 && local_observation.diff_exceeds_threshold(
483 new_average,
484 -self.config.observation_sharing_diff_threshold(),
485 self.config.observation_sharing_min_interval(),
486 )
487 {
488 should_share = true;
489 epoch_store
490 .metrics
491 .epoch_execution_time_observations_sharing_reason
492 .with_label_values(&["indebted"])
493 .inc();
494 }
495
496 if should_share {
497 debug!("sharing new execution time observation for {key:?}: {new_average:?}");
498 to_share.push((key, new_average));
499 local_observation.last_shared = Some((new_average, Instant::now()));
500 }
501 }
502
503 self.share_observations(to_share);
505 }
506
507 fn generate_test_timings(
508 &self,
509 tx: &ProgrammableTransaction,
510 timings: &[ExecutionTiming],
511 ) -> (Vec<ExecutionTiming>, Duration) {
512 #[allow(clippy::disallowed_methods)]
513 let generated_timings: Vec<_> = tx
514 .commands
515 .iter()
516 .zip(timings.iter())
518 .map(|(command, timing)| {
519 let key = ExecutionTimeObservationKey::from_command(command);
520 let duration = self.get_test_duration(&key);
521 if timing.is_abort() {
522 ExecutionTiming::Abort(duration)
523 } else {
524 ExecutionTiming::Success(duration)
525 }
526 })
527 .collect();
528
529 let total_duration = generated_timings
530 .iter()
531 .map(|t| t.duration())
532 .sum::<Duration>()
533 + thread_rng().gen_range(Duration::from_millis(10)..Duration::from_millis(50));
534
535 (generated_timings, total_duration)
536 }
537
538 fn get_test_duration(&self, key: &ExecutionTimeObservationKey) -> Duration {
539 #[cfg(msim)]
540 let should_inject = self.config.inject_synthetic_execution_time();
541 #[cfg(not(msim))]
542 let should_inject = false;
543
544 if !in_test_configuration() && !should_inject {
545 panic!("get_test_duration called in non-test configuration");
546 }
547
548 thread_local! {
549 static PER_TEST_SEED: u64 = random::<u64>();
550 }
551
552 let mut hasher = std::collections::hash_map::DefaultHasher::new();
553
554 let checkpoint_digest_used = self
555 .epoch_store
556 .upgrade()
557 .and_then(|store| {
558 store
559 .get_lowest_non_genesis_checkpoint_summary()
560 .ok()
561 .flatten()
562 })
563 .map(|summary| summary.content_digest.hash(&mut hasher))
564 .is_some();
565
566 if !checkpoint_digest_used {
567 PER_TEST_SEED.with(|seed| seed.hash(&mut hasher));
568 }
569
570 key.hash(&mut hasher);
571 let mut rng = rngs::StdRng::seed_from_u64(hasher.finish());
572 rng.gen_range(Duration::from_millis(100)..Duration::from_millis(600))
573 }
574
575 fn share_observations(&mut self, to_share: Vec<(ExecutionTimeObservationKey, Duration)>) {
576 if to_share.is_empty() {
577 return;
578 }
579 let Some(epoch_store) = self.epoch_store.upgrade() else {
580 debug!("epoch is ending, dropping execution time observation");
581 return;
582 };
583
584 let num_observations = to_share.len() as u64;
585
586 if let Err(e) = self.sharing_rate_limiter.check() {
588 epoch_store
589 .metrics
590 .epoch_execution_time_observations_dropped
591 .with_label_values(&["global_rate_limit"])
592 .inc_by(num_observations);
593 debug!("rate limit exceeded, dropping execution time observation; {e:?}");
594 return;
595 }
596
597 let epoch_store = epoch_store.clone();
598 let transaction = ConsensusTransaction::new_execution_time_observation(
599 ExecutionTimeObservation::new(epoch_store.name, self.next_generation_number, to_share),
600 );
601 self.next_generation_number += 1;
602
603 if let Err(e) = self.consensus_adapter.submit_best_effort(
604 &transaction,
605 &epoch_store,
606 Duration::from_secs(5),
607 ) {
608 if !matches!(e.as_inner(), SuiErrorKind::EpochEnded(_)) {
609 epoch_store
610 .metrics
611 .epoch_execution_time_observations_dropped
612 .with_label_values(&["submit_to_consensus"])
613 .inc_by(num_observations);
614 warn!("failed to submit execution time observation: {e:?}");
615 }
616 } else {
617 assert_reachable!("successfully shares execution time observations");
621 epoch_store
622 .metrics
623 .epoch_execution_time_observations_shared
624 .inc_by(num_observations);
625 }
626 }
627
628 fn update_indebted_objects(&mut self, mut object_debts: Vec<ObjectID>) {
629 let _scope = monitored_scope("ExecutionTimeObserver::update_indebted_objects");
630
631 let Some(epoch_store) = self.epoch_store.upgrade() else {
632 debug!("epoch is ending, dropping indebted object update");
633 return;
634 };
635
636 object_debts.sort_unstable();
637 object_debts.dedup();
638 self.indebted_objects = object_debts;
639 epoch_store
640 .metrics
641 .epoch_execution_time_observer_indebted_objects
642 .set(self.indebted_objects.len() as i64);
643 }
644}
645
646pub const EXTRA_FIELD_EXECUTION_TIME_ESTIMATES_KEY: u64 = 0;
649
650pub const EXTRA_FIELD_EXECUTION_TIME_ESTIMATES_CHUNK_COUNT_KEY: u64 = 1;
652
653pub struct ExecutionTimeEstimator {
656 committee: Arc<Committee>,
657 protocol_params: ExecutionTimeEstimateParams,
658
659 consensus_observations: HashMap<ExecutionTimeObservationKey, ConsensusObservations>,
660}
661
662#[derive(Debug, Clone, Serialize, Deserialize)]
663pub struct ConsensusObservations {
664 observations: Vec<(u64 , Option<Duration>)>, stake_weighted_median: Option<Duration>, }
667
668impl ConsensusObservations {
669 fn update_stake_weighted_median(
670 &mut self,
671 committee: &Committee,
672 config: &ExecutionTimeEstimateParams,
673 ) {
674 let mut stake_with_observations = 0;
675 let sorted_observations: Vec<_> = self
676 .observations
677 .iter()
678 .enumerate()
679 .filter_map(|(i, (_, duration))| {
680 duration.map(|duration| {
681 let authority_index: AuthorityIndex = i.try_into().unwrap();
682 stake_with_observations += committee.stake_by_index(authority_index).unwrap();
683 (duration, authority_index)
684 })
685 })
686 .sorted()
687 .collect();
688
689 if stake_with_observations < config.stake_weighted_median_threshold {
691 self.stake_weighted_median = None;
692 return;
693 }
694
695 let median_stake = stake_with_observations / 2;
697 let mut running_stake = 0;
698 for (duration, authority_index) in sorted_observations {
699 running_stake += committee.stake_by_index(authority_index).unwrap();
700 if running_stake > median_stake {
701 self.stake_weighted_median = Some(duration);
702 break;
703 }
704 }
705 }
706}
707
708impl ExecutionTimeEstimator {
709 pub fn new(
710 committee: Arc<Committee>,
711 protocol_params: ExecutionTimeEstimateParams,
712 initial_observations: impl Iterator<
713 Item = (
714 AuthorityIndex,
715 Option<u64>,
716 ExecutionTimeObservationKey,
717 Duration,
718 ),
719 >,
720 ) -> Self {
721 let mut estimator = Self {
722 committee,
723 protocol_params,
724 consensus_observations: HashMap::new(),
725 };
726 for (source, generation, key, duration) in initial_observations {
727 estimator.process_observation_from_consensus(
728 source,
729 generation,
730 key.to_owned(),
731 duration,
732 true,
733 );
734 }
735 for observation in estimator.consensus_observations.values_mut() {
736 observation
737 .update_stake_weighted_median(&estimator.committee, &estimator.protocol_params);
738 }
739 estimator
740 }
741
742 #[cfg(test)]
743 pub fn new_for_testing() -> Self {
744 let (committee, _) = Committee::new_simple_test_committee_of_size(1);
745 Self {
746 committee: Arc::new(committee),
747 protocol_params: ExecutionTimeEstimateParams {
748 target_utilization: 100,
749 max_estimate_us: u64::MAX,
750 ..ExecutionTimeEstimateParams::default()
751 },
752 consensus_observations: HashMap::new(),
753 }
754 }
755
756 pub fn process_observations_from_consensus(
757 &mut self,
758 source: AuthorityIndex,
759 generation: Option<u64>,
760 observations: &[(ExecutionTimeObservationKey, Duration)],
761 ) {
762 for (key, duration) in observations {
763 self.process_observation_from_consensus(
764 source,
765 generation,
766 key.to_owned(),
767 *duration,
768 false,
769 );
770 }
771 }
772
773 fn process_observation_from_consensus(
774 &mut self,
775 source: AuthorityIndex,
776 generation: Option<u64>,
777 observation_key: ExecutionTimeObservationKey,
778 duration: Duration,
779 skip_update: bool,
780 ) {
781 if matches!(observation_key, ExecutionTimeObservationKey::Publish) {
782 warn!(
784 "dropping Publish observation received from possibly-Byzanitine authority {source}"
785 );
786 return;
787 }
788
789 assert_reachable!("receives some valid execution time observations");
790
791 let observations = self
792 .consensus_observations
793 .entry(observation_key)
794 .or_insert_with(|| {
795 let len = self.committee.num_members();
796 let mut empty_observations = Vec::with_capacity(len);
797 empty_observations.resize(len, (0, None));
798 ConsensusObservations {
799 observations: empty_observations,
800 stake_weighted_median: if self
801 .protocol_params
802 .default_none_duration_for_new_keys
803 {
804 None
805 } else {
806 Some(Duration::ZERO)
807 },
808 }
809 });
810
811 let (obs_generation, obs_duration) =
812 &mut observations.observations[TryInto::<usize>::try_into(source).unwrap()];
813 if generation.is_some_and(|generation| *obs_generation >= generation) {
814 return;
816 }
817 *obs_generation = generation.unwrap_or(0);
818 *obs_duration = Some(duration);
819 if !skip_update {
820 observations.update_stake_weighted_median(&self.committee, &self.protocol_params);
821 }
822 }
823
824 pub fn get_estimate(&self, tx: &TransactionData) -> Duration {
825 let TransactionKind::ProgrammableTransaction(tx) = tx.kind() else {
826 debug_fatal!("get_estimate called on non-ProgrammableTransaction");
827 return Duration::ZERO;
828 };
829 tx.commands
830 .iter()
831 .map(|command| {
832 let key = ExecutionTimeObservationKey::from_command(command);
833 self.consensus_observations
834 .get(&key)
835 .and_then(|obs| obs.stake_weighted_median)
836 .unwrap_or_else(|| key.default_duration())
837 .mul_f64(command_length(command).get() as f64)
840 })
841 .sum::<Duration>()
842 .min(Duration::from_micros(self.protocol_params.max_estimate_us))
843 }
844
845 pub fn take_observations(&mut self) -> StoredExecutionTimeObservations {
846 StoredExecutionTimeObservations::V1(
847 self.consensus_observations
848 .drain()
849 .map(|(key, observations)| {
850 let observations = observations
851 .observations
852 .into_iter()
853 .enumerate()
854 .filter_map(|(idx, (_, duration))| {
855 duration.map(|d| {
856 (
857 self.committee
858 .authority_by_index(idx.try_into().unwrap())
859 .cloned()
860 .unwrap(),
861 d,
862 )
863 })
864 })
865 .collect();
866 (key, observations)
867 })
868 .collect(),
869 )
870 }
871
872 pub fn get_observations(&self) -> Vec<(ExecutionTimeObservationKey, ConsensusObservations)> {
873 self.consensus_observations
874 .iter()
875 .map(|(key, observations)| (key.clone(), observations.clone()))
876 .collect()
877 }
878}
879
880fn command_length(command: &Command) -> NonZeroUsize {
881 NonZeroUsize::new(match command {
884 Command::MoveCall(_) => 1,
885 Command::TransferObjects(src, _) => src.len() + 1,
886 Command::SplitCoins(_, amts) => amts.len() + 1,
887 Command::MergeCoins(_, src) => src.len() + 1,
888 Command::Publish(_, _) => 1,
889 Command::MakeMoveVec(_, src) => src.len() + 1,
890 Command::Upgrade(_, _, _, _) => 1,
891 })
892 .unwrap()
893}
894
895#[cfg(test)]
896mod tests {
897 use super::*;
898 use crate::authority::test_authority_builder::TestAuthorityBuilder;
899 use crate::checkpoints::CheckpointStore;
900 use crate::consensus_adapter::{
901 ConsensusAdapter, ConsensusAdapterMetrics, MockConsensusClient,
902 };
903 use sui_protocol_config::ProtocolConfig;
904 use sui_types::base_types::{ObjectID, SequenceNumber, SuiAddress};
905 use sui_types::transaction::{
906 Argument, CallArg, ObjectArg, ProgrammableMoveCall, SharedObjectMutability,
907 };
908 use {
909 rand::{Rng, SeedableRng},
910 sui_protocol_config::ProtocolVersion,
911 sui_types::supported_protocol_versions::Chain,
912 };
913
914 #[tokio::test]
915 async fn test_record_local_observations() {
916 telemetry_subscribers::init_for_testing();
917
918 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
919 config.set_per_object_congestion_control_mode_for_testing(
920 PerObjectCongestionControlMode::ExecutionTimeEstimate(
921 ExecutionTimeEstimateParams {
922 target_utilization: 100,
923 allowed_txn_cost_overage_burst_limit_us: 0,
924 randomness_scalar: 100,
925 max_estimate_us: u64::MAX,
926 stored_observations_num_included_checkpoints: 10,
927 stored_observations_limit: u64::MAX,
928 stake_weighted_median_threshold: 0,
929 default_none_duration_for_new_keys: true,
930 observations_chunk_size: None,
931 },
932 ),
933 );
934 config
935 });
936
937 let mock_consensus_client = MockConsensusClient::new();
938 let authority = TestAuthorityBuilder::new().build().await;
939 let epoch_store = authority.epoch_store_for_testing();
940 let consensus_adapter = Arc::new(ConsensusAdapter::new(
941 Arc::new(mock_consensus_client),
942 CheckpointStore::new_for_tests(),
943 authority.name,
944 100_000,
945 100_000,
946 ConsensusAdapterMetrics::new_test(),
947 Arc::new(tokio::sync::Notify::new()),
948 ));
949 let mut observer = ExecutionTimeObserver::new_for_testing(
950 epoch_store.clone(),
951 Box::new(consensus_adapter.clone()),
952 Duration::ZERO, false, );
955
956 let package = ObjectID::random();
958 let module = "test_module".to_string();
959 let function = "test_function".to_string();
960 let ptb = ProgrammableTransaction {
961 inputs: vec![],
962 commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
963 package,
964 module: module.clone(),
965 function: function.clone(),
966 type_arguments: vec![],
967 arguments: vec![],
968 }))],
969 };
970
971 let timings = vec![ExecutionTiming::Success(Duration::from_millis(100))];
973 let total_duration = Duration::from_millis(110);
974 observer.record_local_observations(&ptb, &timings, total_duration, 1);
975
976 let key = ExecutionTimeObservationKey::MoveEntryPoint {
977 package,
978 module: module.clone(),
979 function: function.clone(),
980 type_arguments: vec![],
981 };
982
983 let local_obs = observer.local_observations.get(&key).unwrap();
985 assert_eq!(
986 local_obs.get_average(),
987 Duration::from_millis(110)
989 );
990 assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));
991
992 let timings = vec![ExecutionTiming::Success(Duration::from_millis(110))];
994 let total_duration = Duration::from_millis(120);
995 observer.record_local_observations(&ptb, &timings, total_duration, 1);
996
997 let local_obs = observer.local_observations.get(&key).unwrap();
999 assert_eq!(
1000 local_obs.get_average(),
1001 Duration::from_millis(115)
1003 );
1004 assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));
1006
1007 let timings = vec![ExecutionTiming::Success(Duration::from_millis(120))];
1009 let total_duration = Duration::from_millis(130);
1010 observer.record_local_observations(&ptb, &timings, total_duration, 1);
1011
1012 let local_obs = observer.local_observations.get(&key).unwrap();
1014 assert_eq!(
1015 local_obs.get_average(),
1016 Duration::from_millis(120)
1018 );
1019 assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));
1022
1023 observer
1025 .local_observations
1026 .get_mut(&key)
1027 .unwrap()
1028 .last_shared = Some((
1029 Duration::from_millis(110),
1030 Instant::now() - Duration::from_secs(60),
1031 ));
1032
1033 let timings = vec![ExecutionTiming::Success(Duration::from_millis(120))];
1035 let total_duration = Duration::from_millis(160);
1036 observer.record_local_observations(&ptb, &timings, total_duration, 1);
1037
1038 let local_obs = observer.local_observations.get(&key).unwrap();
1041 assert_eq!(
1042 local_obs.get_average(),
1043 Duration::from_millis(130)
1045 );
1046 assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(130));
1047 }
1048
1049 #[tokio::test]
1050 async fn test_record_local_observations_with_gas_price_weighting() {
1051 telemetry_subscribers::init_for_testing();
1052
1053 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1054 config.set_per_object_congestion_control_mode_for_testing(
1055 PerObjectCongestionControlMode::ExecutionTimeEstimate(
1056 ExecutionTimeEstimateParams {
1057 target_utilization: 100,
1058 allowed_txn_cost_overage_burst_limit_us: 0,
1059 randomness_scalar: 100,
1060 max_estimate_us: u64::MAX,
1061 stored_observations_num_included_checkpoints: 10,
1062 stored_observations_limit: u64::MAX,
1063 stake_weighted_median_threshold: 0,
1064 default_none_duration_for_new_keys: true,
1065 observations_chunk_size: None,
1066 },
1067 ),
1068 );
1069 config
1070 });
1071
1072 let mock_consensus_client = MockConsensusClient::new();
1073 let authority = TestAuthorityBuilder::new().build().await;
1074 let epoch_store = authority.epoch_store_for_testing();
1075 let consensus_adapter = Arc::new(ConsensusAdapter::new(
1076 Arc::new(mock_consensus_client),
1077 CheckpointStore::new_for_tests(),
1078 authority.name,
1079 100_000,
1080 100_000,
1081 ConsensusAdapterMetrics::new_test(),
1082 Arc::new(tokio::sync::Notify::new()),
1083 ));
1084 let mut observer = ExecutionTimeObserver::new_for_testing(
1085 epoch_store.clone(),
1086 Box::new(consensus_adapter.clone()),
1087 Duration::ZERO, true, );
1090
1091 let package = ObjectID::random();
1093 let module = "test_module".to_string();
1094 let function = "test_function".to_string();
1095 let ptb = ProgrammableTransaction {
1096 inputs: vec![],
1097 commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1098 package,
1099 module: module.clone(),
1100 function: function.clone(),
1101 type_arguments: vec![],
1102 arguments: vec![],
1103 }))],
1104 };
1105
1106 let timings = vec![ExecutionTiming::Success(Duration::from_millis(100))];
1108 let total_duration = Duration::from_millis(110);
1109 observer.record_local_observations(&ptb, &timings, total_duration, 1);
1110
1111 let key = ExecutionTimeObservationKey::MoveEntryPoint {
1112 package,
1113 module: module.clone(),
1114 function: function.clone(),
1115 type_arguments: vec![],
1116 };
1117
1118 let local_obs = observer.local_observations.get(&key).unwrap();
1120 assert_eq!(
1121 local_obs.get_average(),
1122 Duration::from_millis(110)
1124 );
1125 assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));
1126
1127 let timings = vec![ExecutionTiming::Success(Duration::from_millis(110))];
1129 let total_duration = Duration::from_millis(120);
1130 observer.record_local_observations(&ptb, &timings, total_duration, 2);
1131
1132 let local_obs = observer.local_observations.get(&key).unwrap();
1134 assert_eq!(
1135 local_obs.get_average(),
1136 Duration::from_micros(116_666)
1139 );
1140 }
1141
1142 #[tokio::test]
1143 async fn test_record_local_observations_with_multiple_commands() {
1144 telemetry_subscribers::init_for_testing();
1145
1146 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1147 config.set_per_object_congestion_control_mode_for_testing(
1148 PerObjectCongestionControlMode::ExecutionTimeEstimate(
1149 ExecutionTimeEstimateParams {
1150 target_utilization: 100,
1151 allowed_txn_cost_overage_burst_limit_us: 0,
1152 randomness_scalar: 0,
1153 max_estimate_us: u64::MAX,
1154 stored_observations_num_included_checkpoints: 10,
1155 stored_observations_limit: u64::MAX,
1156 stake_weighted_median_threshold: 0,
1157 default_none_duration_for_new_keys: true,
1158 observations_chunk_size: None,
1159 },
1160 ),
1161 );
1162 config
1163 });
1164
1165 let mock_consensus_client = MockConsensusClient::new();
1166 let authority = TestAuthorityBuilder::new().build().await;
1167 let epoch_store = authority.epoch_store_for_testing();
1168 let consensus_adapter = Arc::new(ConsensusAdapter::new(
1169 Arc::new(mock_consensus_client),
1170 CheckpointStore::new_for_tests(),
1171 authority.name,
1172 100_000,
1173 100_000,
1174 ConsensusAdapterMetrics::new_test(),
1175 Arc::new(tokio::sync::Notify::new()),
1176 ));
1177 let mut observer = ExecutionTimeObserver::new_for_testing(
1178 epoch_store.clone(),
1179 Box::new(consensus_adapter.clone()),
1180 Duration::ZERO, false, );
1183
1184 let package = ObjectID::random();
1186 let module = "test_module".to_string();
1187 let function = "test_function".to_string();
1188 let ptb = ProgrammableTransaction {
1189 inputs: vec![],
1190 commands: vec![
1191 Command::MoveCall(Box::new(ProgrammableMoveCall {
1192 package,
1193 module: module.clone(),
1194 function: function.clone(),
1195 type_arguments: vec![],
1196 arguments: vec![],
1197 })),
1198 Command::TransferObjects(
1199 vec![Argument::Input(1), Argument::Input(2)],
1201 Argument::Input(0),
1202 ),
1203 ],
1204 };
1205 let timings = vec![
1206 ExecutionTiming::Success(Duration::from_millis(100)),
1207 ExecutionTiming::Success(Duration::from_millis(50)),
1208 ];
1209 let total_duration = Duration::from_millis(180);
1210 observer.record_local_observations(&ptb, &timings, total_duration, 1);
1211
1212 let move_key = ExecutionTimeObservationKey::MoveEntryPoint {
1214 package,
1215 module: module.clone(),
1216 function: function.clone(),
1217 type_arguments: vec![],
1218 };
1219 let move_obs = observer.local_observations.get(&move_key).unwrap();
1220 assert_eq!(
1221 move_obs.get_average(),
1222 Duration::from_millis(120)
1224 );
1225
1226 let transfer_obs = observer
1227 .local_observations
1228 .get(&ExecutionTimeObservationKey::TransferObjects)
1229 .unwrap();
1230 assert_eq!(
1231 transfer_obs.get_average(),
1232 Duration::from_millis(20)
1236 );
1237 }
1238
1239 #[tokio::test]
1240 async fn test_record_local_observations_with_object_utilization_threshold() {
1241 telemetry_subscribers::init_for_testing();
1242
1243 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1244 config.set_per_object_congestion_control_mode_for_testing(
1245 PerObjectCongestionControlMode::ExecutionTimeEstimate(
1246 ExecutionTimeEstimateParams {
1247 target_utilization: 100,
1248 allowed_txn_cost_overage_burst_limit_us: 0,
1249 randomness_scalar: 0,
1250 max_estimate_us: u64::MAX,
1251 stored_observations_num_included_checkpoints: 10,
1252 stored_observations_limit: u64::MAX,
1253 stake_weighted_median_threshold: 0,
1254 default_none_duration_for_new_keys: true,
1255 observations_chunk_size: None,
1256 },
1257 ),
1258 );
1259 config
1260 });
1261
1262 let mock_consensus_client = MockConsensusClient::new();
1263 let authority = TestAuthorityBuilder::new().build().await;
1264 let epoch_store = authority.epoch_store_for_testing();
1265 let consensus_adapter = Arc::new(ConsensusAdapter::new(
1266 Arc::new(mock_consensus_client),
1267 CheckpointStore::new_for_tests(),
1268 authority.name,
1269 100_000,
1270 100_000,
1271 ConsensusAdapterMetrics::new_test(),
1272 Arc::new(tokio::sync::Notify::new()),
1273 ));
1274 let mut observer = ExecutionTimeObserver::new_for_testing(
1275 epoch_store.clone(),
1276 Box::new(consensus_adapter.clone()),
1277 Duration::from_millis(500), false, );
1280
1281 let package = ObjectID::random();
1283 let module = "test_module".to_string();
1284 let function = "test_function".to_string();
1285 let shared_object_id = ObjectID::random();
1286 let ptb = ProgrammableTransaction {
1287 inputs: vec![CallArg::Object(ObjectArg::SharedObject {
1288 id: shared_object_id,
1289 initial_shared_version: SequenceNumber::new(),
1290 mutability: SharedObjectMutability::Mutable,
1291 })],
1292 commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1293 package,
1294 module: module.clone(),
1295 function: function.clone(),
1296 type_arguments: vec![],
1297 arguments: vec![],
1298 }))],
1299 };
1300 let key = ExecutionTimeObservationKey::MoveEntryPoint {
1301 package,
1302 module: module.clone(),
1303 function: function.clone(),
1304 type_arguments: vec![],
1305 };
1306
1307 tokio::time::pause();
1308
1309 let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
1311 observer.record_local_observations(&ptb, &timings, Duration::from_secs(2), 1);
1312 assert!(
1313 observer
1314 .local_observations
1315 .get(&key)
1316 .unwrap()
1317 .last_shared
1318 .is_none()
1319 );
1320
1321 let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
1323 observer.record_local_observations(&ptb, &timings, Duration::from_secs(2), 1);
1324 assert_eq!(
1325 observer
1326 .local_observations
1327 .get(&key)
1328 .unwrap()
1329 .last_shared
1330 .unwrap()
1331 .0,
1332 Duration::from_secs(2)
1333 );
1334
1335 tokio::time::advance(Duration::from_secs(5)).await;
1337 let timings = vec![ExecutionTiming::Success(Duration::from_secs(3))];
1338 observer.record_local_observations(&ptb, &timings, Duration::from_secs(5), 1);
1339 assert_eq!(
1340 observer
1341 .local_observations
1342 .get(&key)
1343 .unwrap()
1344 .last_shared
1345 .unwrap()
1346 .0,
1347 Duration::from_secs(3)
1348 );
1349
1350 tokio::time::advance(Duration::from_millis(150)).await;
1353 let timings = vec![ExecutionTiming::Success(Duration::from_millis(100))];
1354 observer.record_local_observations(&ptb, &timings, Duration::from_millis(500), 1);
1355 assert_eq!(
1356 observer
1357 .local_observations
1358 .get(&key)
1359 .unwrap()
1360 .last_shared
1361 .unwrap()
1362 .0,
1363 Duration::from_secs(3) );
1365
1366 tokio::time::advance(Duration::from_secs(60)).await;
1368 let timings = vec![ExecutionTiming::Success(Duration::from_secs(11))];
1369 observer.record_local_observations(&ptb, &timings, Duration::from_secs(11), 1);
1370 assert_eq!(
1371 observer
1372 .local_observations
1373 .get(&key)
1374 .unwrap()
1375 .last_shared
1376 .unwrap()
1377 .0,
1378 Duration::from_secs(3) );
1380 }
1381
1382 #[tokio::test]
1383 async fn test_record_local_observations_with_indebted_objects() {
1384 telemetry_subscribers::init_for_testing();
1385
1386 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1387 config.set_per_object_congestion_control_mode_for_testing(
1388 PerObjectCongestionControlMode::ExecutionTimeEstimate(
1389 ExecutionTimeEstimateParams {
1390 target_utilization: 100,
1391 allowed_txn_cost_overage_burst_limit_us: 0,
1392 randomness_scalar: 0,
1393 max_estimate_us: u64::MAX,
1394 stored_observations_num_included_checkpoints: 10,
1395 stored_observations_limit: u64::MAX,
1396 stake_weighted_median_threshold: 0,
1397 default_none_duration_for_new_keys: true,
1398 observations_chunk_size: None,
1399 },
1400 ),
1401 );
1402 config
1403 });
1404
1405 let mock_consensus_client = MockConsensusClient::new();
1406 let authority = TestAuthorityBuilder::new().build().await;
1407 let epoch_store = authority.epoch_store_for_testing();
1408 let consensus_adapter = Arc::new(ConsensusAdapter::new(
1409 Arc::new(mock_consensus_client),
1410 CheckpointStore::new_for_tests(),
1411 authority.name,
1412 100_000,
1413 100_000,
1414 ConsensusAdapterMetrics::new_test(),
1415 Arc::new(tokio::sync::Notify::new()),
1416 ));
1417 let mut observer = ExecutionTimeObserver::new_for_testing(
1418 epoch_store.clone(),
1419 Box::new(consensus_adapter.clone()),
1420 Duration::from_millis(500), false, );
1423
1424 let package = ObjectID::random();
1426 let module = "test_module".to_string();
1427 let function = "test_function".to_string();
1428 let shared_object_id = ObjectID::random();
1429 let ptb = ProgrammableTransaction {
1430 inputs: vec![CallArg::Object(ObjectArg::SharedObject {
1431 id: shared_object_id,
1432 initial_shared_version: SequenceNumber::new(),
1433 mutability: SharedObjectMutability::Mutable,
1434 })],
1435 commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1436 package,
1437 module: module.clone(),
1438 function: function.clone(),
1439 type_arguments: vec![],
1440 arguments: vec![],
1441 }))],
1442 };
1443 let key = ExecutionTimeObservationKey::MoveEntryPoint {
1444 package,
1445 module: module.clone(),
1446 function: function.clone(),
1447 type_arguments: vec![],
1448 };
1449
1450 tokio::time::pause();
1451
1452 let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
1454 observer.record_local_observations(&ptb, &timings, Duration::from_secs(1), 1);
1455 assert!(
1456 observer
1457 .local_observations
1458 .get(&key)
1459 .unwrap()
1460 .last_shared
1461 .is_none()
1462 );
1463
1464 let timings = vec![ExecutionTiming::Success(Duration::from_secs(2))];
1466 observer.record_local_observations(&ptb, &timings, Duration::from_secs(2), 1);
1467 assert_eq!(
1468 observer
1469 .local_observations
1470 .get(&key)
1471 .unwrap()
1472 .last_shared
1473 .unwrap()
1474 .0,
1475 Duration::from_millis(1500) );
1477
1478 observer.update_indebted_objects(vec![shared_object_id]);
1480 observer
1481 .config
1482 .observation_sharing_object_utilization_threshold = Some(Duration::from_secs(1000));
1483
1484 tokio::time::advance(Duration::from_secs(60)).await;
1487 let timings = vec![ExecutionTiming::Success(Duration::from_millis(300))];
1488 observer.record_local_observations(&ptb, &timings, Duration::from_millis(300), 1);
1489
1490 assert_eq!(
1493 observer
1494 .local_observations
1495 .get(&key)
1496 .unwrap()
1497 .last_shared
1498 .unwrap()
1499 .0,
1500 Duration::from_millis(1100)
1501 );
1502 }
1503
1504 #[tokio::test]
1505 async fn test_stake_weighted_median() {
1506 telemetry_subscribers::init_for_testing();
1507
1508 let (committee, _) =
1509 Committee::new_simple_test_committee_with_normalized_voting_power(vec![10, 20, 30, 40]);
1510
1511 let params = ExecutionTimeEstimateParams {
1512 stake_weighted_median_threshold: 0,
1513 ..Default::default()
1514 };
1515
1516 let mut tracker = ConsensusObservations {
1517 observations: vec![
1518 (0, Some(Duration::from_secs(1))), (0, Some(Duration::from_secs(2))), (0, Some(Duration::from_secs(3))), (0, Some(Duration::from_secs(4))), ],
1523 stake_weighted_median: None,
1524 };
1525 tracker.update_stake_weighted_median(&committee, ¶ms);
1526 assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(3)));
1533
1534 let mut tracker = ConsensusObservations {
1536 observations: vec![
1537 (0, Some(Duration::from_secs(3))), (0, Some(Duration::from_secs(4))), (0, Some(Duration::from_secs(1))), (0, Some(Duration::from_secs(2))), ],
1542 stake_weighted_median: None,
1543 };
1544 tracker.update_stake_weighted_median(&committee, ¶ms);
1545 assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(2)));
1552
1553 let mut tracker = ConsensusObservations {
1555 observations: vec![
1556 (0, Some(Duration::from_secs(1))), (0, None), (0, Some(Duration::from_secs(3))), (0, Some(Duration::from_secs(4))), ],
1561 stake_weighted_median: None,
1562 };
1563 tracker.update_stake_weighted_median(&committee, ¶ms);
1564 assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(4)));
1570
1571 let mut tracker = ConsensusObservations {
1573 observations: vec![
1574 (0, Some(Duration::from_secs(1))), (0, Some(Duration::from_secs(2))), (0, None), (0, None), ],
1579 stake_weighted_median: None,
1580 };
1581 tracker.update_stake_weighted_median(&committee, ¶ms);
1582 assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(2)));
1587
1588 let mut tracker = ConsensusObservations {
1590 observations: vec![
1591 (0, None), (0, None), (0, Some(Duration::from_secs(3))), (0, None), ],
1596 stake_weighted_median: None,
1597 };
1598 tracker.update_stake_weighted_median(&committee, ¶ms);
1599 assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(3)));
1601
1602 let mut tracker = ConsensusObservations {
1604 observations: vec![
1605 (0, Some(Duration::from_secs(5))), (0, Some(Duration::from_secs(5))), (0, Some(Duration::from_secs(5))), (0, Some(Duration::from_secs(5))), ],
1610 stake_weighted_median: None,
1611 };
1612 tracker.update_stake_weighted_median(&committee, ¶ms);
1613 assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(5)));
1614 }
1615
1616 #[tokio::test]
1617 async fn test_stake_weighted_median_threshold() {
1618 telemetry_subscribers::init_for_testing();
1619
1620 let (committee, _) =
1621 Committee::new_simple_test_committee_with_normalized_voting_power(vec![10, 20, 30, 40]);
1622
1623 let params = ExecutionTimeEstimateParams {
1625 stake_weighted_median_threshold: 5000,
1626 ..Default::default()
1627 };
1628
1629 let mut tracker = ConsensusObservations {
1631 observations: vec![
1632 (0, Some(Duration::from_secs(1))), (0, Some(Duration::from_secs(2))), (0, None), (0, None), ],
1637 stake_weighted_median: None,
1638 };
1639 tracker.update_stake_weighted_median(&committee, ¶ms);
1640 assert_eq!(tracker.stake_weighted_median, None);
1642
1643 let mut tracker = ConsensusObservations {
1645 observations: vec![
1646 (0, Some(Duration::from_secs(1))), (0, Some(Duration::from_secs(2))), (0, Some(Duration::from_secs(3))), (0, None), ],
1651 stake_weighted_median: None,
1652 };
1653 tracker.update_stake_weighted_median(&committee, ¶ms);
1654 assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(3)));
1656 }
1657
1658 #[tokio::test]
1659 async fn test_execution_time_estimator() {
1660 telemetry_subscribers::init_for_testing();
1661
1662 let (committee, _) =
1663 Committee::new_simple_test_committee_with_normalized_voting_power(vec![10, 20, 30, 40]);
1664 let mut estimator = ExecutionTimeEstimator::new(
1665 Arc::new(committee),
1666 ExecutionTimeEstimateParams {
1667 target_utilization: 50,
1668 max_estimate_us: 1_500_000,
1669
1670 allowed_txn_cost_overage_burst_limit_us: 0,
1672 randomness_scalar: 0,
1673 stored_observations_num_included_checkpoints: 10,
1674 stored_observations_limit: u64::MAX,
1675 stake_weighted_median_threshold: 0,
1676 default_none_duration_for_new_keys: true,
1677 observations_chunk_size: None,
1678 },
1679 std::iter::empty(),
1680 );
1681 let package = ObjectID::random();
1683 let module = "test_module".to_string();
1684 let function = "test_function".to_string();
1685 let move_key = ExecutionTimeObservationKey::MoveEntryPoint {
1686 package,
1687 module: module.clone(),
1688 function: function.clone(),
1689 type_arguments: vec![],
1690 };
1691 let transfer_key = ExecutionTimeObservationKey::TransferObjects;
1692
1693 estimator.process_observation_from_consensus(
1696 0,
1697 Some(1),
1698 move_key.clone(),
1699 Duration::from_millis(1000),
1700 false,
1701 );
1702 estimator.process_observation_from_consensus(
1703 1,
1704 Some(1),
1705 move_key.clone(),
1706 Duration::from_millis(1000),
1707 false,
1708 );
1709 estimator.process_observation_from_consensus(
1710 2,
1711 Some(1),
1712 move_key.clone(),
1713 Duration::from_millis(1000),
1714 false,
1715 );
1716
1717 estimator.process_observation_from_consensus(
1718 0,
1719 Some(1),
1720 transfer_key.clone(),
1721 Duration::from_millis(500),
1722 false,
1723 );
1724 estimator.process_observation_from_consensus(
1725 1,
1726 Some(1),
1727 transfer_key.clone(),
1728 Duration::from_millis(500),
1729 false,
1730 );
1731 estimator.process_observation_from_consensus(
1732 2,
1733 Some(1),
1734 transfer_key.clone(),
1735 Duration::from_millis(500),
1736 false,
1737 );
1738
1739 estimator.process_observation_from_consensus(
1741 0,
1742 Some(2),
1743 move_key.clone(),
1744 Duration::from_millis(100),
1745 false,
1746 );
1747 estimator.process_observation_from_consensus(
1748 1,
1749 Some(2),
1750 move_key.clone(),
1751 Duration::from_millis(200),
1752 false,
1753 );
1754 estimator.process_observation_from_consensus(
1755 2,
1756 Some(2),
1757 move_key.clone(),
1758 Duration::from_millis(300),
1759 false,
1760 );
1761
1762 estimator.process_observation_from_consensus(
1763 0,
1764 Some(2),
1765 transfer_key.clone(),
1766 Duration::from_millis(50),
1767 false,
1768 );
1769 estimator.process_observation_from_consensus(
1770 1,
1771 Some(2),
1772 transfer_key.clone(),
1773 Duration::from_millis(60),
1774 false,
1775 );
1776 estimator.process_observation_from_consensus(
1777 2,
1778 Some(2),
1779 transfer_key.clone(),
1780 Duration::from_millis(70),
1781 false,
1782 );
1783
1784 estimator.process_observation_from_consensus(
1786 0,
1787 Some(1),
1788 move_key.clone(),
1789 Duration::from_millis(1000),
1790 false,
1791 );
1792 estimator.process_observation_from_consensus(
1793 1,
1794 Some(1),
1795 transfer_key.clone(),
1796 Duration::from_millis(500),
1797 false,
1798 );
1799 estimator.process_observation_from_consensus(
1800 2,
1801 Some(1),
1802 move_key.clone(),
1803 Duration::from_millis(1000),
1804 false,
1805 );
1806
1807 let single_move_tx = TransactionData::new_programmable(
1809 SuiAddress::ZERO,
1810 vec![],
1811 ProgrammableTransaction {
1812 inputs: vec![],
1813 commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1814 package,
1815 module: module.clone(),
1816 function: function.clone(),
1817 type_arguments: vec![],
1818 arguments: vec![],
1819 }))],
1820 },
1821 100,
1822 100,
1823 );
1824
1825 assert_eq!(
1827 estimator.get_estimate(&single_move_tx),
1828 Duration::from_millis(300)
1829 );
1830
1831 let multi_command_tx = TransactionData::new_programmable(
1833 SuiAddress::ZERO,
1834 vec![],
1835 ProgrammableTransaction {
1836 inputs: vec![],
1837 commands: vec![
1838 Command::MoveCall(Box::new(ProgrammableMoveCall {
1839 package,
1840 module: module.clone(),
1841 function: function.clone(),
1842 type_arguments: vec![],
1843 arguments: vec![],
1844 })),
1845 Command::TransferObjects(
1846 vec![Argument::Input(1), Argument::Input(2)],
1847 Argument::Input(0),
1848 ),
1849 ],
1850 },
1851 100,
1852 100,
1853 );
1854
1855 assert_eq!(
1858 estimator.get_estimate(&multi_command_tx),
1859 Duration::from_millis(510)
1860 );
1861 }
1862
1863 #[derive(Debug, Clone, Serialize, Deserialize)]
1864 struct ExecutionTimeObserverSnapshot {
1865 protocol_version: u64,
1866 consensus_observations: Vec<(ExecutionTimeObservationKey, ConsensusObservations)>,
1867 transaction_estimates: Vec<(String, Duration)>, }
1869
1870 fn generate_test_inputs(
1871 seed: u64,
1872 num_validators: usize,
1873 generation_override: Option<u64>,
1874 ) -> Vec<(
1875 AuthorityIndex,
1876 Option<u64>,
1877 ExecutionTimeObservationKey,
1878 Duration,
1879 )> {
1880 let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
1881
1882 let observation_keys = [
1883 ExecutionTimeObservationKey::MoveEntryPoint {
1884 package: ObjectID::from_hex_literal("0x1").unwrap(),
1885 module: "coin".to_string(),
1886 function: "transfer".to_string(),
1887 type_arguments: vec![],
1888 },
1889 ExecutionTimeObservationKey::MoveEntryPoint {
1890 package: ObjectID::from_hex_literal("0x2").unwrap(),
1891 module: "nft".to_string(),
1892 function: "mint".to_string(),
1893 type_arguments: vec![],
1894 },
1895 ExecutionTimeObservationKey::TransferObjects,
1896 ExecutionTimeObservationKey::SplitCoins,
1897 ExecutionTimeObservationKey::MergeCoins,
1898 ExecutionTimeObservationKey::MakeMoveVec,
1899 ExecutionTimeObservationKey::Upgrade,
1900 ];
1901
1902 let mut inputs = Vec::new();
1903 let target_samples = 25;
1904
1905 for _ in 0..target_samples {
1906 let key = observation_keys[rng.gen_range(0..observation_keys.len())].clone();
1907 let authority_index =
1908 AuthorityIndex::try_from(rng.gen_range(0..num_validators)).unwrap();
1909
1910 let generation = generation_override.unwrap_or_else(|| rng.gen_range(1..=10));
1912
1913 let base_duration = if rng.gen_ratio(1, 20) {
1916 0
1918 } else {
1919 match &key {
1920 ExecutionTimeObservationKey::MoveEntryPoint { .. } => rng.gen_range(50..=500),
1921 ExecutionTimeObservationKey::TransferObjects => rng.gen_range(10..=100),
1922 ExecutionTimeObservationKey::SplitCoins => rng.gen_range(20..=80),
1923 ExecutionTimeObservationKey::MergeCoins => rng.gen_range(15..=70),
1924 ExecutionTimeObservationKey::MakeMoveVec => rng.gen_range(5..=30),
1925 ExecutionTimeObservationKey::Upgrade => rng.gen_range(100..=1000),
1926 ExecutionTimeObservationKey::Publish => rng.gen_range(200..=2000),
1927 }
1928 };
1929
1930 let duration = Duration::from_millis(base_duration);
1931
1932 inputs.push((authority_index, Some(generation), key, duration));
1933 }
1934
1935 inputs
1936 }
1937
1938 fn generate_test_transactions(seed: u64) -> Vec<(String, TransactionData)> {
1939 let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
1940 let mut transactions = Vec::new();
1941
1942 let package3 = ObjectID::from_hex_literal("0x3").unwrap();
1943 transactions.push((
1944 "coin_transfer_call".to_string(),
1945 TransactionData::new_programmable(
1946 SuiAddress::ZERO,
1947 vec![],
1948 ProgrammableTransaction {
1949 inputs: vec![],
1950 commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1951 package: ObjectID::from_hex_literal("0x1").unwrap(),
1952 module: "coin".to_string(),
1953 function: "transfer".to_string(),
1954 type_arguments: vec![],
1955 arguments: vec![],
1956 }))],
1957 },
1958 rng.gen_range(100..1000),
1959 rng.gen_range(100..1000),
1960 ),
1961 ));
1962
1963 transactions.push((
1964 "mixed_move_calls".to_string(),
1965 TransactionData::new_programmable(
1966 SuiAddress::ZERO,
1967 vec![],
1968 ProgrammableTransaction {
1969 inputs: vec![],
1970 commands: vec![
1971 Command::MoveCall(Box::new(ProgrammableMoveCall {
1972 package: ObjectID::from_hex_literal("0x1").unwrap(),
1973 module: "coin".to_string(),
1974 function: "transfer".to_string(),
1975 type_arguments: vec![],
1976 arguments: vec![],
1977 })),
1978 Command::MoveCall(Box::new(ProgrammableMoveCall {
1979 package: ObjectID::from_hex_literal("0x2").unwrap(),
1980 module: "nft".to_string(),
1981 function: "mint".to_string(),
1982 type_arguments: vec![],
1983 arguments: vec![],
1984 })),
1985 ],
1986 },
1987 rng.gen_range(100..1000),
1988 rng.gen_range(100..1000),
1989 ),
1990 ));
1991
1992 transactions.push((
1993 "native_commands_with_observations".to_string(),
1994 TransactionData::new_programmable(
1995 SuiAddress::ZERO,
1996 vec![],
1997 ProgrammableTransaction {
1998 inputs: vec![],
1999 commands: vec![
2000 Command::TransferObjects(vec![Argument::Input(0)], Argument::Input(1)),
2001 Command::SplitCoins(Argument::Input(2), vec![Argument::Input(3)]),
2002 Command::MergeCoins(Argument::Input(4), vec![Argument::Input(5)]),
2003 Command::MakeMoveVec(None, vec![Argument::Input(6)]),
2004 ],
2005 },
2006 rng.gen_range(100..1000),
2007 rng.gen_range(100..1000),
2008 ),
2009 ));
2010
2011 let num_objects = rng.gen_range(1..=5);
2012 transactions.push((
2013 format!("transfer_objects_{}_items", num_objects),
2014 TransactionData::new_programmable(
2015 SuiAddress::ZERO,
2016 vec![],
2017 ProgrammableTransaction {
2018 inputs: vec![],
2019 commands: vec![Command::TransferObjects(
2020 (0..num_objects).map(Argument::Input).collect(),
2021 Argument::Input(num_objects),
2022 )],
2023 },
2024 rng.gen_range(100..1000),
2025 rng.gen_range(100..1000),
2026 ),
2027 ));
2028
2029 let num_amounts = rng.gen_range(1..=4);
2030 transactions.push((
2031 format!("split_coins_{}_amounts", num_amounts),
2032 TransactionData::new_programmable(
2033 SuiAddress::ZERO,
2034 vec![],
2035 ProgrammableTransaction {
2036 inputs: vec![],
2037 commands: vec![Command::SplitCoins(
2038 Argument::Input(0),
2039 (1..=num_amounts).map(Argument::Input).collect(),
2040 )],
2041 },
2042 rng.gen_range(100..1000),
2043 rng.gen_range(100..1000),
2044 ),
2045 ));
2046
2047 let num_sources = rng.gen_range(1..=3);
2048 transactions.push((
2049 format!("merge_coins_{}_sources", num_sources),
2050 TransactionData::new_programmable(
2051 SuiAddress::ZERO,
2052 vec![],
2053 ProgrammableTransaction {
2054 inputs: vec![],
2055 commands: vec![Command::MergeCoins(
2056 Argument::Input(0),
2057 (1..=num_sources).map(Argument::Input).collect(),
2058 )],
2059 },
2060 rng.gen_range(100..1000),
2061 rng.gen_range(100..1000),
2062 ),
2063 ));
2064
2065 let num_elements = rng.gen_range(0..=6);
2066 transactions.push((
2067 format!("make_move_vec_{}_elements", num_elements),
2068 TransactionData::new_programmable(
2069 SuiAddress::ZERO,
2070 vec![],
2071 ProgrammableTransaction {
2072 inputs: vec![],
2073 commands: vec![Command::MakeMoveVec(
2074 None,
2075 (0..num_elements).map(Argument::Input).collect(),
2076 )],
2077 },
2078 rng.gen_range(100..1000),
2079 rng.gen_range(100..1000),
2080 ),
2081 ));
2082
2083 transactions.push((
2084 "mixed_commands".to_string(),
2085 TransactionData::new_programmable(
2086 SuiAddress::ZERO,
2087 vec![],
2088 ProgrammableTransaction {
2089 inputs: vec![],
2090 commands: vec![
2091 Command::MoveCall(Box::new(ProgrammableMoveCall {
2092 package: package3,
2093 module: "game".to_string(),
2094 function: "play".to_string(),
2095 type_arguments: vec![],
2096 arguments: vec![],
2097 })),
2098 Command::TransferObjects(
2099 vec![Argument::Input(1), Argument::Input(2)],
2100 Argument::Input(0),
2101 ),
2102 Command::SplitCoins(Argument::Input(3), vec![Argument::Input(4)]),
2103 ],
2104 },
2105 rng.gen_range(100..1000),
2106 rng.gen_range(100..1000),
2107 ),
2108 ));
2109
2110 transactions.push((
2111 "upgrade_package".to_string(),
2112 TransactionData::new_programmable(
2113 SuiAddress::ZERO,
2114 vec![],
2115 ProgrammableTransaction {
2116 inputs: vec![],
2117 commands: vec![Command::Upgrade(
2118 vec![],
2119 vec![],
2120 package3,
2121 Argument::Input(0),
2122 )],
2123 },
2124 rng.gen_range(100..1000),
2125 rng.gen_range(100..1000),
2126 ),
2127 ));
2128
2129 transactions
2130 }
2131
2132 #[test]
2144 fn snapshot_tests() {
2145 println!("\n============================================================================");
2146 println!("! !");
2147 println!("! IMPORTANT: never update snapshots from this test. only add new versions! !");
2148 println!("! !");
2149 println!("============================================================================\n");
2150
2151 let max_version = ProtocolVersion::MAX.as_u64();
2152
2153 let test_versions: Vec<u64> = (max_version.saturating_sub(9)..=max_version).collect();
2154
2155 for version in test_versions {
2156 let protocol_version = ProtocolVersion::new(version);
2157 let protocol_config = ProtocolConfig::get_for_version(protocol_version, Chain::Unknown);
2158 let (committee, _) = Committee::new_simple_test_committee_of_size(4);
2159 let committee = Arc::new(committee);
2160
2161 let initial_generation =
2162 if let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) =
2163 protocol_config.per_object_congestion_control_mode()
2164 {
2165 if params.default_none_duration_for_new_keys {
2166 None
2167 } else {
2168 Some(0)
2169 }
2170 } else {
2171 Some(0) };
2173
2174 let initial_observations =
2175 generate_test_inputs(0, committee.num_members(), initial_generation);
2176 let mut estimator = ExecutionTimeEstimator::new(
2177 committee.clone(),
2178 ExecutionTimeEstimateParams {
2179 max_estimate_us: u64::MAX, ..ExecutionTimeEstimateParams::default()
2181 },
2182 initial_observations.into_iter(),
2183 );
2184
2185 let test_inputs = generate_test_inputs(version, committee.num_members(), None);
2186
2187 for (source, generation, observation_key, duration) in test_inputs {
2188 estimator.process_observation_from_consensus(
2189 source,
2190 generation,
2191 observation_key,
2192 duration,
2193 false,
2194 );
2195 }
2196
2197 let mut final_observations = estimator.get_observations();
2198 final_observations.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
2199
2200 let test_transactions = generate_test_transactions(version);
2201 let mut transaction_estimates = Vec::new();
2202 for (description, tx_data) in test_transactions {
2203 let estimate = estimator.get_estimate(&tx_data);
2204 transaction_estimates.push((description, estimate));
2205 }
2206
2207 let snapshot_data = ExecutionTimeObserverSnapshot {
2208 protocol_version: version,
2209 consensus_observations: final_observations.clone(),
2210 transaction_estimates,
2211 };
2212 insta::assert_yaml_snapshot!(
2213 format!("execution_time_observer_v{}", version),
2214 snapshot_data
2215 );
2216 }
2217 }
2218}