1use super::execution_time_estimator::ExecutionTimeEstimator;
5use crate::authority::transaction_deferral::DeferralKey;
6use crate::consensus_handler::{ConsensusCommitInfo, IndirectStateObserver};
7use mysten_common::fatal;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use sui_protocol_config::{
11 ExecutionTimeEstimateParams, PerObjectCongestionControlMode, ProtocolConfig,
12};
13use sui_types::base_types::{ObjectID, TransactionDigest};
14use sui_types::executable_transaction::VerifiedExecutableTransaction;
15use sui_types::messages_consensus::Round;
16use sui_types::transaction::SharedInputObject;
17use tracing::{debug, trace};
18
/// Execution-time-estimate parameters paired with the flag that selects the
/// randomness-scaled variants of the derived limits.
#[derive(PartialEq, Eq, Clone, Debug)]
struct Params {
    /// Protocol-provided execution time estimate parameters.
    params: ExecutionTimeEstimateParams,
    /// When true, `commit_budget` and `max_burst` are additionally scaled to
    /// `params.randomness_scalar` percent.
    for_randomness: bool,
}
24
25impl Params {
26 pub fn commit_budget(&self, commit_info: &ConsensusCommitInfo) -> u64 {
29 let estimated_commit_period = commit_info.estimated_commit_period();
30 let commit_period_micros = estimated_commit_period.as_micros() as u64;
31 let mut budget = commit_period_micros.saturating_mul(self.params.target_utilization) / 100;
32 if self.for_randomness {
33 budget = budget.saturating_mul(self.params.randomness_scalar) / 100;
34 }
35 budget
36 }
37
38 pub fn max_burst(&self) -> u64 {
41 let mut burst = self.params.allowed_txn_cost_overage_burst_limit_us;
42 if self.for_randomness {
43 burst = burst.saturating_mul(self.params.randomness_scalar) / 100;
44 }
45 burst
46 }
47}
48
/// Tracks the accumulated execution cost of each shared object, using the
/// configured parameters to decide when a transaction should be deferred.
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct SharedObjectCongestionTracker {
    /// Accumulated execution cost per object; objects without an entry are
    /// treated as having cost 0.
    object_execution_cost: HashMap<ObjectID, u64>,
    /// Parameters used to derive the commit budget and the burst limit.
    params: Params,
    /// Per-transaction cost log; `None` when logging is disabled.
    log_entries: Option<Vec<TransactionCostLogEntry>>,
}
67
68impl SharedObjectCongestionTracker {
69 pub fn new(
70 initial_object_debts: impl IntoIterator<Item = (ObjectID, u64)>,
71 params: ExecutionTimeEstimateParams,
72 for_randomness: bool,
73 enable_logging: bool,
74 ) -> Self {
75 let object_execution_cost: HashMap<ObjectID, u64> =
76 initial_object_debts.into_iter().collect();
77 trace!(
78 "created SharedObjectCongestionTracker with
79 {} initial object debts,
80 params: {params:?},
81 for_randomness: {for_randomness},",
82 object_execution_cost.len(),
83 );
84 Self {
85 object_execution_cost,
86 params: Params {
87 params,
88 for_randomness,
89 },
90 log_entries: enable_logging.then(Vec::new),
91 }
92 }
93
94 pub fn from_protocol_config(
95 initial_object_debts: impl IntoIterator<Item = (ObjectID, u64)>,
96 protocol_config: &ProtocolConfig,
97 for_randomness: bool,
98 enable_logging: bool,
99 ) -> Self {
100 let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) =
101 protocol_config.per_object_congestion_control_mode()
102 else {
103 fatal!(
104 "support for congestion control modes other than PerObjectCongestionControlMode::ExecutionTimeEstimate has been removed"
105 );
106 };
107 Self::new(initial_object_debts, params, for_randomness, enable_logging)
108 }
109
110 pub fn compute_tx_start_at_cost(&self, shared_input_objects: &[SharedInputObject]) -> u64 {
116 shared_input_objects
117 .iter()
118 .map(|obj| *self.object_execution_cost.get(&obj.id).unwrap_or(&0))
119 .max()
120 .expect("There must be at least one object in shared_input_objects.")
121 }
122
123 pub fn get_tx_cost(
124 &self,
125 execution_time_estimator: &ExecutionTimeEstimator,
126 cert: &VerifiedExecutableTransaction,
127 indirect_state_observer: &mut IndirectStateObserver,
128 ) -> u64 {
129 let estimate_us = execution_time_estimator
130 .get_estimate(cert.transaction_data())
131 .as_micros()
132 .try_into()
133 .unwrap_or(u64::MAX);
134 if estimate_us >= 15_000 {
135 let digest = cert.digest();
136 debug!(
137 ?digest,
138 "expensive tx cost estimate detected: {estimate_us}us"
139 );
140 }
141
142 indirect_state_observer.observe_indirect_state(&Some(estimate_us));
144
145 estimate_us
146 }
147
148 pub fn should_defer_due_to_object_congestion(
150 &self,
151 cert: &VerifiedExecutableTransaction,
152 previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
153 commit_info: &ConsensusCommitInfo,
154 ) -> Option<(DeferralKey, Vec<ObjectID>)> {
155 let commit_round = commit_info.round;
156
157 let shared_input_objects: Vec<_> = cert.shared_input_objects().collect();
158 if shared_input_objects.is_empty() {
159 return None;
161 }
162
163 let start_cost = self.compute_tx_start_at_cost(&shared_input_objects);
165 let budget = self.params.commit_budget(commit_info);
166 let burst_limit = budget.saturating_add(self.params.max_burst());
167 if start_cost <= burst_limit {
168 return None;
169 }
170
171 let mut congested_objects = vec![];
179 for obj in shared_input_objects {
180 if &start_cost == self.object_execution_cost.get(&obj.id).unwrap_or(&0) {
186 congested_objects.push(obj.id);
187 }
188 }
189
190 assert!(!congested_objects.is_empty());
191
192 let deferral_key =
193 if let Some(previous_key) = previously_deferred_tx_digests.get(cert.digest()) {
194 DeferralKey::new_for_consensus_round(
196 commit_round + 1,
197 previous_key.deferred_from_round(),
198 )
199 } else {
200 DeferralKey::new_for_consensus_round(commit_round + 1, commit_round)
203 };
204 Some((deferral_key, congested_objects))
205 }
206
207 pub fn bump_object_execution_cost(
210 &mut self,
211 tx_cost: u64,
212 cert: &VerifiedExecutableTransaction,
213 ) {
214 let shared_input_objects: Vec<_> = cert.shared_input_objects().collect();
215 if shared_input_objects.is_empty() {
216 return;
217 }
218
219 let start_cost = self.compute_tx_start_at_cost(&shared_input_objects);
220 let end_cost = start_cost.saturating_add(tx_cost);
221
222 for obj in shared_input_objects {
223 if obj.is_accessed_exclusively() {
224 let old_end_cost = self.object_execution_cost.insert(obj.id, end_cost);
225 assert!(old_end_cost.is_none() || old_end_cost.unwrap() <= end_cost);
226 }
227 }
228
229 if let Some(entries) = &mut self.log_entries {
230 entries.push(TransactionCostLogEntry {
231 tx_digest: *cert.digest(),
232 start_cost,
233 end_cost,
234 });
235 }
236 }
237
238 pub fn finish_commit(self, commit_info: &ConsensusCommitInfo) -> FinishedCommitData {
245 let commit_budget = self.params.commit_budget(commit_info);
246 let log_entries = self.log_entries.unwrap_or_default();
247 let final_object_execution_costs = self.object_execution_cost.clone();
248 let accumulated_debts = self
249 .object_execution_cost
250 .into_iter()
251 .filter_map(|(obj_id, cost)| {
252 let remaining_cost = cost.saturating_sub(commit_budget);
253 if remaining_cost > 0 {
254 Some((obj_id, remaining_cost))
255 } else {
256 None
257 }
258 })
259 .collect();
260 FinishedCommitData {
261 accumulated_debts,
262 log_entries,
263 final_object_execution_costs,
264 commit_budget,
265 }
266 }
267
268 pub fn max_cost(&self) -> u64 {
270 self.object_execution_cost
271 .values()
272 .max()
273 .copied()
274 .unwrap_or(0)
275 }
276}
277
/// Versioned, serializable record of a per-object congestion debt.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum CongestionPerObjectDebt {
    /// Version 1: a round paired with a debt amount.
    V1(Round, u64),
}
282
283impl CongestionPerObjectDebt {
284 pub fn new(round: Round, debt: u64) -> Self {
285 Self::V1(round, debt)
286 }
287
288 pub fn into_v1(self) -> (Round, u64) {
289 match self {
290 Self::V1(round, debt) => (round, debt),
291 }
292 }
293}
294
/// Log record describing how one transaction moved the tracked object costs.
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct TransactionCostLogEntry {
    /// Digest of the transaction that was recorded.
    pub tx_digest: TransactionDigest,
    /// Largest tracked cost among the transaction's shared inputs before it
    /// was recorded.
    pub start_cost: u64,
    /// `start_cost` plus the transaction's cost (saturating).
    pub end_cost: u64,
}
301
/// Result of `SharedObjectCongestionTracker::finish_commit`.
pub struct FinishedCommitData {
    /// Objects whose cost exceeded the commit budget, with the excess amount.
    pub accumulated_debts: Vec<(ObjectID, u64)>,
    /// Collected cost log entries; empty when logging was disabled.
    pub log_entries: Vec<TransactionCostLogEntry>,
    /// Tracked cost of every object at the end of the commit.
    pub final_object_execution_costs: HashMap<ObjectID, u64>,
    /// Budget that was subtracted from each object's cost to compute the debts.
    pub commit_budget: u64,
}
308
309#[cfg(test)]
310mod object_cost_tests {
311 use super::*;
312
313 use std::time::Duration;
314 use sui_protocol_config::ExecutionTimeEstimateParams;
315 use sui_test_transaction_builder::TestTransactionBuilder;
316 use sui_types::Identifier;
317 use sui_types::base_types::{SequenceNumber, random_object_ref};
318 use sui_types::crypto::{AccountKeyPair, get_key_pair};
319 use sui_types::transaction::{CallArg, ObjectArg, SharedObjectMutability, VerifiedTransaction};
320
321 fn default_params() -> ExecutionTimeEstimateParams {
322 ExecutionTimeEstimateParams {
323 target_utilization: 0,
324 allowed_txn_cost_overage_burst_limit_us: 0,
325 max_estimate_us: u64::MAX,
326 randomness_scalar: 0,
327 stored_observations_num_included_checkpoints: 10,
328 stored_observations_limit: u64::MAX,
329 stake_weighted_median_threshold: 0,
330 default_none_duration_for_new_keys: false,
331 observations_chunk_size: None,
332 }
333 }
334
335 fn construct_shared_input_objects(objects: &[(ObjectID, bool)]) -> Vec<SharedInputObject> {
336 objects
337 .iter()
338 .map(|(id, mutable)| SharedInputObject {
339 id: *id,
340 initial_shared_version: SequenceNumber::new(),
341 mutability: if *mutable {
342 SharedObjectMutability::Mutable
343 } else {
344 SharedObjectMutability::Immutable
345 },
346 })
347 .collect()
348 }
349
350 #[test]
351 fn test_compute_tx_start_at_cost() {
352 let object_id_0 = ObjectID::random();
353 let object_id_1 = ObjectID::random();
354 let object_id_2 = ObjectID::random();
355
356 let shared_object_congestion_tracker = SharedObjectCongestionTracker::new(
357 [(object_id_0, 5), (object_id_1, 10)],
358 default_params(),
359 false,
360 false,
361 );
362
363 let shared_input_objects = construct_shared_input_objects(&[(object_id_0, false)]);
364 assert_eq!(
365 shared_object_congestion_tracker.compute_tx_start_at_cost(&shared_input_objects),
366 5
367 );
368
369 let shared_input_objects = construct_shared_input_objects(&[(object_id_1, true)]);
370 assert_eq!(
371 shared_object_congestion_tracker.compute_tx_start_at_cost(&shared_input_objects),
372 10
373 );
374
375 let shared_input_objects =
376 construct_shared_input_objects(&[(object_id_0, false), (object_id_1, false)]);
377 assert_eq!(
378 shared_object_congestion_tracker.compute_tx_start_at_cost(&shared_input_objects),
379 10
380 );
381
382 let shared_input_objects =
383 construct_shared_input_objects(&[(object_id_0, true), (object_id_1, true)]);
384 assert_eq!(
385 shared_object_congestion_tracker.compute_tx_start_at_cost(&shared_input_objects),
386 10
387 );
388
389 let shared_input_objects = construct_shared_input_objects(&[(object_id_2, true)]);
391 assert_eq!(
392 shared_object_congestion_tracker.compute_tx_start_at_cost(&shared_input_objects),
393 0
394 );
395 }
396
397 fn build_transaction(
401 objects: &[(ObjectID, bool)],
402 gas_budget: u64,
403 ) -> VerifiedExecutableTransaction {
404 let (sender, keypair): (_, AccountKeyPair) = get_key_pair();
405 let gas_object = random_object_ref();
406 VerifiedExecutableTransaction::new_system(
407 VerifiedTransaction::new_unchecked(
408 TestTransactionBuilder::new(sender, gas_object, 1000)
409 .with_gas_budget(gas_budget)
410 .move_call(
411 ObjectID::random(),
412 "unimportant_module",
413 "unimportant_function",
414 objects
415 .iter()
416 .map(|(id, mutable)| {
417 CallArg::Object(ObjectArg::SharedObject {
418 id: *id,
419 initial_shared_version: SequenceNumber::new(),
420 mutability: if *mutable {
421 SharedObjectMutability::Mutable
422 } else {
423 SharedObjectMutability::Immutable
424 },
425 })
426 })
427 .collect(),
428 )
429 .build_and_sign(&keypair),
430 ),
431 0,
432 )
433 }
434
435 fn build_programmable_transaction(
436 objects: &[(ObjectID, bool)],
437 number_of_commands: u64,
438 gas_budget: u64,
439 ) -> VerifiedExecutableTransaction {
440 let (sender, keypair): (_, AccountKeyPair) = get_key_pair();
441 let gas_object = random_object_ref();
442
443 let package_id = ObjectID::random();
444 let mut tx_builder =
445 TestTransactionBuilder::new(sender, gas_object, 1000).with_gas_budget(gas_budget);
446 {
447 let pt_builder = tx_builder.ptb_builder_mut();
448 let mut arguments = Vec::new();
449 for object in objects {
450 arguments.push(
451 pt_builder
452 .obj(ObjectArg::SharedObject {
453 id: object.0,
454 initial_shared_version: SequenceNumber::new(),
455 mutability: if object.1 {
456 SharedObjectMutability::Mutable
457 } else {
458 SharedObjectMutability::Immutable
459 },
460 })
461 .unwrap(),
462 );
463 }
464 for _ in 0..number_of_commands {
465 pt_builder.programmable_move_call(
466 package_id,
467 Identifier::new("unimportant_module").unwrap(),
468 Identifier::new("unimportant_function").unwrap(),
469 vec![],
470 arguments.clone(),
471 );
472 }
473 }
474
475 VerifiedExecutableTransaction::new_system(
476 VerifiedTransaction::new_unchecked(tx_builder.build_and_sign(&keypair)),
477 0,
478 )
479 }
480
481 #[test]
482 fn test_should_defer_return_correct_congested_objects() {
483 let shared_obj_0 = ObjectID::random();
485 let shared_obj_1 = ObjectID::random();
486
487 let tx_gas_budget = 100;
488
489 let shared_object_congestion_tracker = SharedObjectCongestionTracker::new(
493 [(shared_obj_0, 750), (shared_obj_1, 0)],
494 default_params(),
495 false,
496 false,
497 );
498
499 for mutable in [true, false].iter() {
501 let tx = build_transaction(&[(shared_obj_0, *mutable)], tx_gas_budget);
502 if let Some((_, congested_objects)) = shared_object_congestion_tracker
503 .should_defer_due_to_object_congestion(
504 &tx,
505 &HashMap::new(),
506 &ConsensusCommitInfo::new_for_congestion_test(
507 0,
508 0,
509 Duration::from_micros(1_500),
510 ),
511 )
512 {
513 assert_eq!(congested_objects.len(), 1);
514 assert_eq!(congested_objects[0], shared_obj_0);
515 } else {
516 panic!("should defer");
517 }
518 }
519
520 for mutable in [true, false].iter() {
522 let tx = build_transaction(&[(shared_obj_1, *mutable)], tx_gas_budget);
523 assert!(
524 shared_object_congestion_tracker
525 .should_defer_due_to_object_congestion(
526 &tx,
527 &HashMap::new(),
528 &ConsensusCommitInfo::new_for_congestion_test(
529 0,
530 0,
531 Duration::from_micros(1_500),
532 ),
533 )
534 .is_none()
535 );
536 }
537
538 for mutable_0 in [true, false].iter() {
540 for mutable_1 in [true, false].iter() {
541 let tx = build_transaction(
542 &[(shared_obj_0, *mutable_0), (shared_obj_1, *mutable_1)],
543 tx_gas_budget,
544 );
545 if let Some((_, congested_objects)) = shared_object_congestion_tracker
546 .should_defer_due_to_object_congestion(
547 &tx,
548 &HashMap::new(),
549 &ConsensusCommitInfo::new_for_congestion_test(
550 0,
551 0,
552 Duration::from_micros(1_500),
553 ),
554 )
555 {
556 assert_eq!(congested_objects.len(), 1);
557 assert_eq!(congested_objects[0], shared_obj_0);
558 } else {
559 panic!("should defer");
560 }
561 }
562 }
563 }
564
565 #[test]
566 fn test_should_defer_return_correct_deferral_key() {
567 let shared_obj_0 = ObjectID::random();
568 let tx = build_transaction(&[(shared_obj_0, true)], 100);
569
570 let shared_object_congestion_tracker =
572 SharedObjectCongestionTracker::new([(shared_obj_0, 1)], default_params(), false, false);
573
574 let mut previously_deferred_tx_digests = HashMap::new();
576 previously_deferred_tx_digests.insert(
577 TransactionDigest::random(),
578 DeferralKey::ConsensusRound {
579 future_round: 10,
580 deferred_from_round: 5,
581 },
582 );
583
584 if let Some((
586 DeferralKey::ConsensusRound {
587 future_round,
588 deferred_from_round,
589 },
590 _,
591 )) = shared_object_congestion_tracker.should_defer_due_to_object_congestion(
592 &tx,
593 &previously_deferred_tx_digests,
594 &ConsensusCommitInfo::new_for_congestion_test(
595 10,
596 10,
597 Duration::from_micros(10_000_000),
598 ),
599 ) {
600 assert_eq!(future_round, 11);
601 assert_eq!(deferred_from_round, 10);
602 } else {
603 panic!("should defer");
604 }
605
606 previously_deferred_tx_digests.insert(
608 *tx.digest(),
609 DeferralKey::Randomness {
610 deferred_from_round: 4,
611 },
612 );
613
614 if let Some((
616 DeferralKey::ConsensusRound {
617 future_round,
618 deferred_from_round,
619 },
620 _,
621 )) = shared_object_congestion_tracker.should_defer_due_to_object_congestion(
622 &tx,
623 &previously_deferred_tx_digests,
624 &ConsensusCommitInfo::new_for_congestion_test(
625 10,
626 10,
627 Duration::from_micros(10_000_000),
628 ),
629 ) {
630 assert_eq!(future_round, 11);
631 assert_eq!(deferred_from_round, 4);
632 } else {
633 panic!("should defer");
634 }
635
636 previously_deferred_tx_digests.insert(
638 *tx.digest(),
639 DeferralKey::ConsensusRound {
640 future_round: 10,
641 deferred_from_round: 5,
642 },
643 );
644
645 if let Some((
647 DeferralKey::ConsensusRound {
648 future_round,
649 deferred_from_round,
650 },
651 _,
652 )) = shared_object_congestion_tracker.should_defer_due_to_object_congestion(
653 &tx,
654 &previously_deferred_tx_digests,
655 &ConsensusCommitInfo::new_for_congestion_test(
656 10,
657 10,
658 Duration::from_micros(10_000_000),
659 ),
660 ) {
661 assert_eq!(future_round, 11);
662 assert_eq!(deferred_from_round, 5);
663 } else {
664 panic!("should defer");
665 }
666 }
667
668 #[test]
669 fn test_should_defer_allow_overage() {
670 telemetry_subscribers::init_for_testing();
671
672 let shared_obj_0 = ObjectID::random();
674 let shared_obj_1 = ObjectID::random();
675
676 let tx_gas_budget = 100;
677
678 let shared_object_congestion_tracker = SharedObjectCongestionTracker::new(
682 [(shared_obj_0, 1_700_000), (shared_obj_1, 300_000)],
683 ExecutionTimeEstimateParams {
684 target_utilization: 16,
685 allowed_txn_cost_overage_burst_limit_us: 0,
686 randomness_scalar: 0,
687 max_estimate_us: u64::MAX,
688 stored_observations_num_included_checkpoints: 10,
689 stored_observations_limit: u64::MAX,
690 stake_weighted_median_threshold: 0,
691 default_none_duration_for_new_keys: false,
692 observations_chunk_size: None,
693 },
694 false,
695 false,
696 );
697
698 for mutable in [true, false].iter() {
700 let tx = build_transaction(&[(shared_obj_0, *mutable)], tx_gas_budget);
701 if let Some((_, congested_objects)) = shared_object_congestion_tracker
702 .should_defer_due_to_object_congestion(
703 &tx,
704 &HashMap::new(),
705 &ConsensusCommitInfo::new_for_congestion_test(
706 0,
707 0,
708 Duration::from_micros(10_000_000),
709 ),
710 )
711 {
712 assert_eq!(congested_objects.len(), 1);
713 assert_eq!(congested_objects[0], shared_obj_0);
714 } else {
715 panic!("should defer");
716 }
717 }
718
719 for mutable in [true, false].iter() {
721 let tx = build_transaction(&[(shared_obj_1, *mutable)], tx_gas_budget);
722 assert!(
723 shared_object_congestion_tracker
724 .should_defer_due_to_object_congestion(
725 &tx,
726 &HashMap::new(),
727 &ConsensusCommitInfo::new_for_congestion_test(
728 0,
729 0,
730 Duration::from_micros(10_000_000)
731 ),
732 )
733 .is_none()
734 );
735 }
736
737 for mutable_0 in [true, false].iter() {
739 for mutable_1 in [true, false].iter() {
740 let tx = build_transaction(
741 &[(shared_obj_0, *mutable_0), (shared_obj_1, *mutable_1)],
742 tx_gas_budget,
743 );
744 if let Some((_, congested_objects)) = shared_object_congestion_tracker
745 .should_defer_due_to_object_congestion(
746 &tx,
747 &HashMap::new(),
748 &ConsensusCommitInfo::new_for_congestion_test(
749 0,
750 0,
751 Duration::from_micros(10_000_000),
752 ),
753 )
754 {
755 assert_eq!(congested_objects.len(), 1);
756 assert_eq!(congested_objects[0], shared_obj_0);
757 } else {
758 panic!("should defer");
759 }
760 }
761 }
762 }
763
764 #[test]
765 fn test_should_defer_allow_overage_with_burst() {
766 telemetry_subscribers::init_for_testing();
767
768 let shared_obj_0 = ObjectID::random();
769 let shared_obj_1 = ObjectID::random();
770
771 let tx_gas_budget = 100;
772
773 let shared_object_congestion_tracker = SharedObjectCongestionTracker::new(
778 [(shared_obj_0, 4_000_000), (shared_obj_1, 2_000_000)],
779 ExecutionTimeEstimateParams {
780 target_utilization: 16,
781 allowed_txn_cost_overage_burst_limit_us: 1_500_000,
782 randomness_scalar: 0,
783 max_estimate_us: u64::MAX,
784 stored_observations_num_included_checkpoints: 10,
785 stored_observations_limit: u64::MAX,
786 stake_weighted_median_threshold: 0,
787 default_none_duration_for_new_keys: false,
788 observations_chunk_size: None,
789 },
790 false,
791 false,
792 );
793
794 for mutable in [true, false].iter() {
796 let tx = build_transaction(&[(shared_obj_0, *mutable)], tx_gas_budget);
797 if let Some((_, congested_objects)) = shared_object_congestion_tracker
798 .should_defer_due_to_object_congestion(
799 &tx,
800 &HashMap::new(),
801 &ConsensusCommitInfo::new_for_congestion_test(
802 0,
803 0,
804 Duration::from_micros(10_000_000),
805 ),
806 )
807 {
808 assert_eq!(congested_objects.len(), 1);
809 assert_eq!(congested_objects[0], shared_obj_0);
810 } else {
811 panic!("should defer");
812 }
813 }
814
815 for mutable in [true, false].iter() {
818 let tx = build_transaction(&[(shared_obj_1, *mutable)], tx_gas_budget);
819 assert!(
820 shared_object_congestion_tracker
821 .should_defer_due_to_object_congestion(
822 &tx,
823 &HashMap::new(),
824 &ConsensusCommitInfo::new_for_congestion_test(
825 0,
826 0,
827 Duration::from_micros(10_000_000)
828 ),
829 )
830 .is_none()
831 );
832 }
833
834 for mutable_0 in [true, false].iter() {
836 for mutable_1 in [true, false].iter() {
837 let tx = build_transaction(
838 &[(shared_obj_0, *mutable_0), (shared_obj_1, *mutable_1)],
839 tx_gas_budget,
840 );
841 if let Some((_, congested_objects)) = shared_object_congestion_tracker
842 .should_defer_due_to_object_congestion(
843 &tx,
844 &HashMap::new(),
845 &ConsensusCommitInfo::new_for_congestion_test(
846 0,
847 0,
848 Duration::from_micros(10_000_000),
849 ),
850 )
851 {
852 assert_eq!(congested_objects.len(), 1);
853 assert_eq!(congested_objects[0], shared_obj_0);
854 } else {
855 panic!("should defer");
856 }
857 }
858 }
859 }
860
861 #[test]
862 fn test_bump_object_execution_cost() {
863 telemetry_subscribers::init_for_testing();
864
865 let execution_time_estimator = ExecutionTimeEstimator::new_for_testing();
866
867 let object_id_0 = ObjectID::random();
868 let object_id_1 = ObjectID::random();
869 let object_id_2 = ObjectID::random();
870
871 let params = default_params();
872 let mut shared_object_congestion_tracker = SharedObjectCongestionTracker::new(
873 [(object_id_0, 5), (object_id_1, 10)],
874 params,
875 false,
876 false,
877 );
878 assert_eq!(shared_object_congestion_tracker.max_cost(), 10);
879
880 let cert = build_transaction(&[(object_id_0, false), (object_id_1, false)], 10);
882 shared_object_congestion_tracker.bump_object_execution_cost(
883 shared_object_congestion_tracker.get_tx_cost(
884 &execution_time_estimator,
885 &cert,
886 &mut IndirectStateObserver::new(),
887 ),
888 &cert,
889 );
890 assert_eq!(
891 shared_object_congestion_tracker,
892 SharedObjectCongestionTracker::new(
893 [(object_id_0, 5), (object_id_1, 10)],
894 params,
895 false,
896 false,
897 )
898 );
899 assert_eq!(shared_object_congestion_tracker.max_cost(), 10);
900
901 let cert = build_transaction(&[(object_id_0, true), (object_id_1, false)], 10);
903 shared_object_congestion_tracker.bump_object_execution_cost(
904 shared_object_congestion_tracker.get_tx_cost(
905 &execution_time_estimator,
906 &cert,
907 &mut IndirectStateObserver::new(),
908 ),
909 &cert,
910 );
911 let expected_object_0_cost = 1_010;
913 assert_eq!(
914 shared_object_congestion_tracker,
915 SharedObjectCongestionTracker::new(
916 [(object_id_0, expected_object_0_cost), (object_id_1, 10)],
917 params,
918 false,
919 false,
920 )
921 );
922 assert_eq!(
923 shared_object_congestion_tracker.max_cost(),
924 expected_object_0_cost
925 );
926
927 let cert = build_transaction(
929 &[
930 (object_id_0, true),
931 (object_id_1, true),
932 (object_id_2, true),
933 ],
934 10,
935 );
936 let expected_object_cost = 2_010;
938 shared_object_congestion_tracker.bump_object_execution_cost(
939 shared_object_congestion_tracker.get_tx_cost(
940 &execution_time_estimator,
941 &cert,
942 &mut IndirectStateObserver::new(),
943 ),
944 &cert,
945 );
946 assert_eq!(
947 shared_object_congestion_tracker,
948 SharedObjectCongestionTracker::new(
949 [
950 (object_id_0, expected_object_cost),
951 (object_id_1, expected_object_cost),
952 (object_id_2, expected_object_cost)
953 ],
954 params,
955 false,
956 false,
957 )
958 );
959 assert_eq!(
960 shared_object_congestion_tracker.max_cost(),
961 expected_object_cost
962 );
963
964 let cert = build_programmable_transaction(
966 &[
967 (object_id_0, true),
968 (object_id_1, true),
969 (object_id_2, true),
970 ],
971 7,
972 30,
973 );
974 let expected_object_cost = 9_010;
976 shared_object_congestion_tracker.bump_object_execution_cost(
977 shared_object_congestion_tracker.get_tx_cost(
978 &execution_time_estimator,
979 &cert,
980 &mut IndirectStateObserver::new(),
981 ),
982 &cert,
983 );
984 assert_eq!(
985 shared_object_congestion_tracker,
986 SharedObjectCongestionTracker::new(
987 [
988 (object_id_0, expected_object_cost),
989 (object_id_1, expected_object_cost),
990 (object_id_2, expected_object_cost)
991 ],
992 params,
993 false,
994 false,
995 )
996 );
997 assert_eq!(
998 shared_object_congestion_tracker.max_cost(),
999 expected_object_cost
1000 );
1001 }
1002
1003 #[test]
1004 fn test_finish_commit_empty_debts() {
1005 let object_id_0 = ObjectID::random();
1006 let object_id_1 = ObjectID::random();
1007 let object_id_2 = ObjectID::random();
1008
1009 let tracker = SharedObjectCongestionTracker::new(
1010 [(object_id_0, 0), (object_id_1, 0), (object_id_2, 0)],
1011 default_params(),
1012 false,
1013 false,
1014 );
1015
1016 let data = tracker.finish_commit(&ConsensusCommitInfo::new_for_congestion_test(
1017 0,
1018 0,
1019 Duration::ZERO,
1020 ));
1021 assert!(data.accumulated_debts.is_empty());
1022 }
1023
1024 #[test]
1025 fn test_finish_commit_returns_correct_debts() {
1026 telemetry_subscribers::init_for_testing();
1027
1028 let execution_time_estimator = ExecutionTimeEstimator::new_for_testing();
1029
1030 let shared_obj_0 = ObjectID::random();
1031 let shared_obj_1 = ObjectID::random();
1032
1033 let params = ExecutionTimeEstimateParams {
1034 target_utilization: 100,
1035 allowed_txn_cost_overage_burst_limit_us: 1_600 * 5,
1036 randomness_scalar: 0,
1037 max_estimate_us: u64::MAX,
1038 stored_observations_num_included_checkpoints: 10,
1039 stored_observations_limit: u64::MAX,
1040 stake_weighted_median_threshold: 0,
1041 default_none_duration_for_new_keys: false,
1042 observations_chunk_size: None,
1043 };
1044 let mut tracker = SharedObjectCongestionTracker::new(
1045 [(shared_obj_0, 500), (shared_obj_1, 500)],
1046 params,
1047 false,
1048 false,
1049 );
1050
1051 let tx = build_transaction(&[(shared_obj_0, true)], 100);
1052 tracker.bump_object_execution_cost(
1053 tracker.get_tx_cost(
1054 &execution_time_estimator,
1055 &tx,
1056 &mut IndirectStateObserver::new(),
1057 ),
1058 &tx,
1059 );
1060
1061 let commit_info =
1062 ConsensusCommitInfo::new_for_congestion_test(0, 0, Duration::from_micros(800));
1063 let data = tracker.finish_commit(&commit_info);
1064
1065 assert_eq!(data.accumulated_debts.len(), 1);
1066 assert_eq!(data.accumulated_debts[0], (shared_obj_0, 700));
1067 assert_eq!(data.commit_budget, 800);
1068 }
1069
1070 #[test]
1071 fn test_finish_commit_with_logging() {
1072 telemetry_subscribers::init_for_testing();
1073
1074 let execution_time_estimator = ExecutionTimeEstimator::new_for_testing();
1075
1076 let shared_obj_0 = ObjectID::random();
1077 let shared_obj_1 = ObjectID::random();
1078
1079 let params = ExecutionTimeEstimateParams {
1080 target_utilization: 100,
1081 allowed_txn_cost_overage_burst_limit_us: 1_600 * 5,
1082 randomness_scalar: 0,
1083 max_estimate_us: u64::MAX,
1084 stored_observations_num_included_checkpoints: 10,
1085 stored_observations_limit: u64::MAX,
1086 stake_weighted_median_threshold: 0,
1087 default_none_duration_for_new_keys: false,
1088 observations_chunk_size: None,
1089 };
1090 let mut tracker = SharedObjectCongestionTracker::new(
1091 [(shared_obj_0, 0), (shared_obj_1, 0)],
1092 params,
1093 false,
1094 true,
1095 );
1096
1097 let tx = build_transaction(&[(shared_obj_0, true), (shared_obj_1, true)], 100);
1098 let tx_digest = *tx.digest();
1099 tracker.bump_object_execution_cost(
1100 tracker.get_tx_cost(
1101 &execution_time_estimator,
1102 &tx,
1103 &mut IndirectStateObserver::new(),
1104 ),
1105 &tx,
1106 );
1107
1108 let commit_info =
1109 ConsensusCommitInfo::new_for_congestion_test(0, 0, Duration::from_micros(800));
1110 let data = tracker.finish_commit(&commit_info);
1111
1112 assert_eq!(data.log_entries.len(), 1);
1113 assert_eq!(data.log_entries[0].tx_digest, tx_digest);
1114 assert_eq!(data.log_entries[0].start_cost, 0);
1115 assert!(
1116 data.final_object_execution_costs
1117 .contains_key(&shared_obj_0)
1118 );
1119 assert!(
1120 data.final_object_execution_costs
1121 .contains_key(&shared_obj_1)
1122 );
1123 }
1124}