1use std::{
5 collections::HashMap,
6 hash::{Hash, Hasher},
7 num::NonZeroUsize,
8 sync::{Arc, Weak},
9 time::{Duration, SystemTime},
10};
11
12use serde::{Deserialize, Serialize};
13
14use super::authority_per_epoch_store::AuthorityPerEpochStore;
15use super::weighted_moving_average::WeightedMovingAverage;
16use crate::consensus_adapter::SubmitToConsensus;
17use governor::{Quota, RateLimiter, clock::MonotonicClock};
18use itertools::Itertools;
19use lru::LruCache;
20#[cfg(not(msim))]
21use mysten_common::in_antithesis;
22use mysten_common::{assert_reachable, debug_fatal, in_test_configuration};
23use mysten_metrics::{monitored_scope, spawn_monitored_task};
24use rand::{Rng, SeedableRng, random, rngs, thread_rng};
25use simple_moving_average::{SMA, SingleSumSMA};
26use sui_config::node::ExecutionTimeObserverConfig;
27use sui_protocol_config::{ExecutionTimeEstimateParams, PerObjectCongestionControlMode};
28use sui_types::{
29 base_types::ObjectID,
30 committee::Committee,
31 error::SuiErrorKind,
32 execution::{ExecutionTimeObservationKey, ExecutionTiming},
33 messages_consensus::{AuthorityIndex, ConsensusTransaction, ExecutionTimeObservation},
34 transaction::{
35 Command, ProgrammableTransaction, StoredExecutionTimeObservations, TransactionData,
36 TransactionDataAPI, TransactionKind,
37 },
38};
39use tokio::{sync::mpsc, time::Instant};
40use tracing::{debug, info, trace, warn};
41
42const SMA_LOCAL_OBSERVATION_WINDOW_SIZE: usize = 20;
45const OBJECT_UTILIZATION_METRIC_HASH_MODULUS: u8 = 32;
46
47#[cfg(not(msim))]
54fn antithesis_enable_injecting_synthetic_execution_time() -> bool {
55 use std::sync::OnceLock;
56 static ENABLE_INJECTION: OnceLock<bool> = OnceLock::new();
57 *ENABLE_INJECTION.get_or_init(|| {
58 if !in_antithesis() {
59 return false;
60 }
61
62 std::env::var("ANTITHESIS_ENABLE_EXECUTION_TIME_INJECTION")
63 .map(|v| v.to_lowercase() == "true" || v == "1")
64 .unwrap_or(true)
65 })
66}
67
68pub struct ExecutionTimeObserver {
70 epoch_store: Weak<AuthorityPerEpochStore>,
71 consensus_adapter: Box<dyn SubmitToConsensus>,
72
73 protocol_params: ExecutionTimeEstimateParams,
74 config: ExecutionTimeObserverConfig,
75
76 local_observations: LruCache<ExecutionTimeObservationKey, LocalObservations>,
77
78 object_utilization_tracker: LruCache<ObjectID, ObjectUtilization>,
82
83 indebted_objects: Vec<ObjectID>,
85
86 sharing_rate_limiter: RateLimiter<
87 governor::state::NotKeyed,
88 governor::state::InMemoryState,
89 governor::clock::MonotonicClock,
90 governor::middleware::NoOpMiddleware<
91 <governor::clock::MonotonicClock as governor::clock::Clock>::Instant,
92 >,
93 >,
94
95 next_generation_number: u64,
96}
97
98#[derive(Debug, Clone)]
99pub struct LocalObservations {
100 moving_average: SingleSumSMA<Duration, u32, SMA_LOCAL_OBSERVATION_WINDOW_SIZE>,
101 weighted_moving_average: WeightedMovingAverage,
102 last_shared: Option<(Duration, Instant)>,
103 config: ExecutionTimeObserverConfig,
104}
105
106impl LocalObservations {
107 fn new(config: ExecutionTimeObserverConfig, default_duration: Duration) -> Self {
108 let window_size = config.weighted_moving_average_window_size();
109 Self {
110 moving_average: SingleSumSMA::from_zero(Duration::ZERO),
111 weighted_moving_average: WeightedMovingAverage::new(
112 default_duration.as_micros() as u64,
113 window_size,
114 ),
115 last_shared: None,
116 config,
117 }
118 }
119
120 fn add_sample(&mut self, duration: Duration, gas_price: u64) {
121 self.moving_average.add_sample(duration);
122 self.weighted_moving_average
123 .add_sample(duration.as_micros() as u64, gas_price);
124 }
125
126 fn get_average(&self) -> Duration {
127 if self.config.enable_gas_price_weighting() {
128 Duration::from_micros(self.weighted_moving_average.get_weighted_average())
129 } else {
130 self.moving_average.get_average()
131 }
132 }
133
134 fn diff_exceeds_threshold(
135 &self,
136 new_average: Duration,
137 threshold: f64,
138 min_interval: Duration,
139 ) -> bool {
140 let Some((last_shared, last_shared_timestamp)) = self.last_shared else {
141 return true;
143 };
144
145 if last_shared_timestamp.elapsed() < min_interval {
146 return false;
147 }
148
149 if threshold >= 0.0 {
150 new_average
152 .checked_sub(last_shared)
153 .is_some_and(|diff| diff > last_shared.mul_f64(threshold))
154 } else {
155 last_shared
157 .checked_sub(new_average)
158 .is_some_and(|diff| diff > last_shared.mul_f64(-threshold))
159 }
160 }
161}
162
163#[derive(Debug, Clone)]
164pub struct ObjectUtilization {
165 excess_execution_time: Duration,
166 last_measured: Option<Instant>,
167 was_overutilized: bool, }
169
170impl ObjectUtilization {
171 pub fn overutilized(&self, config: &ExecutionTimeObserverConfig) -> bool {
172 self.excess_execution_time > config.observation_sharing_object_utilization_threshold()
173 }
174}
175
176impl ExecutionTimeObserver {
178 pub fn spawn(
179 epoch_store: Arc<AuthorityPerEpochStore>,
180 consensus_adapter: Box<dyn SubmitToConsensus>,
181 config: ExecutionTimeObserverConfig,
182 ) {
183 let PerObjectCongestionControlMode::ExecutionTimeEstimate(protocol_params) = epoch_store
184 .protocol_config()
185 .per_object_congestion_control_mode()
186 else {
187 info!(
188 "ExecutionTimeObserver disabled because per-object congestion control mode is not ExecutionTimeEstimate"
189 );
190 return;
191 };
192
193 let (tx_local_execution_time, mut rx_local_execution_time) =
194 mpsc::channel(config.observation_channel_capacity().into());
195 let (tx_object_debts, mut rx_object_debts) =
196 mpsc::channel(config.object_debt_channel_capacity().into());
197 epoch_store.set_local_execution_time_channels(tx_local_execution_time, tx_object_debts);
198
199 let mut observer = Self {
201 epoch_store: Arc::downgrade(&epoch_store),
202 consensus_adapter,
203 local_observations: LruCache::new(config.observation_cache_size()),
204 object_utilization_tracker: LruCache::new(config.object_utilization_cache_size()),
205 indebted_objects: Vec::new(),
206 sharing_rate_limiter: RateLimiter::direct_with_clock(
207 Quota::per_second(config.observation_sharing_rate_limit())
208 .allow_burst(config.observation_sharing_burst_limit()),
209 &MonotonicClock,
210 ),
211 protocol_params,
212 config,
213 next_generation_number: SystemTime::now()
214 .duration_since(std::time::UNIX_EPOCH)
215 .expect("Sui did not exist prior to 1970")
216 .as_micros()
217 .try_into()
218 .expect("This build of sui is not supported in the year 500,000"),
219 };
220 spawn_monitored_task!(epoch_store.within_alive_epoch(async move {
221 loop {
222 tokio::select! {
223 Some(object_debts) = rx_object_debts.recv() => {
225 observer.update_indebted_objects(object_debts);
226 }
227 Some((tx, timings, total_duration, gas_price)) = rx_local_execution_time.recv() => {
228 observer
229 .record_local_observations(&tx, &timings, total_duration, gas_price);
230 }
231 else => { break }
232 }
233 }
234 info!("shutting down ExecutionTimeObserver");
235 }));
236 }
237
238 #[cfg(test)]
239 fn new_for_testing(
240 epoch_store: Arc<AuthorityPerEpochStore>,
241 consensus_adapter: Box<dyn SubmitToConsensus>,
242 observation_sharing_object_utilization_threshold: Duration,
243 enable_gas_price_weighting: bool,
244 ) -> Self {
245 let PerObjectCongestionControlMode::ExecutionTimeEstimate(protocol_params) = epoch_store
246 .protocol_config()
247 .per_object_congestion_control_mode()
248 else {
249 panic!(
250 "tried to construct test ExecutionTimeObserver when congestion control mode is not ExecutionTimeEstimate"
251 );
252 };
253 Self {
254 epoch_store: Arc::downgrade(&epoch_store),
255 consensus_adapter,
256 protocol_params,
257 config: ExecutionTimeObserverConfig {
258 observation_sharing_object_utilization_threshold: Some(
259 observation_sharing_object_utilization_threshold,
260 ),
261 enable_gas_price_weighting: Some(enable_gas_price_weighting),
262 ..ExecutionTimeObserverConfig::default()
263 },
264 local_observations: LruCache::new(NonZeroUsize::new(10000).unwrap()),
265 object_utilization_tracker: LruCache::new(NonZeroUsize::new(50000).unwrap()),
266 indebted_objects: Vec::new(),
267 sharing_rate_limiter: RateLimiter::direct_with_clock(
268 Quota::per_hour(std::num::NonZeroU32::MAX),
269 &MonotonicClock,
270 ),
271 next_generation_number: SystemTime::now()
272 .duration_since(std::time::UNIX_EPOCH)
273 .expect("Sui did not exist prior to 1970")
274 .as_micros()
275 .try_into()
276 .expect("This build of sui is not supported in the year 500,000"),
277 }
278 }
279
280 fn record_local_observations(
285 &mut self,
286 tx: &ProgrammableTransaction,
287 timings: &[ExecutionTiming],
288 total_duration: Duration,
289 gas_price: u64,
290 ) {
291 let _scope = monitored_scope("ExecutionTimeObserver::record_local_observations");
292
293 #[cfg(msim)]
295 let should_inject = self.config.inject_synthetic_execution_time();
296 #[cfg(not(msim))]
297 let should_inject = antithesis_enable_injecting_synthetic_execution_time();
298
299 if should_inject {
300 let (generated_timings, generated_duration) = self.generate_test_timings(tx, timings);
301 self.record_local_observations_timing(
302 tx,
303 &generated_timings,
304 generated_duration,
305 gas_price,
306 )
307 } else {
308 self.record_local_observations_timing(tx, timings, total_duration, gas_price)
309 }
310 }
311
312 fn record_local_observations_timing(
313 &mut self,
314 tx: &ProgrammableTransaction,
315 timings: &[ExecutionTiming],
316 total_duration: Duration,
317 gas_price: u64,
318 ) {
319 assert!(tx.commands.len() >= timings.len());
320
321 let Some(epoch_store) = self.epoch_store.upgrade() else {
322 debug!("epoch is ending, dropping execution time observation");
323 return;
324 };
325
326 let mut uses_indebted_object = false;
327
328 let max_excess_per_object_execution_time = tx
331 .shared_input_objects()
332 .filter_map(|obj| obj.is_accessed_exclusively().then_some(obj.id))
333 .map(|id| {
334 if !uses_indebted_object && self.indebted_objects.binary_search(&id).is_ok() {
336 uses_indebted_object = true;
337 }
338
339 let now = Instant::now();
349 let utilization =
350 self.object_utilization_tracker
351 .get_or_insert_mut(id, || ObjectUtilization {
352 excess_execution_time: Duration::ZERO,
353 last_measured: None,
354 was_overutilized: false,
355 });
356 let overutilized_at_start = utilization.overutilized(&self.config);
357 utilization.excess_execution_time += total_duration;
358 utilization.excess_execution_time =
359 utilization.excess_execution_time.saturating_sub(
360 utilization
361 .last_measured
362 .map(|last_measured| {
363 now.duration_since(last_measured)
364 .mul_f64(self.protocol_params.target_utilization as f64 / 100.0)
365 })
366 .unwrap_or(Duration::MAX),
367 );
368 utilization.last_measured = Some(now);
369 if utilization.overutilized(&self.config) {
370 utilization.was_overutilized = true;
371 }
372
373 if !overutilized_at_start && utilization.overutilized(&self.config) {
375 trace!("object {id:?} is overutilized");
376 epoch_store
377 .metrics
378 .epoch_execution_time_observer_overutilized_objects
379 .inc();
380 } else if overutilized_at_start && !utilization.overutilized(&self.config) {
381 epoch_store
382 .metrics
383 .epoch_execution_time_observer_overutilized_objects
384 .dec();
385 }
386 if utilization.was_overutilized {
387 let key = if self.config.report_object_utilization_metric_with_full_id() {
388 id.to_string()
389 } else {
390 let key_lsb = id.into_bytes()[ObjectID::LENGTH - 1];
391 let hash = key_lsb % OBJECT_UTILIZATION_METRIC_HASH_MODULUS;
392 format!("{:x}", hash)
393 };
394
395 epoch_store
396 .metrics
397 .epoch_execution_time_observer_object_utilization
398 .with_label_values(&[key.as_str()])
399 .inc_by(total_duration.as_secs_f64());
400 }
401
402 utilization.excess_execution_time
403 })
404 .max()
405 .unwrap_or(Duration::ZERO);
406 epoch_store
407 .metrics
408 .epoch_execution_time_observer_utilization_cache_size
409 .set(self.object_utilization_tracker.len() as i64);
410
411 let total_command_duration: Duration = timings.iter().map(|t| t.duration()).sum();
412 let extra_overhead = total_duration - total_command_duration;
413
414 let mut to_share = Vec::with_capacity(tx.commands.len());
415 for (i, timing) in timings.iter().enumerate() {
416 let command = &tx.commands[i];
417
418 if matches!(command, Command::Publish(_, _)) {
420 continue;
421 }
422
423 let mut command_duration = timing.duration();
425
426 let overhead_factor = if total_command_duration > Duration::ZERO {
428 command_duration.as_secs_f64() / total_command_duration.as_secs_f64()
429 } else {
430 1.0 / (tx.commands.len() as f64)
432 };
433 command_duration += extra_overhead.mul_f64(overhead_factor);
434
435 command_duration = command_duration.div_f64(command_length(command).get() as f64);
438
439 let key = ExecutionTimeObservationKey::from_command(command);
441 let local_observation = self.local_observations.get_or_insert_mut(key.clone(), || {
442 LocalObservations::new(self.config.clone(), Duration::ZERO)
443 });
444 local_observation.add_sample(command_duration, gas_price);
445
446 let new_average = local_observation.get_average();
451 let mut should_share = false;
452
453 if max_excess_per_object_execution_time
455 >= self
456 .config
457 .observation_sharing_object_utilization_threshold()
458 && local_observation.diff_exceeds_threshold(
459 new_average,
460 self.config.observation_sharing_diff_threshold(),
461 self.config.observation_sharing_min_interval(),
462 )
463 {
464 should_share = true;
465 epoch_store
466 .metrics
467 .epoch_execution_time_observations_sharing_reason
468 .with_label_values(&["utilization"])
469 .inc();
470 };
471
472 if uses_indebted_object
474 && local_observation.diff_exceeds_threshold(
475 new_average,
476 -self.config.observation_sharing_diff_threshold(),
477 self.config.observation_sharing_min_interval(),
478 )
479 {
480 should_share = true;
481 epoch_store
482 .metrics
483 .epoch_execution_time_observations_sharing_reason
484 .with_label_values(&["indebted"])
485 .inc();
486 }
487
488 if should_share {
489 debug!("sharing new execution time observation for {key:?}: {new_average:?}");
490 to_share.push((key, new_average));
491 local_observation.last_shared = Some((new_average, Instant::now()));
492 }
493 }
494
495 self.share_observations(to_share);
497 }
498
499 fn generate_test_timings(
500 &self,
501 tx: &ProgrammableTransaction,
502 timings: &[ExecutionTiming],
503 ) -> (Vec<ExecutionTiming>, Duration) {
504 let generated_timings: Vec<_> = tx
505 .commands
506 .iter()
507 .zip(timings.iter())
508 .map(|(command, timing)| {
509 let key = ExecutionTimeObservationKey::from_command(command);
510 let duration = self.get_test_duration(&key);
511 if timing.is_abort() {
512 ExecutionTiming::Abort(duration)
513 } else {
514 ExecutionTiming::Success(duration)
515 }
516 })
517 .collect();
518
519 let total_duration = generated_timings
520 .iter()
521 .map(|t| t.duration())
522 .sum::<Duration>()
523 + thread_rng().gen_range(Duration::from_millis(10)..Duration::from_millis(50));
524
525 (generated_timings, total_duration)
526 }
527
528 fn get_test_duration(&self, key: &ExecutionTimeObservationKey) -> Duration {
529 #[cfg(msim)]
530 let should_inject = self.config.inject_synthetic_execution_time();
531 #[cfg(not(msim))]
532 let should_inject = false;
533
534 if !in_test_configuration() && !should_inject {
535 panic!("get_test_duration called in non-test configuration");
536 }
537
538 thread_local! {
539 static PER_TEST_SEED: u64 = random::<u64>();
540 }
541
542 let mut hasher = std::collections::hash_map::DefaultHasher::new();
543
544 let checkpoint_digest_used = self
545 .epoch_store
546 .upgrade()
547 .and_then(|store| {
548 store
549 .get_lowest_non_genesis_checkpoint_summary()
550 .ok()
551 .flatten()
552 })
553 .map(|summary| summary.content_digest.hash(&mut hasher))
554 .is_some();
555
556 if !checkpoint_digest_used {
557 PER_TEST_SEED.with(|seed| seed.hash(&mut hasher));
558 }
559
560 key.hash(&mut hasher);
561 let mut rng = rngs::StdRng::seed_from_u64(hasher.finish());
562 rng.gen_range(Duration::from_millis(100)..Duration::from_millis(600))
563 }
564
565 fn share_observations(&mut self, to_share: Vec<(ExecutionTimeObservationKey, Duration)>) {
566 if to_share.is_empty() {
567 return;
568 }
569 let Some(epoch_store) = self.epoch_store.upgrade() else {
570 debug!("epoch is ending, dropping execution time observation");
571 return;
572 };
573
574 let num_observations = to_share.len() as u64;
575
576 if let Err(e) = self.sharing_rate_limiter.check() {
578 epoch_store
579 .metrics
580 .epoch_execution_time_observations_dropped
581 .with_label_values(&["global_rate_limit"])
582 .inc_by(num_observations);
583 debug!("rate limit exceeded, dropping execution time observation; {e:?}");
584 return;
585 }
586
587 let epoch_store = epoch_store.clone();
588 let transaction = ConsensusTransaction::new_execution_time_observation(
589 ExecutionTimeObservation::new(epoch_store.name, self.next_generation_number, to_share),
590 );
591 self.next_generation_number += 1;
592
593 if let Err(e) = self.consensus_adapter.submit_best_effort(
594 &transaction,
595 &epoch_store,
596 Duration::from_secs(5),
597 ) {
598 if !matches!(e.as_inner(), SuiErrorKind::EpochEnded(_)) {
599 epoch_store
600 .metrics
601 .epoch_execution_time_observations_dropped
602 .with_label_values(&["submit_to_consensus"])
603 .inc_by(num_observations);
604 warn!("failed to submit execution time observation: {e:?}");
605 }
606 } else {
607 assert_reachable!("successfully shares execution time observations");
611 epoch_store
612 .metrics
613 .epoch_execution_time_observations_shared
614 .inc_by(num_observations);
615 }
616 }
617
618 fn update_indebted_objects(&mut self, mut object_debts: Vec<ObjectID>) {
619 let _scope = monitored_scope("ExecutionTimeObserver::update_indebted_objects");
620
621 let Some(epoch_store) = self.epoch_store.upgrade() else {
622 debug!("epoch is ending, dropping indebted object update");
623 return;
624 };
625
626 object_debts.sort_unstable();
627 object_debts.dedup();
628 self.indebted_objects = object_debts;
629 epoch_store
630 .metrics
631 .epoch_execution_time_observer_indebted_objects
632 .set(self.indebted_objects.len() as i64);
633 }
634}
635
636pub const EXTRA_FIELD_EXECUTION_TIME_ESTIMATES_KEY: u64 = 0;
639
640pub const EXTRA_FIELD_EXECUTION_TIME_ESTIMATES_CHUNK_COUNT_KEY: u64 = 1;
642
643pub struct ExecutionTimeEstimator {
646 committee: Arc<Committee>,
647 protocol_params: ExecutionTimeEstimateParams,
648
649 consensus_observations: HashMap<ExecutionTimeObservationKey, ConsensusObservations>,
650}
651
652#[derive(Debug, Clone, Serialize, Deserialize)]
653pub struct ConsensusObservations {
654 observations: Vec<(u64 , Option<Duration>)>, stake_weighted_median: Option<Duration>, }
657
658impl ConsensusObservations {
659 fn update_stake_weighted_median(
660 &mut self,
661 committee: &Committee,
662 config: &ExecutionTimeEstimateParams,
663 ) {
664 let mut stake_with_observations = 0;
665 let sorted_observations: Vec<_> = self
666 .observations
667 .iter()
668 .enumerate()
669 .filter_map(|(i, (_, duration))| {
670 duration.map(|duration| {
671 let authority_index: AuthorityIndex = i.try_into().unwrap();
672 stake_with_observations += committee.stake_by_index(authority_index).unwrap();
673 (duration, authority_index)
674 })
675 })
676 .sorted()
677 .collect();
678
679 if stake_with_observations < config.stake_weighted_median_threshold {
681 self.stake_weighted_median = None;
682 return;
683 }
684
685 let median_stake = stake_with_observations / 2;
687 let mut running_stake = 0;
688 for (duration, authority_index) in sorted_observations {
689 running_stake += committee.stake_by_index(authority_index).unwrap();
690 if running_stake > median_stake {
691 self.stake_weighted_median = Some(duration);
692 break;
693 }
694 }
695 }
696}
697
698impl ExecutionTimeEstimator {
699 pub fn new(
700 committee: Arc<Committee>,
701 protocol_params: ExecutionTimeEstimateParams,
702 initial_observations: impl Iterator<
703 Item = (
704 AuthorityIndex,
705 Option<u64>,
706 ExecutionTimeObservationKey,
707 Duration,
708 ),
709 >,
710 ) -> Self {
711 let mut estimator = Self {
712 committee,
713 protocol_params,
714 consensus_observations: HashMap::new(),
715 };
716 for (source, generation, key, duration) in initial_observations {
717 estimator.process_observation_from_consensus(
718 source,
719 generation,
720 key.to_owned(),
721 duration,
722 true,
723 );
724 }
725 for observation in estimator.consensus_observations.values_mut() {
726 observation
727 .update_stake_weighted_median(&estimator.committee, &estimator.protocol_params);
728 }
729 estimator
730 }
731
732 #[cfg(test)]
733 pub fn new_for_testing() -> Self {
734 let (committee, _) = Committee::new_simple_test_committee_of_size(1);
735 Self {
736 committee: Arc::new(committee),
737 protocol_params: ExecutionTimeEstimateParams {
738 target_utilization: 100,
739 max_estimate_us: u64::MAX,
740 ..ExecutionTimeEstimateParams::default()
741 },
742 consensus_observations: HashMap::new(),
743 }
744 }
745
746 pub fn process_observations_from_consensus(
747 &mut self,
748 source: AuthorityIndex,
749 generation: Option<u64>,
750 observations: &[(ExecutionTimeObservationKey, Duration)],
751 ) {
752 for (key, duration) in observations {
753 self.process_observation_from_consensus(
754 source,
755 generation,
756 key.to_owned(),
757 *duration,
758 false,
759 );
760 }
761 }
762
763 fn process_observation_from_consensus(
764 &mut self,
765 source: AuthorityIndex,
766 generation: Option<u64>,
767 observation_key: ExecutionTimeObservationKey,
768 duration: Duration,
769 skip_update: bool,
770 ) {
771 if matches!(observation_key, ExecutionTimeObservationKey::Publish) {
772 warn!(
774 "dropping Publish observation received from possibly-Byzanitine authority {source}"
775 );
776 return;
777 }
778
779 assert_reachable!("receives some valid execution time observations");
780
781 let observations = self
782 .consensus_observations
783 .entry(observation_key)
784 .or_insert_with(|| {
785 let len = self.committee.num_members();
786 let mut empty_observations = Vec::with_capacity(len);
787 empty_observations.resize(len, (0, None));
788 ConsensusObservations {
789 observations: empty_observations,
790 stake_weighted_median: if self
791 .protocol_params
792 .default_none_duration_for_new_keys
793 {
794 None
795 } else {
796 Some(Duration::ZERO)
797 },
798 }
799 });
800
801 let (obs_generation, obs_duration) =
802 &mut observations.observations[TryInto::<usize>::try_into(source).unwrap()];
803 if generation.is_some_and(|generation| *obs_generation >= generation) {
804 return;
806 }
807 *obs_generation = generation.unwrap_or(0);
808 *obs_duration = Some(duration);
809 if !skip_update {
810 observations.update_stake_weighted_median(&self.committee, &self.protocol_params);
811 }
812 }
813
814 pub fn get_estimate(&self, tx: &TransactionData) -> Duration {
815 let TransactionKind::ProgrammableTransaction(tx) = tx.kind() else {
816 debug_fatal!("get_estimate called on non-ProgrammableTransaction");
817 return Duration::ZERO;
818 };
819 tx.commands
820 .iter()
821 .map(|command| {
822 let key = ExecutionTimeObservationKey::from_command(command);
823 self.consensus_observations
824 .get(&key)
825 .and_then(|obs| obs.stake_weighted_median)
826 .unwrap_or_else(|| key.default_duration())
827 .mul_f64(command_length(command).get() as f64)
830 })
831 .sum::<Duration>()
832 .min(Duration::from_micros(self.protocol_params.max_estimate_us))
833 }
834
835 pub fn take_observations(&mut self) -> StoredExecutionTimeObservations {
836 StoredExecutionTimeObservations::V1(
837 self.consensus_observations
838 .drain()
839 .map(|(key, observations)| {
840 let observations = observations
841 .observations
842 .into_iter()
843 .enumerate()
844 .filter_map(|(idx, (_, duration))| {
845 duration.map(|d| {
846 (
847 self.committee
848 .authority_by_index(idx.try_into().unwrap())
849 .cloned()
850 .unwrap(),
851 d,
852 )
853 })
854 })
855 .collect();
856 (key, observations)
857 })
858 .collect(),
859 )
860 }
861
862 pub fn get_observations(&self) -> Vec<(ExecutionTimeObservationKey, ConsensusObservations)> {
863 self.consensus_observations
864 .iter()
865 .map(|(key, observations)| (key.clone(), observations.clone()))
866 .collect()
867 }
868}
869
870fn command_length(command: &Command) -> NonZeroUsize {
871 NonZeroUsize::new(match command {
874 Command::MoveCall(_) => 1,
875 Command::TransferObjects(src, _) => src.len() + 1,
876 Command::SplitCoins(_, amts) => amts.len() + 1,
877 Command::MergeCoins(_, src) => src.len() + 1,
878 Command::Publish(_, _) => 1,
879 Command::MakeMoveVec(_, src) => src.len() + 1,
880 Command::Upgrade(_, _, _, _) => 1,
881 })
882 .unwrap()
883}
884
885#[cfg(test)]
886mod tests {
887 use super::*;
888 use crate::authority::test_authority_builder::TestAuthorityBuilder;
889 use crate::checkpoints::CheckpointStore;
890 use crate::consensus_adapter::{
891 ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
892 MockConsensusClient,
893 };
894 use sui_protocol_config::ProtocolConfig;
895 use sui_types::base_types::{ObjectID, SequenceNumber, SuiAddress};
896 use sui_types::transaction::{
897 Argument, CallArg, ObjectArg, ProgrammableMoveCall, SharedObjectMutability,
898 };
899 use {
900 rand::{Rng, SeedableRng},
901 sui_protocol_config::ProtocolVersion,
902 sui_types::supported_protocol_versions::Chain,
903 };
904
905 #[tokio::test]
906 async fn test_record_local_observations() {
907 telemetry_subscribers::init_for_testing();
908
909 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
910 config.set_per_object_congestion_control_mode_for_testing(
911 PerObjectCongestionControlMode::ExecutionTimeEstimate(
912 ExecutionTimeEstimateParams {
913 target_utilization: 100,
914 allowed_txn_cost_overage_burst_limit_us: 0,
915 randomness_scalar: 100,
916 max_estimate_us: u64::MAX,
917 stored_observations_num_included_checkpoints: 10,
918 stored_observations_limit: u64::MAX,
919 stake_weighted_median_threshold: 0,
920 default_none_duration_for_new_keys: true,
921 observations_chunk_size: None,
922 },
923 ),
924 );
925 config
926 });
927
928 let mock_consensus_client = MockConsensusClient::new();
929 let authority = TestAuthorityBuilder::new().build().await;
930 let epoch_store = authority.epoch_store_for_testing();
931 let consensus_adapter = Arc::new(ConsensusAdapter::new(
932 Arc::new(mock_consensus_client),
933 CheckpointStore::new_for_tests(),
934 authority.name,
935 Arc::new(ConnectionMonitorStatusForTests {}),
936 100_000,
937 100_000,
938 None,
939 None,
940 ConsensusAdapterMetrics::new_test(),
941 epoch_store.protocol_config().clone(),
942 ));
943 let mut observer = ExecutionTimeObserver::new_for_testing(
944 epoch_store.clone(),
945 Box::new(consensus_adapter.clone()),
946 Duration::ZERO, false, );
949
950 let package = ObjectID::random();
952 let module = "test_module".to_string();
953 let function = "test_function".to_string();
954 let ptb = ProgrammableTransaction {
955 inputs: vec![],
956 commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
957 package,
958 module: module.clone(),
959 function: function.clone(),
960 type_arguments: vec![],
961 arguments: vec![],
962 }))],
963 };
964
965 let timings = vec![ExecutionTiming::Success(Duration::from_millis(100))];
967 let total_duration = Duration::from_millis(110);
968 observer.record_local_observations(&ptb, &timings, total_duration, 1);
969
970 let key = ExecutionTimeObservationKey::MoveEntryPoint {
971 package,
972 module: module.clone(),
973 function: function.clone(),
974 type_arguments: vec![],
975 };
976
977 let local_obs = observer.local_observations.get(&key).unwrap();
979 assert_eq!(
980 local_obs.get_average(),
981 Duration::from_millis(110)
983 );
984 assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));
985
986 let timings = vec![ExecutionTiming::Success(Duration::from_millis(110))];
988 let total_duration = Duration::from_millis(120);
989 observer.record_local_observations(&ptb, &timings, total_duration, 1);
990
991 let local_obs = observer.local_observations.get(&key).unwrap();
993 assert_eq!(
994 local_obs.get_average(),
995 Duration::from_millis(115)
997 );
998 assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));
1000
1001 let timings = vec![ExecutionTiming::Success(Duration::from_millis(120))];
1003 let total_duration = Duration::from_millis(130);
1004 observer.record_local_observations(&ptb, &timings, total_duration, 1);
1005
1006 let local_obs = observer.local_observations.get(&key).unwrap();
1008 assert_eq!(
1009 local_obs.get_average(),
1010 Duration::from_millis(120)
1012 );
1013 assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));
1016
1017 observer
1019 .local_observations
1020 .get_mut(&key)
1021 .unwrap()
1022 .last_shared = Some((
1023 Duration::from_millis(110),
1024 Instant::now() - Duration::from_secs(60),
1025 ));
1026
1027 let timings = vec![ExecutionTiming::Success(Duration::from_millis(120))];
1029 let total_duration = Duration::from_millis(160);
1030 observer.record_local_observations(&ptb, &timings, total_duration, 1);
1031
1032 let local_obs = observer.local_observations.get(&key).unwrap();
1035 assert_eq!(
1036 local_obs.get_average(),
1037 Duration::from_millis(130)
1039 );
1040 assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(130));
1041 }
1042
1043 #[tokio::test]
1044 async fn test_record_local_observations_with_gas_price_weighting() {
1045 telemetry_subscribers::init_for_testing();
1046
1047 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1048 config.set_per_object_congestion_control_mode_for_testing(
1049 PerObjectCongestionControlMode::ExecutionTimeEstimate(
1050 ExecutionTimeEstimateParams {
1051 target_utilization: 100,
1052 allowed_txn_cost_overage_burst_limit_us: 0,
1053 randomness_scalar: 100,
1054 max_estimate_us: u64::MAX,
1055 stored_observations_num_included_checkpoints: 10,
1056 stored_observations_limit: u64::MAX,
1057 stake_weighted_median_threshold: 0,
1058 default_none_duration_for_new_keys: true,
1059 observations_chunk_size: None,
1060 },
1061 ),
1062 );
1063 config
1064 });
1065
1066 let mock_consensus_client = MockConsensusClient::new();
1067 let authority = TestAuthorityBuilder::new().build().await;
1068 let epoch_store = authority.epoch_store_for_testing();
1069 let consensus_adapter = Arc::new(ConsensusAdapter::new(
1070 Arc::new(mock_consensus_client),
1071 CheckpointStore::new_for_tests(),
1072 authority.name,
1073 Arc::new(ConnectionMonitorStatusForTests {}),
1074 100_000,
1075 100_000,
1076 None,
1077 None,
1078 ConsensusAdapterMetrics::new_test(),
1079 epoch_store.protocol_config().clone(),
1080 ));
1081 let mut observer = ExecutionTimeObserver::new_for_testing(
1082 epoch_store.clone(),
1083 Box::new(consensus_adapter.clone()),
1084 Duration::ZERO, true, );
1087
1088 let package = ObjectID::random();
1090 let module = "test_module".to_string();
1091 let function = "test_function".to_string();
1092 let ptb = ProgrammableTransaction {
1093 inputs: vec![],
1094 commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1095 package,
1096 module: module.clone(),
1097 function: function.clone(),
1098 type_arguments: vec![],
1099 arguments: vec![],
1100 }))],
1101 };
1102
1103 let timings = vec![ExecutionTiming::Success(Duration::from_millis(100))];
1105 let total_duration = Duration::from_millis(110);
1106 observer.record_local_observations(&ptb, &timings, total_duration, 1);
1107
1108 let key = ExecutionTimeObservationKey::MoveEntryPoint {
1109 package,
1110 module: module.clone(),
1111 function: function.clone(),
1112 type_arguments: vec![],
1113 };
1114
1115 let local_obs = observer.local_observations.get(&key).unwrap();
1117 assert_eq!(
1118 local_obs.get_average(),
1119 Duration::from_millis(110)
1121 );
1122 assert_eq!(local_obs.last_shared.unwrap().0, Duration::from_millis(110));
1123
1124 let timings = vec![ExecutionTiming::Success(Duration::from_millis(110))];
1126 let total_duration = Duration::from_millis(120);
1127 observer.record_local_observations(&ptb, &timings, total_duration, 2);
1128
1129 let local_obs = observer.local_observations.get(&key).unwrap();
1131 assert_eq!(
1132 local_obs.get_average(),
1133 Duration::from_micros(116_666)
1136 );
1137 }
1138
1139 #[tokio::test]
1140 async fn test_record_local_observations_with_multiple_commands() {
1141 telemetry_subscribers::init_for_testing();
1142
1143 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1144 config.set_per_object_congestion_control_mode_for_testing(
1145 PerObjectCongestionControlMode::ExecutionTimeEstimate(
1146 ExecutionTimeEstimateParams {
1147 target_utilization: 100,
1148 allowed_txn_cost_overage_burst_limit_us: 0,
1149 randomness_scalar: 0,
1150 max_estimate_us: u64::MAX,
1151 stored_observations_num_included_checkpoints: 10,
1152 stored_observations_limit: u64::MAX,
1153 stake_weighted_median_threshold: 0,
1154 default_none_duration_for_new_keys: true,
1155 observations_chunk_size: None,
1156 },
1157 ),
1158 );
1159 config
1160 });
1161
1162 let mock_consensus_client = MockConsensusClient::new();
1163 let authority = TestAuthorityBuilder::new().build().await;
1164 let epoch_store = authority.epoch_store_for_testing();
1165 let consensus_adapter = Arc::new(ConsensusAdapter::new(
1166 Arc::new(mock_consensus_client),
1167 CheckpointStore::new_for_tests(),
1168 authority.name,
1169 Arc::new(ConnectionMonitorStatusForTests {}),
1170 100_000,
1171 100_000,
1172 None,
1173 None,
1174 ConsensusAdapterMetrics::new_test(),
1175 epoch_store.protocol_config().clone(),
1176 ));
1177 let mut observer = ExecutionTimeObserver::new_for_testing(
1178 epoch_store.clone(),
1179 Box::new(consensus_adapter.clone()),
1180 Duration::ZERO, false, );
1183
1184 let package = ObjectID::random();
1186 let module = "test_module".to_string();
1187 let function = "test_function".to_string();
1188 let ptb = ProgrammableTransaction {
1189 inputs: vec![],
1190 commands: vec![
1191 Command::MoveCall(Box::new(ProgrammableMoveCall {
1192 package,
1193 module: module.clone(),
1194 function: function.clone(),
1195 type_arguments: vec![],
1196 arguments: vec![],
1197 })),
1198 Command::TransferObjects(
1199 vec![Argument::Input(1), Argument::Input(2)],
1201 Argument::Input(0),
1202 ),
1203 ],
1204 };
1205 let timings = vec![
1206 ExecutionTiming::Success(Duration::from_millis(100)),
1207 ExecutionTiming::Success(Duration::from_millis(50)),
1208 ];
1209 let total_duration = Duration::from_millis(180);
1210 observer.record_local_observations(&ptb, &timings, total_duration, 1);
1211
1212 let move_key = ExecutionTimeObservationKey::MoveEntryPoint {
1214 package,
1215 module: module.clone(),
1216 function: function.clone(),
1217 type_arguments: vec![],
1218 };
1219 let move_obs = observer.local_observations.get(&move_key).unwrap();
1220 assert_eq!(
1221 move_obs.get_average(),
1222 Duration::from_millis(120)
1224 );
1225
1226 let transfer_obs = observer
1227 .local_observations
1228 .get(&ExecutionTimeObservationKey::TransferObjects)
1229 .unwrap();
1230 assert_eq!(
1231 transfer_obs.get_average(),
1232 Duration::from_millis(20)
1236 );
1237 }
1238
1239 #[tokio::test]
1240 async fn test_record_local_observations_with_object_utilization_threshold() {
1241 telemetry_subscribers::init_for_testing();
1242
1243 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1244 config.set_per_object_congestion_control_mode_for_testing(
1245 PerObjectCongestionControlMode::ExecutionTimeEstimate(
1246 ExecutionTimeEstimateParams {
1247 target_utilization: 100,
1248 allowed_txn_cost_overage_burst_limit_us: 0,
1249 randomness_scalar: 0,
1250 max_estimate_us: u64::MAX,
1251 stored_observations_num_included_checkpoints: 10,
1252 stored_observations_limit: u64::MAX,
1253 stake_weighted_median_threshold: 0,
1254 default_none_duration_for_new_keys: true,
1255 observations_chunk_size: None,
1256 },
1257 ),
1258 );
1259 config
1260 });
1261
1262 let mock_consensus_client = MockConsensusClient::new();
1263 let authority = TestAuthorityBuilder::new().build().await;
1264 let epoch_store = authority.epoch_store_for_testing();
1265 let consensus_adapter = Arc::new(ConsensusAdapter::new(
1266 Arc::new(mock_consensus_client),
1267 CheckpointStore::new_for_tests(),
1268 authority.name,
1269 Arc::new(ConnectionMonitorStatusForTests {}),
1270 100_000,
1271 100_000,
1272 None,
1273 None,
1274 ConsensusAdapterMetrics::new_test(),
1275 epoch_store.protocol_config().clone(),
1276 ));
1277 let mut observer = ExecutionTimeObserver::new_for_testing(
1278 epoch_store.clone(),
1279 Box::new(consensus_adapter.clone()),
1280 Duration::from_millis(500), false, );
1283
1284 let package = ObjectID::random();
1286 let module = "test_module".to_string();
1287 let function = "test_function".to_string();
1288 let shared_object_id = ObjectID::random();
1289 let ptb = ProgrammableTransaction {
1290 inputs: vec![CallArg::Object(ObjectArg::SharedObject {
1291 id: shared_object_id,
1292 initial_shared_version: SequenceNumber::new(),
1293 mutability: SharedObjectMutability::Mutable,
1294 })],
1295 commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1296 package,
1297 module: module.clone(),
1298 function: function.clone(),
1299 type_arguments: vec![],
1300 arguments: vec![],
1301 }))],
1302 };
1303 let key = ExecutionTimeObservationKey::MoveEntryPoint {
1304 package,
1305 module: module.clone(),
1306 function: function.clone(),
1307 type_arguments: vec![],
1308 };
1309
1310 tokio::time::pause();
1311
1312 let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
1314 observer.record_local_observations(&ptb, &timings, Duration::from_secs(2), 1);
1315 assert!(
1316 observer
1317 .local_observations
1318 .get(&key)
1319 .unwrap()
1320 .last_shared
1321 .is_none()
1322 );
1323
1324 let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
1326 observer.record_local_observations(&ptb, &timings, Duration::from_secs(2), 1);
1327 assert_eq!(
1328 observer
1329 .local_observations
1330 .get(&key)
1331 .unwrap()
1332 .last_shared
1333 .unwrap()
1334 .0,
1335 Duration::from_secs(2)
1336 );
1337
1338 tokio::time::advance(Duration::from_secs(5)).await;
1340 let timings = vec![ExecutionTiming::Success(Duration::from_secs(3))];
1341 observer.record_local_observations(&ptb, &timings, Duration::from_secs(5), 1);
1342 assert_eq!(
1343 observer
1344 .local_observations
1345 .get(&key)
1346 .unwrap()
1347 .last_shared
1348 .unwrap()
1349 .0,
1350 Duration::from_secs(3)
1351 );
1352
1353 tokio::time::advance(Duration::from_millis(150)).await;
1356 let timings = vec![ExecutionTiming::Success(Duration::from_millis(100))];
1357 observer.record_local_observations(&ptb, &timings, Duration::from_millis(500), 1);
1358 assert_eq!(
1359 observer
1360 .local_observations
1361 .get(&key)
1362 .unwrap()
1363 .last_shared
1364 .unwrap()
1365 .0,
1366 Duration::from_secs(3) );
1368
1369 tokio::time::advance(Duration::from_secs(60)).await;
1371 let timings = vec![ExecutionTiming::Success(Duration::from_secs(11))];
1372 observer.record_local_observations(&ptb, &timings, Duration::from_secs(11), 1);
1373 assert_eq!(
1374 observer
1375 .local_observations
1376 .get(&key)
1377 .unwrap()
1378 .last_shared
1379 .unwrap()
1380 .0,
1381 Duration::from_secs(3) );
1383 }
1384
1385 #[tokio::test]
1386 async fn test_record_local_observations_with_indebted_objects() {
1387 telemetry_subscribers::init_for_testing();
1388
1389 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1390 config.set_per_object_congestion_control_mode_for_testing(
1391 PerObjectCongestionControlMode::ExecutionTimeEstimate(
1392 ExecutionTimeEstimateParams {
1393 target_utilization: 100,
1394 allowed_txn_cost_overage_burst_limit_us: 0,
1395 randomness_scalar: 0,
1396 max_estimate_us: u64::MAX,
1397 stored_observations_num_included_checkpoints: 10,
1398 stored_observations_limit: u64::MAX,
1399 stake_weighted_median_threshold: 0,
1400 default_none_duration_for_new_keys: true,
1401 observations_chunk_size: None,
1402 },
1403 ),
1404 );
1405 config
1406 });
1407
1408 let mock_consensus_client = MockConsensusClient::new();
1409 let authority = TestAuthorityBuilder::new().build().await;
1410 let epoch_store = authority.epoch_store_for_testing();
1411 let consensus_adapter = Arc::new(ConsensusAdapter::new(
1412 Arc::new(mock_consensus_client),
1413 CheckpointStore::new_for_tests(),
1414 authority.name,
1415 Arc::new(ConnectionMonitorStatusForTests {}),
1416 100_000,
1417 100_000,
1418 None,
1419 None,
1420 ConsensusAdapterMetrics::new_test(),
1421 epoch_store.protocol_config().clone(),
1422 ));
1423 let mut observer = ExecutionTimeObserver::new_for_testing(
1424 epoch_store.clone(),
1425 Box::new(consensus_adapter.clone()),
1426 Duration::from_millis(500), false, );
1429
1430 let package = ObjectID::random();
1432 let module = "test_module".to_string();
1433 let function = "test_function".to_string();
1434 let shared_object_id = ObjectID::random();
1435 let ptb = ProgrammableTransaction {
1436 inputs: vec![CallArg::Object(ObjectArg::SharedObject {
1437 id: shared_object_id,
1438 initial_shared_version: SequenceNumber::new(),
1439 mutability: SharedObjectMutability::Mutable,
1440 })],
1441 commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1442 package,
1443 module: module.clone(),
1444 function: function.clone(),
1445 type_arguments: vec![],
1446 arguments: vec![],
1447 }))],
1448 };
1449 let key = ExecutionTimeObservationKey::MoveEntryPoint {
1450 package,
1451 module: module.clone(),
1452 function: function.clone(),
1453 type_arguments: vec![],
1454 };
1455
1456 tokio::time::pause();
1457
1458 let timings = vec![ExecutionTiming::Success(Duration::from_secs(1))];
1460 observer.record_local_observations(&ptb, &timings, Duration::from_secs(1), 1);
1461 assert!(
1462 observer
1463 .local_observations
1464 .get(&key)
1465 .unwrap()
1466 .last_shared
1467 .is_none()
1468 );
1469
1470 let timings = vec![ExecutionTiming::Success(Duration::from_secs(2))];
1472 observer.record_local_observations(&ptb, &timings, Duration::from_secs(2), 1);
1473 assert_eq!(
1474 observer
1475 .local_observations
1476 .get(&key)
1477 .unwrap()
1478 .last_shared
1479 .unwrap()
1480 .0,
1481 Duration::from_millis(1500) );
1483
1484 observer.update_indebted_objects(vec![shared_object_id]);
1486 observer
1487 .config
1488 .observation_sharing_object_utilization_threshold = Some(Duration::from_secs(1000));
1489
1490 tokio::time::advance(Duration::from_secs(60)).await;
1493 let timings = vec![ExecutionTiming::Success(Duration::from_millis(300))];
1494 observer.record_local_observations(&ptb, &timings, Duration::from_millis(300), 1);
1495
1496 assert_eq!(
1499 observer
1500 .local_observations
1501 .get(&key)
1502 .unwrap()
1503 .last_shared
1504 .unwrap()
1505 .0,
1506 Duration::from_millis(1100)
1507 );
1508 }
1509
1510 #[tokio::test]
1511 async fn test_stake_weighted_median() {
1513 telemetry_subscribers::init_for_testing();
1514
1515 let (committee, _) =
1516 Committee::new_simple_test_committee_with_normalized_voting_power(vec![10, 20, 30, 40]);
1517
1518 let params = ExecutionTimeEstimateParams {
1519 stake_weighted_median_threshold: 0,
1520 ..Default::default()
1521 };
1522
1523 let mut tracker = ConsensusObservations {
1524 observations: vec![
1525 (0, Some(Duration::from_secs(1))), (0, Some(Duration::from_secs(2))), (0, Some(Duration::from_secs(3))), (0, Some(Duration::from_secs(4))), ],
1530 stake_weighted_median: None,
1531 };
1532 tracker.update_stake_weighted_median(&committee, ¶ms);
1533 assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(3)));
1540
1541 let mut tracker = ConsensusObservations {
1543 observations: vec![
1544 (0, Some(Duration::from_secs(3))), (0, Some(Duration::from_secs(4))), (0, Some(Duration::from_secs(1))), (0, Some(Duration::from_secs(2))), ],
1549 stake_weighted_median: None,
1550 };
1551 tracker.update_stake_weighted_median(&committee, ¶ms);
1552 assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(2)));
1559
1560 let mut tracker = ConsensusObservations {
1562 observations: vec![
1563 (0, Some(Duration::from_secs(1))), (0, None), (0, Some(Duration::from_secs(3))), (0, Some(Duration::from_secs(4))), ],
1568 stake_weighted_median: None,
1569 };
1570 tracker.update_stake_weighted_median(&committee, ¶ms);
1571 assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(4)));
1577
1578 let mut tracker = ConsensusObservations {
1580 observations: vec![
1581 (0, Some(Duration::from_secs(1))), (0, Some(Duration::from_secs(2))), (0, None), (0, None), ],
1586 stake_weighted_median: None,
1587 };
1588 tracker.update_stake_weighted_median(&committee, ¶ms);
1589 assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(2)));
1594
1595 let mut tracker = ConsensusObservations {
1597 observations: vec![
1598 (0, None), (0, None), (0, Some(Duration::from_secs(3))), (0, None), ],
1603 stake_weighted_median: None,
1604 };
1605 tracker.update_stake_weighted_median(&committee, ¶ms);
1606 assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(3)));
1608
1609 let mut tracker = ConsensusObservations {
1611 observations: vec![
1612 (0, Some(Duration::from_secs(5))), (0, Some(Duration::from_secs(5))), (0, Some(Duration::from_secs(5))), (0, Some(Duration::from_secs(5))), ],
1617 stake_weighted_median: None,
1618 };
1619 tracker.update_stake_weighted_median(&committee, ¶ms);
1620 assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(5)));
1621 }
1622
1623 #[tokio::test]
1624 async fn test_stake_weighted_median_threshold() {
1625 telemetry_subscribers::init_for_testing();
1626
1627 let (committee, _) =
1628 Committee::new_simple_test_committee_with_normalized_voting_power(vec![10, 20, 30, 40]);
1629
1630 let params = ExecutionTimeEstimateParams {
1632 stake_weighted_median_threshold: 5000,
1633 ..Default::default()
1634 };
1635
1636 let mut tracker = ConsensusObservations {
1638 observations: vec![
1639 (0, Some(Duration::from_secs(1))), (0, Some(Duration::from_secs(2))), (0, None), (0, None), ],
1644 stake_weighted_median: None,
1645 };
1646 tracker.update_stake_weighted_median(&committee, ¶ms);
1647 assert_eq!(tracker.stake_weighted_median, None);
1649
1650 let mut tracker = ConsensusObservations {
1652 observations: vec![
1653 (0, Some(Duration::from_secs(1))), (0, Some(Duration::from_secs(2))), (0, Some(Duration::from_secs(3))), (0, None), ],
1658 stake_weighted_median: None,
1659 };
1660 tracker.update_stake_weighted_median(&committee, ¶ms);
1661 assert_eq!(tracker.stake_weighted_median, Some(Duration::from_secs(3)));
1663 }
1664
1665 #[tokio::test]
1666 async fn test_execution_time_estimator() {
1667 telemetry_subscribers::init_for_testing();
1668
1669 let (committee, _) =
1670 Committee::new_simple_test_committee_with_normalized_voting_power(vec![10, 20, 30, 40]);
1671 let mut estimator = ExecutionTimeEstimator::new(
1672 Arc::new(committee),
1673 ExecutionTimeEstimateParams {
1674 target_utilization: 50,
1675 max_estimate_us: 1_500_000,
1676
1677 allowed_txn_cost_overage_burst_limit_us: 0,
1679 randomness_scalar: 0,
1680 stored_observations_num_included_checkpoints: 10,
1681 stored_observations_limit: u64::MAX,
1682 stake_weighted_median_threshold: 0,
1683 default_none_duration_for_new_keys: true,
1684 observations_chunk_size: None,
1685 },
1686 std::iter::empty(),
1687 );
1688 let package = ObjectID::random();
1690 let module = "test_module".to_string();
1691 let function = "test_function".to_string();
1692 let move_key = ExecutionTimeObservationKey::MoveEntryPoint {
1693 package,
1694 module: module.clone(),
1695 function: function.clone(),
1696 type_arguments: vec![],
1697 };
1698 let transfer_key = ExecutionTimeObservationKey::TransferObjects;
1699
1700 estimator.process_observation_from_consensus(
1703 0,
1704 Some(1),
1705 move_key.clone(),
1706 Duration::from_millis(1000),
1707 false,
1708 );
1709 estimator.process_observation_from_consensus(
1710 1,
1711 Some(1),
1712 move_key.clone(),
1713 Duration::from_millis(1000),
1714 false,
1715 );
1716 estimator.process_observation_from_consensus(
1717 2,
1718 Some(1),
1719 move_key.clone(),
1720 Duration::from_millis(1000),
1721 false,
1722 );
1723
1724 estimator.process_observation_from_consensus(
1725 0,
1726 Some(1),
1727 transfer_key.clone(),
1728 Duration::from_millis(500),
1729 false,
1730 );
1731 estimator.process_observation_from_consensus(
1732 1,
1733 Some(1),
1734 transfer_key.clone(),
1735 Duration::from_millis(500),
1736 false,
1737 );
1738 estimator.process_observation_from_consensus(
1739 2,
1740 Some(1),
1741 transfer_key.clone(),
1742 Duration::from_millis(500),
1743 false,
1744 );
1745
1746 estimator.process_observation_from_consensus(
1748 0,
1749 Some(2),
1750 move_key.clone(),
1751 Duration::from_millis(100),
1752 false,
1753 );
1754 estimator.process_observation_from_consensus(
1755 1,
1756 Some(2),
1757 move_key.clone(),
1758 Duration::from_millis(200),
1759 false,
1760 );
1761 estimator.process_observation_from_consensus(
1762 2,
1763 Some(2),
1764 move_key.clone(),
1765 Duration::from_millis(300),
1766 false,
1767 );
1768
1769 estimator.process_observation_from_consensus(
1770 0,
1771 Some(2),
1772 transfer_key.clone(),
1773 Duration::from_millis(50),
1774 false,
1775 );
1776 estimator.process_observation_from_consensus(
1777 1,
1778 Some(2),
1779 transfer_key.clone(),
1780 Duration::from_millis(60),
1781 false,
1782 );
1783 estimator.process_observation_from_consensus(
1784 2,
1785 Some(2),
1786 transfer_key.clone(),
1787 Duration::from_millis(70),
1788 false,
1789 );
1790
1791 estimator.process_observation_from_consensus(
1793 0,
1794 Some(1),
1795 move_key.clone(),
1796 Duration::from_millis(1000),
1797 false,
1798 );
1799 estimator.process_observation_from_consensus(
1800 1,
1801 Some(1),
1802 transfer_key.clone(),
1803 Duration::from_millis(500),
1804 false,
1805 );
1806 estimator.process_observation_from_consensus(
1807 2,
1808 Some(1),
1809 move_key.clone(),
1810 Duration::from_millis(1000),
1811 false,
1812 );
1813
1814 let single_move_tx = TransactionData::new_programmable(
1816 SuiAddress::ZERO,
1817 vec![],
1818 ProgrammableTransaction {
1819 inputs: vec![],
1820 commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1821 package,
1822 module: module.clone(),
1823 function: function.clone(),
1824 type_arguments: vec![],
1825 arguments: vec![],
1826 }))],
1827 },
1828 100,
1829 100,
1830 );
1831
1832 assert_eq!(
1834 estimator.get_estimate(&single_move_tx),
1835 Duration::from_millis(300)
1836 );
1837
1838 let multi_command_tx = TransactionData::new_programmable(
1840 SuiAddress::ZERO,
1841 vec![],
1842 ProgrammableTransaction {
1843 inputs: vec![],
1844 commands: vec![
1845 Command::MoveCall(Box::new(ProgrammableMoveCall {
1846 package,
1847 module: module.clone(),
1848 function: function.clone(),
1849 type_arguments: vec![],
1850 arguments: vec![],
1851 })),
1852 Command::TransferObjects(
1853 vec![Argument::Input(1), Argument::Input(2)],
1854 Argument::Input(0),
1855 ),
1856 ],
1857 },
1858 100,
1859 100,
1860 );
1861
1862 assert_eq!(
1865 estimator.get_estimate(&multi_command_tx),
1866 Duration::from_millis(510)
1867 );
1868 }
1869
1870 #[derive(Debug, Clone, Serialize, Deserialize)]
1871 struct ExecutionTimeObserverSnapshot {
1872 protocol_version: u64,
1873 consensus_observations: Vec<(ExecutionTimeObservationKey, ConsensusObservations)>,
1874 transaction_estimates: Vec<(String, Duration)>, }
1876
1877 fn generate_test_inputs(
1878 seed: u64,
1879 num_validators: usize,
1880 generation_override: Option<u64>,
1881 ) -> Vec<(
1882 AuthorityIndex,
1883 Option<u64>,
1884 ExecutionTimeObservationKey,
1885 Duration,
1886 )> {
1887 let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
1888
1889 let observation_keys = vec![
1890 ExecutionTimeObservationKey::MoveEntryPoint {
1891 package: ObjectID::from_hex_literal("0x1").unwrap(),
1892 module: "coin".to_string(),
1893 function: "transfer".to_string(),
1894 type_arguments: vec![],
1895 },
1896 ExecutionTimeObservationKey::MoveEntryPoint {
1897 package: ObjectID::from_hex_literal("0x2").unwrap(),
1898 module: "nft".to_string(),
1899 function: "mint".to_string(),
1900 type_arguments: vec![],
1901 },
1902 ExecutionTimeObservationKey::TransferObjects,
1903 ExecutionTimeObservationKey::SplitCoins,
1904 ExecutionTimeObservationKey::MergeCoins,
1905 ExecutionTimeObservationKey::MakeMoveVec,
1906 ExecutionTimeObservationKey::Upgrade,
1907 ];
1908
1909 let mut inputs = Vec::new();
1910 let target_samples = 25;
1911
1912 for _ in 0..target_samples {
1913 let key = observation_keys[rng.gen_range(0..observation_keys.len())].clone();
1914 let authority_index =
1915 AuthorityIndex::try_from(rng.gen_range(0..num_validators)).unwrap();
1916
1917 let generation = generation_override.unwrap_or_else(|| rng.gen_range(1..=10));
1919
1920 let base_duration = if rng.gen_ratio(1, 20) {
1923 0
1925 } else {
1926 match &key {
1927 ExecutionTimeObservationKey::MoveEntryPoint { .. } => rng.gen_range(50..=500),
1928 ExecutionTimeObservationKey::TransferObjects => rng.gen_range(10..=100),
1929 ExecutionTimeObservationKey::SplitCoins => rng.gen_range(20..=80),
1930 ExecutionTimeObservationKey::MergeCoins => rng.gen_range(15..=70),
1931 ExecutionTimeObservationKey::MakeMoveVec => rng.gen_range(5..=30),
1932 ExecutionTimeObservationKey::Upgrade => rng.gen_range(100..=1000),
1933 ExecutionTimeObservationKey::Publish => rng.gen_range(200..=2000),
1934 }
1935 };
1936
1937 let duration = Duration::from_millis(base_duration);
1938
1939 inputs.push((authority_index, Some(generation), key, duration));
1940 }
1941
1942 inputs
1943 }
1944
1945 fn generate_test_transactions(seed: u64) -> Vec<(String, TransactionData)> {
1946 let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
1947 let mut transactions = Vec::new();
1948
1949 let package3 = ObjectID::from_hex_literal("0x3").unwrap();
1950 transactions.push((
1951 "coin_transfer_call".to_string(),
1952 TransactionData::new_programmable(
1953 SuiAddress::ZERO,
1954 vec![],
1955 ProgrammableTransaction {
1956 inputs: vec![],
1957 commands: vec![Command::MoveCall(Box::new(ProgrammableMoveCall {
1958 package: ObjectID::from_hex_literal("0x1").unwrap(),
1959 module: "coin".to_string(),
1960 function: "transfer".to_string(),
1961 type_arguments: vec![],
1962 arguments: vec![],
1963 }))],
1964 },
1965 rng.gen_range(100..1000),
1966 rng.gen_range(100..1000),
1967 ),
1968 ));
1969
1970 transactions.push((
1971 "mixed_move_calls".to_string(),
1972 TransactionData::new_programmable(
1973 SuiAddress::ZERO,
1974 vec![],
1975 ProgrammableTransaction {
1976 inputs: vec![],
1977 commands: vec![
1978 Command::MoveCall(Box::new(ProgrammableMoveCall {
1979 package: ObjectID::from_hex_literal("0x1").unwrap(),
1980 module: "coin".to_string(),
1981 function: "transfer".to_string(),
1982 type_arguments: vec![],
1983 arguments: vec![],
1984 })),
1985 Command::MoveCall(Box::new(ProgrammableMoveCall {
1986 package: ObjectID::from_hex_literal("0x2").unwrap(),
1987 module: "nft".to_string(),
1988 function: "mint".to_string(),
1989 type_arguments: vec![],
1990 arguments: vec![],
1991 })),
1992 ],
1993 },
1994 rng.gen_range(100..1000),
1995 rng.gen_range(100..1000),
1996 ),
1997 ));
1998
1999 transactions.push((
2000 "native_commands_with_observations".to_string(),
2001 TransactionData::new_programmable(
2002 SuiAddress::ZERO,
2003 vec![],
2004 ProgrammableTransaction {
2005 inputs: vec![],
2006 commands: vec![
2007 Command::TransferObjects(vec![Argument::Input(0)], Argument::Input(1)),
2008 Command::SplitCoins(Argument::Input(2), vec![Argument::Input(3)]),
2009 Command::MergeCoins(Argument::Input(4), vec![Argument::Input(5)]),
2010 Command::MakeMoveVec(None, vec![Argument::Input(6)]),
2011 ],
2012 },
2013 rng.gen_range(100..1000),
2014 rng.gen_range(100..1000),
2015 ),
2016 ));
2017
2018 let num_objects = rng.gen_range(1..=5);
2019 transactions.push((
2020 format!("transfer_objects_{}_items", num_objects),
2021 TransactionData::new_programmable(
2022 SuiAddress::ZERO,
2023 vec![],
2024 ProgrammableTransaction {
2025 inputs: vec![],
2026 commands: vec![Command::TransferObjects(
2027 (0..num_objects).map(Argument::Input).collect(),
2028 Argument::Input(num_objects),
2029 )],
2030 },
2031 rng.gen_range(100..1000),
2032 rng.gen_range(100..1000),
2033 ),
2034 ));
2035
2036 let num_amounts = rng.gen_range(1..=4);
2037 transactions.push((
2038 format!("split_coins_{}_amounts", num_amounts),
2039 TransactionData::new_programmable(
2040 SuiAddress::ZERO,
2041 vec![],
2042 ProgrammableTransaction {
2043 inputs: vec![],
2044 commands: vec![Command::SplitCoins(
2045 Argument::Input(0),
2046 (1..=num_amounts).map(Argument::Input).collect(),
2047 )],
2048 },
2049 rng.gen_range(100..1000),
2050 rng.gen_range(100..1000),
2051 ),
2052 ));
2053
2054 let num_sources = rng.gen_range(1..=3);
2055 transactions.push((
2056 format!("merge_coins_{}_sources", num_sources),
2057 TransactionData::new_programmable(
2058 SuiAddress::ZERO,
2059 vec![],
2060 ProgrammableTransaction {
2061 inputs: vec![],
2062 commands: vec![Command::MergeCoins(
2063 Argument::Input(0),
2064 (1..=num_sources).map(Argument::Input).collect(),
2065 )],
2066 },
2067 rng.gen_range(100..1000),
2068 rng.gen_range(100..1000),
2069 ),
2070 ));
2071
2072 let num_elements = rng.gen_range(0..=6);
2073 transactions.push((
2074 format!("make_move_vec_{}_elements", num_elements),
2075 TransactionData::new_programmable(
2076 SuiAddress::ZERO,
2077 vec![],
2078 ProgrammableTransaction {
2079 inputs: vec![],
2080 commands: vec![Command::MakeMoveVec(
2081 None,
2082 (0..num_elements).map(Argument::Input).collect(),
2083 )],
2084 },
2085 rng.gen_range(100..1000),
2086 rng.gen_range(100..1000),
2087 ),
2088 ));
2089
2090 transactions.push((
2091 "mixed_commands".to_string(),
2092 TransactionData::new_programmable(
2093 SuiAddress::ZERO,
2094 vec![],
2095 ProgrammableTransaction {
2096 inputs: vec![],
2097 commands: vec![
2098 Command::MoveCall(Box::new(ProgrammableMoveCall {
2099 package: package3,
2100 module: "game".to_string(),
2101 function: "play".to_string(),
2102 type_arguments: vec![],
2103 arguments: vec![],
2104 })),
2105 Command::TransferObjects(
2106 vec![Argument::Input(1), Argument::Input(2)],
2107 Argument::Input(0),
2108 ),
2109 Command::SplitCoins(Argument::Input(3), vec![Argument::Input(4)]),
2110 ],
2111 },
2112 rng.gen_range(100..1000),
2113 rng.gen_range(100..1000),
2114 ),
2115 ));
2116
2117 transactions.push((
2118 "upgrade_package".to_string(),
2119 TransactionData::new_programmable(
2120 SuiAddress::ZERO,
2121 vec![],
2122 ProgrammableTransaction {
2123 inputs: vec![],
2124 commands: vec![Command::Upgrade(
2125 vec![],
2126 vec![],
2127 package3,
2128 Argument::Input(0),
2129 )],
2130 },
2131 rng.gen_range(100..1000),
2132 rng.gen_range(100..1000),
2133 ),
2134 ));
2135
2136 transactions
2137 }
2138
2139 #[test]
2151 fn snapshot_tests() {
2152 println!("\n============================================================================");
2153 println!("! !");
2154 println!("! IMPORTANT: never update snapshots from this test. only add new versions! !");
2155 println!("! !");
2156 println!("============================================================================\n");
2157
2158 let max_version = ProtocolVersion::MAX.as_u64();
2159
2160 let test_versions: Vec<u64> = (max_version.saturating_sub(9)..=max_version).collect();
2161
2162 for version in test_versions {
2163 let protocol_version = ProtocolVersion::new(version);
2164 let protocol_config = ProtocolConfig::get_for_version(protocol_version, Chain::Unknown);
2165 let (committee, _) = Committee::new_simple_test_committee_of_size(4);
2166 let committee = Arc::new(committee);
2167
2168 let initial_generation =
2169 if let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) =
2170 protocol_config.per_object_congestion_control_mode()
2171 {
2172 if params.default_none_duration_for_new_keys {
2173 None
2174 } else {
2175 Some(0)
2176 }
2177 } else {
2178 Some(0) };
2180
2181 let initial_observations =
2182 generate_test_inputs(0, committee.num_members(), initial_generation);
2183 let mut estimator = ExecutionTimeEstimator::new(
2184 committee.clone(),
2185 ExecutionTimeEstimateParams {
2186 max_estimate_us: u64::MAX, ..ExecutionTimeEstimateParams::default()
2188 },
2189 initial_observations.into_iter(),
2190 );
2191
2192 let test_inputs = generate_test_inputs(version, committee.num_members(), None);
2193
2194 for (source, generation, observation_key, duration) in test_inputs {
2195 estimator.process_observation_from_consensus(
2196 source,
2197 generation,
2198 observation_key,
2199 duration,
2200 false,
2201 );
2202 }
2203
2204 let mut final_observations = estimator.get_observations();
2205 final_observations.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
2206
2207 let test_transactions = generate_test_transactions(version);
2208 let mut transaction_estimates = Vec::new();
2209 for (description, tx_data) in test_transactions {
2210 let estimate = estimator.get_estimate(&tx_data);
2211 transaction_estimates.push((description, estimate));
2212 }
2213
2214 let snapshot_data = ExecutionTimeObserverSnapshot {
2215 protocol_version: version,
2216 consensus_observations: final_observations.clone(),
2217 transaction_estimates,
2218 };
2219 insta::assert_yaml_snapshot!(
2220 format!("execution_time_observer_v{}", version),
2221 snapshot_data
2222 );
2223 }
2224 }
2225}