sui_core/authority/
shared_object_congestion_tracker.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::execution_time_estimator::ExecutionTimeEstimator;
5use crate::authority::transaction_deferral::DeferralKey;
6use crate::consensus_handler::{ConsensusCommitInfo, IndirectStateObserver};
7use mysten_common::fatal;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use sui_protocol_config::{
11    ExecutionTimeEstimateParams, PerObjectCongestionControlMode, ProtocolConfig,
12};
13use sui_types::base_types::{ObjectID, TransactionDigest};
14use sui_types::executable_transaction::VerifiedExecutableTransaction;
15use sui_types::messages_consensus::Round;
16use sui_types::transaction::SharedInputObject;
17use tracing::{debug, trace};
18
19#[derive(PartialEq, Eq, Clone, Debug)]
20struct Params {
21    params: ExecutionTimeEstimateParams,
22    for_randomness: bool,
23}
24
25impl Params {
26    // Get the target budget per commit. Over the long term, the scheduler will try to
27    // schedule no more than this much work per object per commit on average.
28    pub fn commit_budget(&self, commit_info: &ConsensusCommitInfo) -> u64 {
29        let estimated_commit_period = commit_info.estimated_commit_period();
30        let commit_period_micros = estimated_commit_period.as_micros() as u64;
31        let mut budget = commit_period_micros.saturating_mul(self.params.target_utilization) / 100;
32        if self.for_randomness {
33            budget = budget.saturating_mul(self.params.randomness_scalar) / 100;
34        }
35        budget
36    }
37
38    // The amount scheduled in a commit can "burst" up to this much over the target budget.
39    // The per-object debt limit will enforce the average limit over time.
40    pub fn max_burst(&self) -> u64 {
41        let mut burst = self.params.allowed_txn_cost_overage_burst_limit_us;
42        if self.for_randomness {
43            burst = burst.saturating_mul(self.params.randomness_scalar) / 100;
44        }
45        burst
46    }
47}
48
49// SharedObjectCongestionTracker stores the accumulated cost of executing transactions on an object, for
50// all transactions in a consensus commit.
51//
52// Cost is an indication of transaction execution latency. When transactions are scheduled by
53// the consensus handler, each scheduled transaction adds cost (execution latency) to all the objects it
54// reads or writes.
55//
56// The goal of this data structure is to capture the critical path of transaction execution latency on each
57// objects.
58//
59// The mode field determines how the cost is calculated. The cost can be calculated based on the total gas
60// budget, or total number of transaction count.
61#[derive(PartialEq, Eq, Clone, Debug)]
62pub struct SharedObjectCongestionTracker {
63    object_execution_cost: HashMap<ObjectID, u64>,
64    params: Params,
65}
66
67impl SharedObjectCongestionTracker {
68    pub fn new(
69        initial_object_debts: impl IntoIterator<Item = (ObjectID, u64)>,
70        params: ExecutionTimeEstimateParams,
71        for_randomness: bool,
72    ) -> Self {
73        let object_execution_cost: HashMap<ObjectID, u64> =
74            initial_object_debts.into_iter().collect();
75        trace!(
76            "created SharedObjectCongestionTracker with
77             {} initial object debts,
78             params: {params:?},
79             for_randomness: {for_randomness},",
80            object_execution_cost.len(),
81        );
82        Self {
83            object_execution_cost,
84            params: Params {
85                params,
86                for_randomness,
87            },
88        }
89    }
90
91    pub fn from_protocol_config(
92        initial_object_debts: impl IntoIterator<Item = (ObjectID, u64)>,
93        protocol_config: &ProtocolConfig,
94        for_randomness: bool,
95    ) -> Self {
96        let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) =
97            protocol_config.per_object_congestion_control_mode()
98        else {
99            fatal!(
100                "support for congestion control modes other than PerObjectCongestionControlMode::ExecutionTimeEstimate has been removed"
101            );
102        };
103        Self::new(initial_object_debts, params, for_randomness)
104    }
105
106    // Given a list of shared input objects, returns the starting cost of a transaction that operates on
107    // these objects.
108    //
109    // Starting cost is a proxy for the starting time of the transaction. It is determined by all the input
110    // shared objects' last write.
111    pub fn compute_tx_start_at_cost(&self, shared_input_objects: &[SharedInputObject]) -> u64 {
112        shared_input_objects
113            .iter()
114            .map(|obj| *self.object_execution_cost.get(&obj.id).unwrap_or(&0))
115            .max()
116            .expect("There must be at least one object in shared_input_objects.")
117    }
118
119    pub fn get_tx_cost(
120        &self,
121        execution_time_estimator: &ExecutionTimeEstimator,
122        cert: &VerifiedExecutableTransaction,
123        indirect_state_observer: &mut IndirectStateObserver,
124    ) -> u64 {
125        let estimate_us = execution_time_estimator
126            .get_estimate(cert.transaction_data())
127            .as_micros()
128            .try_into()
129            .unwrap_or(u64::MAX);
130        if estimate_us >= 15_000 {
131            let digest = cert.digest();
132            debug!(
133                ?digest,
134                "expensive tx cost estimate detected: {estimate_us}us"
135            );
136        }
137
138        // Historically this was an Option<u64>, must keep it that way for consistency.
139        indirect_state_observer.observe_indirect_state(&Some(estimate_us));
140
141        estimate_us
142    }
143
144    // Given a transaction, returns the deferral key and the congested objects if the transaction should be deferred.
145    pub fn should_defer_due_to_object_congestion(
146        &self,
147        cert: &VerifiedExecutableTransaction,
148        previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
149        commit_info: &ConsensusCommitInfo,
150    ) -> Option<(DeferralKey, Vec<ObjectID>)> {
151        let commit_round = commit_info.round;
152
153        let shared_input_objects: Vec<_> = cert.shared_input_objects().collect();
154        if shared_input_objects.is_empty() {
155            // No shared object used by this transaction. No need to defer.
156            return None;
157        }
158
159        // Allow tx if it's within configured limits.
160        let start_cost = self.compute_tx_start_at_cost(&shared_input_objects);
161        let budget = self.params.commit_budget(commit_info);
162        let burst_limit = budget.saturating_add(self.params.max_burst());
163        if start_cost <= burst_limit {
164            return None;
165        }
166
167        // Finds out the congested objects.
168        //
169        // Note that the congested objects here may be caused by transaction dependency of other congested objects.
170        // Consider in a consensus commit, there are many transactions touching object A, and later in processing the
171        // consensus commit, there is a transaction touching both object A and B. Although there are fewer transactions
172        // touching object B, because it's starting execution is delayed due to dependency to other transactions on
173        // object A, it may be shown up as congested objects.
174        let mut congested_objects = vec![];
175        for obj in shared_input_objects {
176            // TODO: right now, we only return objects that are on the execution critical path in this consensus commit.
177            // However, for objects that are no on the critical path, they may potentially also be congested (e.g., an
178            // object has start cost == start_cost - 1, and adding the gas budget will exceed the limit). We don't
179            // return them for now because it's unclear how they can be used to return suggested gas price for the
180            // user. We need to revisit this later once we have a clear idea of how to determine the suggested gas price.
181            if &start_cost == self.object_execution_cost.get(&obj.id).unwrap_or(&0) {
182                congested_objects.push(obj.id);
183            }
184        }
185
186        assert!(!congested_objects.is_empty());
187
188        let deferral_key =
189            if let Some(previous_key) = previously_deferred_tx_digests.get(cert.digest()) {
190                // This transaction has been deferred in previous consensus commit. Use its previous deferred_from_round.
191                DeferralKey::new_for_consensus_round(
192                    commit_round + 1,
193                    previous_key.deferred_from_round(),
194                )
195            } else {
196                // This transaction has not been deferred before. Use the current commit round
197                // as the deferred_from_round.
198                DeferralKey::new_for_consensus_round(commit_round + 1, commit_round)
199            };
200        Some((deferral_key, congested_objects))
201    }
202
203    // Update shared objects' execution cost used in `cert` using `cert`'s execution cost.
204    // This is called when `cert` is scheduled for execution.
205    pub fn bump_object_execution_cost(
206        &mut self,
207        tx_cost: u64,
208        cert: &VerifiedExecutableTransaction,
209    ) {
210        let shared_input_objects: Vec<_> = cert.shared_input_objects().collect();
211        if shared_input_objects.is_empty() {
212            return;
213        }
214
215        let start_cost = self.compute_tx_start_at_cost(&shared_input_objects);
216        let end_cost = start_cost.saturating_add(tx_cost);
217
218        for obj in shared_input_objects {
219            if obj.is_accessed_exclusively() {
220                let old_end_cost = self.object_execution_cost.insert(obj.id, end_cost);
221                assert!(old_end_cost.is_none() || old_end_cost.unwrap() <= end_cost);
222            }
223        }
224    }
225
226    // Returns accumulated debts for objects whose budgets have been exceeded over the course
227    // of the commit. Consumes the tracker object, since this should only be called once after
228    // all tx have been processed.
229    pub fn accumulated_debts(self, commit_info: &ConsensusCommitInfo) -> Vec<(ObjectID, u64)> {
230        self.object_execution_cost
231            .into_iter()
232            .filter_map(|(obj_id, cost)| {
233                let remaining_cost = cost.saturating_sub(self.params.commit_budget(commit_info));
234                if remaining_cost > 0 {
235                    Some((obj_id, remaining_cost))
236                } else {
237                    None
238                }
239            })
240            .collect()
241    }
242
243    // Returns the maximum cost of all objects.
244    pub fn max_cost(&self) -> u64 {
245        self.object_execution_cost
246            .values()
247            .max()
248            .copied()
249            .unwrap_or(0)
250    }
251}
252
253#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
254pub enum CongestionPerObjectDebt {
255    V1(Round, u64),
256}
257
258impl CongestionPerObjectDebt {
259    pub fn new(round: Round, debt: u64) -> Self {
260        Self::V1(round, debt)
261    }
262
263    pub fn into_v1(self) -> (Round, u64) {
264        match self {
265            Self::V1(round, debt) => (round, debt),
266        }
267    }
268}
269
270#[cfg(test)]
271mod object_cost_tests {
272    use super::*;
273
274    use std::time::Duration;
275    use sui_protocol_config::ExecutionTimeEstimateParams;
276    use sui_test_transaction_builder::TestTransactionBuilder;
277    use sui_types::Identifier;
278    use sui_types::base_types::{SequenceNumber, random_object_ref};
279    use sui_types::crypto::{AccountKeyPair, get_key_pair};
280    use sui_types::transaction::{CallArg, ObjectArg, SharedObjectMutability, VerifiedTransaction};
281
282    fn default_params() -> ExecutionTimeEstimateParams {
283        ExecutionTimeEstimateParams {
284            target_utilization: 0,
285            allowed_txn_cost_overage_burst_limit_us: 0,
286            max_estimate_us: u64::MAX,
287            randomness_scalar: 0,
288            stored_observations_num_included_checkpoints: 10,
289            stored_observations_limit: u64::MAX,
290            stake_weighted_median_threshold: 0,
291            default_none_duration_for_new_keys: false,
292            observations_chunk_size: None,
293        }
294    }
295
296    fn construct_shared_input_objects(objects: &[(ObjectID, bool)]) -> Vec<SharedInputObject> {
297        objects
298            .iter()
299            .map(|(id, mutable)| SharedInputObject {
300                id: *id,
301                initial_shared_version: SequenceNumber::new(),
302                mutability: if *mutable {
303                    SharedObjectMutability::Mutable
304                } else {
305                    SharedObjectMutability::Immutable
306                },
307            })
308            .collect()
309    }
310
311    #[test]
312    fn test_compute_tx_start_at_cost() {
313        let object_id_0 = ObjectID::random();
314        let object_id_1 = ObjectID::random();
315        let object_id_2 = ObjectID::random();
316
317        let shared_object_congestion_tracker = SharedObjectCongestionTracker::new(
318            [(object_id_0, 5), (object_id_1, 10)],
319            default_params(),
320            false,
321        );
322
323        let shared_input_objects = construct_shared_input_objects(&[(object_id_0, false)]);
324        assert_eq!(
325            shared_object_congestion_tracker.compute_tx_start_at_cost(&shared_input_objects),
326            5
327        );
328
329        let shared_input_objects = construct_shared_input_objects(&[(object_id_1, true)]);
330        assert_eq!(
331            shared_object_congestion_tracker.compute_tx_start_at_cost(&shared_input_objects),
332            10
333        );
334
335        let shared_input_objects =
336            construct_shared_input_objects(&[(object_id_0, false), (object_id_1, false)]);
337        assert_eq!(
338            shared_object_congestion_tracker.compute_tx_start_at_cost(&shared_input_objects),
339            10
340        );
341
342        let shared_input_objects =
343            construct_shared_input_objects(&[(object_id_0, true), (object_id_1, true)]);
344        assert_eq!(
345            shared_object_congestion_tracker.compute_tx_start_at_cost(&shared_input_objects),
346            10
347        );
348
349        // Test tx that touch object for the first time, which should start from 0.
350        let shared_input_objects = construct_shared_input_objects(&[(object_id_2, true)]);
351        assert_eq!(
352            shared_object_congestion_tracker.compute_tx_start_at_cost(&shared_input_objects),
353            0
354        );
355    }
356
357    // Builds a certificate with a list of shared objects and their mutability. The certificate is only used to
358    // test the SharedObjectCongestionTracker functions, therefore the content other than shared inputs and gas budget
359    // are not important.
360    fn build_transaction(
361        objects: &[(ObjectID, bool)],
362        gas_budget: u64,
363    ) -> VerifiedExecutableTransaction {
364        let (sender, keypair): (_, AccountKeyPair) = get_key_pair();
365        let gas_object = random_object_ref();
366        VerifiedExecutableTransaction::new_system(
367            VerifiedTransaction::new_unchecked(
368                TestTransactionBuilder::new(sender, gas_object, 1000)
369                    .with_gas_budget(gas_budget)
370                    .move_call(
371                        ObjectID::random(),
372                        "unimportant_module",
373                        "unimportant_function",
374                        objects
375                            .iter()
376                            .map(|(id, mutable)| {
377                                CallArg::Object(ObjectArg::SharedObject {
378                                    id: *id,
379                                    initial_shared_version: SequenceNumber::new(),
380                                    mutability: if *mutable {
381                                        SharedObjectMutability::Mutable
382                                    } else {
383                                        SharedObjectMutability::Immutable
384                                    },
385                                })
386                            })
387                            .collect(),
388                    )
389                    .build_and_sign(&keypair),
390            ),
391            0,
392        )
393    }
394
395    fn build_programmable_transaction(
396        objects: &[(ObjectID, bool)],
397        number_of_commands: u64,
398        gas_budget: u64,
399    ) -> VerifiedExecutableTransaction {
400        let (sender, keypair): (_, AccountKeyPair) = get_key_pair();
401        let gas_object = random_object_ref();
402
403        let package_id = ObjectID::random();
404        let mut tx_builder =
405            TestTransactionBuilder::new(sender, gas_object, 1000).with_gas_budget(gas_budget);
406        {
407            let pt_builder = tx_builder.ptb_builder_mut();
408            let mut arguments = Vec::new();
409            for object in objects {
410                arguments.push(
411                    pt_builder
412                        .obj(ObjectArg::SharedObject {
413                            id: object.0,
414                            initial_shared_version: SequenceNumber::new(),
415                            mutability: if object.1 {
416                                SharedObjectMutability::Mutable
417                            } else {
418                                SharedObjectMutability::Immutable
419                            },
420                        })
421                        .unwrap(),
422                );
423            }
424            for _ in 0..number_of_commands {
425                pt_builder.programmable_move_call(
426                    package_id,
427                    Identifier::new("unimportant_module").unwrap(),
428                    Identifier::new("unimportant_function").unwrap(),
429                    vec![],
430                    arguments.clone(),
431                );
432            }
433        }
434
435        VerifiedExecutableTransaction::new_system(
436            VerifiedTransaction::new_unchecked(tx_builder.build_and_sign(&keypair)),
437            0,
438        )
439    }
440
441    #[test]
442    fn test_should_defer_return_correct_congested_objects() {
443        // Creates two shared objects and three transactions that operate on these objects.
444        let shared_obj_0 = ObjectID::random();
445        let shared_obj_1 = ObjectID::random();
446
447        let tx_gas_budget = 100;
448
449        // Construct object execution cost:
450        // object 0 has cost 750 (which exceeds burst limit)
451        // object 1 has cost 0 (within burst limit)
452        let shared_object_congestion_tracker = SharedObjectCongestionTracker::new(
453            [(shared_obj_0, 750), (shared_obj_1, 0)],
454            default_params(),
455            false,
456        );
457
458        // Read/write to object 0 should be deferred.
459        for mutable in [true, false].iter() {
460            let tx = build_transaction(&[(shared_obj_0, *mutable)], tx_gas_budget);
461            if let Some((_, congested_objects)) = shared_object_congestion_tracker
462                .should_defer_due_to_object_congestion(
463                    &tx,
464                    &HashMap::new(),
465                    &ConsensusCommitInfo::new_for_congestion_test(
466                        0,
467                        0,
468                        Duration::from_micros(1_500),
469                    ),
470                )
471            {
472                assert_eq!(congested_objects.len(), 1);
473                assert_eq!(congested_objects[0], shared_obj_0);
474            } else {
475                panic!("should defer");
476            }
477        }
478
479        // Read/write to object 1 should go through.
480        for mutable in [true, false].iter() {
481            let tx = build_transaction(&[(shared_obj_1, *mutable)], tx_gas_budget);
482            assert!(
483                shared_object_congestion_tracker
484                    .should_defer_due_to_object_congestion(
485                        &tx,
486                        &HashMap::new(),
487                        &ConsensusCommitInfo::new_for_congestion_test(
488                            0,
489                            0,
490                            Duration::from_micros(1_500),
491                        ),
492                    )
493                    .is_none()
494            );
495        }
496
497        // Transactions touching both objects should be deferred, with object 0 as the congested object.
498        for mutable_0 in [true, false].iter() {
499            for mutable_1 in [true, false].iter() {
500                let tx = build_transaction(
501                    &[(shared_obj_0, *mutable_0), (shared_obj_1, *mutable_1)],
502                    tx_gas_budget,
503                );
504                if let Some((_, congested_objects)) = shared_object_congestion_tracker
505                    .should_defer_due_to_object_congestion(
506                        &tx,
507                        &HashMap::new(),
508                        &ConsensusCommitInfo::new_for_congestion_test(
509                            0,
510                            0,
511                            Duration::from_micros(1_500),
512                        ),
513                    )
514                {
515                    assert_eq!(congested_objects.len(), 1);
516                    assert_eq!(congested_objects[0], shared_obj_0);
517                } else {
518                    panic!("should defer");
519                }
520            }
521        }
522    }
523
524    #[test]
525    fn test_should_defer_return_correct_deferral_key() {
526        let shared_obj_0 = ObjectID::random();
527        let tx = build_transaction(&[(shared_obj_0, true)], 100);
528
529        // Set initial cost that exceeds 0 burst limit
530        let shared_object_congestion_tracker =
531            SharedObjectCongestionTracker::new([(shared_obj_0, 1)], default_params(), false);
532
533        // Insert a random pre-existing transaction.
534        let mut previously_deferred_tx_digests = HashMap::new();
535        previously_deferred_tx_digests.insert(
536            TransactionDigest::random(),
537            DeferralKey::ConsensusRound {
538                future_round: 10,
539                deferred_from_round: 5,
540            },
541        );
542
543        // Test deferral key for a transaction that has not been deferred before.
544        if let Some((
545            DeferralKey::ConsensusRound {
546                future_round,
547                deferred_from_round,
548            },
549            _,
550        )) = shared_object_congestion_tracker.should_defer_due_to_object_congestion(
551            &tx,
552            &previously_deferred_tx_digests,
553            &ConsensusCommitInfo::new_for_congestion_test(
554                10,
555                10,
556                Duration::from_micros(10_000_000),
557            ),
558        ) {
559            assert_eq!(future_round, 11);
560            assert_eq!(deferred_from_round, 10);
561        } else {
562            panic!("should defer");
563        }
564
565        // Insert `tx` as previously deferred transaction due to randomness.
566        previously_deferred_tx_digests.insert(
567            *tx.digest(),
568            DeferralKey::Randomness {
569                deferred_from_round: 4,
570            },
571        );
572
573        // New deferral key should have deferred_from_round equal to the deferred randomness round.
574        if let Some((
575            DeferralKey::ConsensusRound {
576                future_round,
577                deferred_from_round,
578            },
579            _,
580        )) = shared_object_congestion_tracker.should_defer_due_to_object_congestion(
581            &tx,
582            &previously_deferred_tx_digests,
583            &ConsensusCommitInfo::new_for_congestion_test(
584                10,
585                10,
586                Duration::from_micros(10_000_000),
587            ),
588        ) {
589            assert_eq!(future_round, 11);
590            assert_eq!(deferred_from_round, 4);
591        } else {
592            panic!("should defer");
593        }
594
595        // Insert `tx` as previously deferred consensus transaction.
596        previously_deferred_tx_digests.insert(
597            *tx.digest(),
598            DeferralKey::ConsensusRound {
599                future_round: 10,
600                deferred_from_round: 5,
601            },
602        );
603
604        // New deferral key should have deferred_from_round equal to the one in the old deferral key.
605        if let Some((
606            DeferralKey::ConsensusRound {
607                future_round,
608                deferred_from_round,
609            },
610            _,
611        )) = shared_object_congestion_tracker.should_defer_due_to_object_congestion(
612            &tx,
613            &previously_deferred_tx_digests,
614            &ConsensusCommitInfo::new_for_congestion_test(
615                10,
616                10,
617                Duration::from_micros(10_000_000),
618            ),
619        ) {
620            assert_eq!(future_round, 11);
621            assert_eq!(deferred_from_round, 5);
622        } else {
623            panic!("should defer");
624        }
625    }
626
627    #[test]
628    fn test_should_defer_allow_overage() {
629        telemetry_subscribers::init_for_testing();
630
631        // Creates two shared objects.
632        let shared_obj_0 = ObjectID::random();
633        let shared_obj_1 = ObjectID::random();
634
635        let tx_gas_budget = 100;
636
637        // Construct object execution cost:
638        // object 0 has cost 1.7M (exceeds burst limit of ~1.6M with 16% utilization on 10s commit)
639        // object 1 has cost 300K (within burst limit)
640        let shared_object_congestion_tracker = SharedObjectCongestionTracker::new(
641            [(shared_obj_0, 1_700_000), (shared_obj_1, 300_000)],
642            ExecutionTimeEstimateParams {
643                target_utilization: 16,
644                allowed_txn_cost_overage_burst_limit_us: 0,
645                randomness_scalar: 0,
646                max_estimate_us: u64::MAX,
647                stored_observations_num_included_checkpoints: 10,
648                stored_observations_limit: u64::MAX,
649                stake_weighted_median_threshold: 0,
650                default_none_duration_for_new_keys: false,
651                observations_chunk_size: None,
652            },
653            false,
654        );
655
656        // Read/write to object 0 should be deferred.
657        for mutable in [true, false].iter() {
658            let tx = build_transaction(&[(shared_obj_0, *mutable)], tx_gas_budget);
659            if let Some((_, congested_objects)) = shared_object_congestion_tracker
660                .should_defer_due_to_object_congestion(
661                    &tx,
662                    &HashMap::new(),
663                    &ConsensusCommitInfo::new_for_congestion_test(
664                        0,
665                        0,
666                        Duration::from_micros(10_000_000),
667                    ),
668                )
669            {
670                assert_eq!(congested_objects.len(), 1);
671                assert_eq!(congested_objects[0], shared_obj_0);
672            } else {
673                panic!("should defer");
674            }
675        }
676
677        // Read/write to object 1 should go through even though the budget is exceeded.
678        for mutable in [true, false].iter() {
679            let tx = build_transaction(&[(shared_obj_1, *mutable)], tx_gas_budget);
680            assert!(
681                shared_object_congestion_tracker
682                    .should_defer_due_to_object_congestion(
683                        &tx,
684                        &HashMap::new(),
685                        &ConsensusCommitInfo::new_for_congestion_test(
686                            0,
687                            0,
688                            Duration::from_micros(10_000_000)
689                        ),
690                    )
691                    .is_none()
692            );
693        }
694
695        // Transactions touching both objects should be deferred, with object 0 as the congested object.
696        for mutable_0 in [true, false].iter() {
697            for mutable_1 in [true, false].iter() {
698                let tx = build_transaction(
699                    &[(shared_obj_0, *mutable_0), (shared_obj_1, *mutable_1)],
700                    tx_gas_budget,
701                );
702                if let Some((_, congested_objects)) = shared_object_congestion_tracker
703                    .should_defer_due_to_object_congestion(
704                        &tx,
705                        &HashMap::new(),
706                        &ConsensusCommitInfo::new_for_congestion_test(
707                            0,
708                            0,
709                            Duration::from_micros(10_000_000),
710                        ),
711                    )
712                {
713                    assert_eq!(congested_objects.len(), 1);
714                    assert_eq!(congested_objects[0], shared_obj_0);
715                } else {
716                    panic!("should defer");
717                }
718            }
719        }
720    }
721
#[test]
fn test_should_defer_allow_overage_with_burst() {
    telemetry_subscribers::init_for_testing();

    let shared_obj_0 = ObjectID::random();
    let shared_obj_1 = ObjectID::random();

    let tx_gas_budget = 100;

    // Every deferral query below is made with the same 10s commit period and the same
    // empty map, so both are built once up front.
    let commit_info =
        ConsensusCommitInfo::new_for_congestion_test(0, 0, Duration::from_micros(10_000_000));
    let empty_map = HashMap::new();

    // Starting costs, measured against a budget of 1.6M (16% of the 10s commit period)
    // plus a burst allowance of 1.5M, i.e. a ceiling of 3.1M:
    //   - object 0 starts at 4M, which is above the ceiling;
    //   - object 1 starts at 2M, which is above the budget but below the ceiling.
    let burst_params = ExecutionTimeEstimateParams {
        target_utilization: 16,
        allowed_txn_cost_overage_burst_limit_us: 1_500_000,
        randomness_scalar: 0,
        max_estimate_us: u64::MAX,
        stored_observations_num_included_checkpoints: 10,
        stored_observations_limit: u64::MAX,
        stake_weighted_median_threshold: 0,
        default_none_duration_for_new_keys: false,
        observations_chunk_size: None,
    };
    let tracker = SharedObjectCongestionTracker::new(
        [(shared_obj_0, 4_000_000), (shared_obj_1, 2_000_000)],
        burst_params,
        false,
    );

    // Object 0 on its own: deferred regardless of whether it is read or written.
    for mutable in [true, false] {
        let tx = build_transaction(&[(shared_obj_0, mutable)], tx_gas_budget);
        let decision =
            tracker.should_defer_due_to_object_congestion(&tx, &empty_map, &commit_info);
        match decision {
            Some((_, congested_objects)) => {
                assert_eq!(congested_objects.len(), 1);
                assert_eq!(congested_objects[0], shared_obj_0);
            }
            None => panic!("should defer"),
        }
    }

    // Object 1 on its own: allowed through in both access modes, even though its
    // starting cost is already over the budget before this tx is accounted for.
    for mutable in [true, false] {
        let tx = build_transaction(&[(shared_obj_1, mutable)], tx_gas_budget);
        let decision =
            tracker.should_defer_due_to_object_congestion(&tx, &empty_map, &commit_info);
        assert!(decision.is_none());
    }

    // Both objects together: deferred for every combination of access modes, and only
    // object 0 is reported as congested.
    for mutable_0 in [true, false] {
        for mutable_1 in [true, false] {
            let tx = build_transaction(
                &[(shared_obj_0, mutable_0), (shared_obj_1, mutable_1)],
                tx_gas_budget,
            );
            let decision =
                tracker.should_defer_due_to_object_congestion(&tx, &empty_map, &commit_info);
            match decision {
                Some((_, congested_objects)) => {
                    assert_eq!(congested_objects.len(), 1);
                    assert_eq!(congested_objects[0], shared_obj_0);
                }
                None => panic!("should defer"),
            }
        }
    }
}
817
#[test]
fn test_bump_object_execution_cost() {
    telemetry_subscribers::init_for_testing();

    let estimator = ExecutionTimeEstimator::new_for_testing();

    let obj_0 = ObjectID::random();
    let obj_1 = ObjectID::random();
    let obj_2 = ObjectID::random();

    let params = default_params();
    let mut tracker =
        SharedObjectCongestionTracker::new([(obj_0, 5), (obj_1, 10)], params, false);
    assert_eq!(tracker.max_cost(), 10);

    // Step 1: a transaction that only reads both objects must leave every cost untouched.
    let tx = build_transaction(&[(obj_0, false), (obj_1, false)], 10);
    let mut observer = IndirectStateObserver::new();
    let tx_cost = tracker.get_tx_cost(&estimator, &tx, &mut observer);
    tracker.bump_object_execution_cost(tx_cost, &tx);
    let unchanged =
        SharedObjectCongestionTracker::new([(obj_0, 5), (obj_1, 10)], params, false);
    assert_eq!(tracker, unchanged);
    assert_eq!(tracker.max_cost(), 10);

    // Step 2: writing object 0 while reading object 1 bumps only object 0. The bump
    // starts from object 1's cost (10) and adds the tx cost (~1000 for an unknown command).
    let tx = build_transaction(&[(obj_0, true), (obj_1, false)], 10);
    let mut observer = IndirectStateObserver::new();
    let tx_cost = tracker.get_tx_cost(&estimator, &tx, &mut observer);
    tracker.bump_object_execution_cost(tx_cost, &tx);
    let expected_object_0_cost = 1_010;
    let after_single_write = SharedObjectCongestionTracker::new(
        [(obj_0, expected_object_0_cost), (obj_1, 10)],
        params,
        false,
    );
    assert_eq!(tracker, after_single_write);
    assert_eq!(tracker.max_cost(), expected_object_0_cost);

    // The remaining steps write to all three objects; object 2 has not been seen before.
    let write_all = [(obj_0, true), (obj_1, true), (obj_2, true)];

    // Step 3: every written object, including the new one, ends up at the same cost:
    // previous maximum (1_010) + tx cost (~1000 for an unknown command).
    let tx = build_transaction(&write_all, 10);
    let expected_object_cost = 2_010;
    let mut observer = IndirectStateObserver::new();
    let tx_cost = tracker.get_tx_cost(&estimator, &tx, &mut observer);
    tracker.bump_object_execution_cost(tx_cost, &tx);
    let after_write_all = SharedObjectCongestionTracker::new(
        [
            (obj_0, expected_object_cost),
            (obj_1, expected_object_cost),
            (obj_2, expected_object_cost),
        ],
        params,
        false,
    );
    assert_eq!(tracker, after_write_all);
    assert_eq!(tracker.max_cost(), expected_object_cost);

    // Step 4: a PTB with 7 commands writing to all objects adds the unknown-command
    // default (1000) once per command: 2_010 + 7 * 1000.
    let tx = build_programmable_transaction(&write_all, 7, 30);
    let expected_object_cost = 9_010;
    let mut observer = IndirectStateObserver::new();
    let tx_cost = tracker.get_tx_cost(&estimator, &tx, &mut observer);
    tracker.bump_object_execution_cost(tx_cost, &tx);
    let after_ptb = SharedObjectCongestionTracker::new(
        [
            (obj_0, expected_object_cost),
            (obj_1, expected_object_cost),
            (obj_2, expected_object_cost),
        ],
        params,
        false,
    );
    assert_eq!(tracker, after_ptb);
    assert_eq!(tracker.max_cost(), expected_object_cost);
}
954
#[test]
fn test_accumulated_debts() {
    telemetry_subscribers::init_for_testing();

    let estimator = ExecutionTimeEstimator::new_for_testing();

    let shared_obj_0 = ObjectID::random();
    let shared_obj_1 = ObjectID::random();

    let tx_gas_budget = 100;

    // Both objects begin with an accumulated cost of 500. A burst limit is configured
    // on purpose, to check that it plays no part in the debt calculation.
    let debt_params = ExecutionTimeEstimateParams {
        target_utilization: 100,
        allowed_txn_cost_overage_burst_limit_us: 1_600 * 5,
        randomness_scalar: 0,
        max_estimate_us: u64::MAX,
        stored_observations_num_included_checkpoints: 10,
        stored_observations_limit: u64::MAX,
        stake_weighted_median_threshold: 0,
        default_none_duration_for_new_keys: false,
        observations_chunk_size: None,
    };
    let mut tracker = SharedObjectCongestionTracker::new(
        [(shared_obj_0, 500), (shared_obj_1, 500)],
        debt_params,
        false,
    );

    // Push object 0 over the budget with a single write. Reads do not bump the cost,
    // so a mutable access is the only case worth exercising here.
    let tx = build_transaction(&[(shared_obj_0, true)], tx_gas_budget);
    let mut observer = IndirectStateObserver::new();
    let tx_cost = tracker.get_tx_cost(&estimator, &tx, &mut observer);
    tracker.bump_object_execution_cost(tx_cost, &tx);

    // At 100% target utilization an 800us commit period gives a budget of 800, so the
    // only debt reported is for object 0: 500 (initial) + 1000 (tx cost) - 800 = 700.
    let commit_info = ConsensusCommitInfo::new_for_congestion_test(0, 0, Duration::from_micros(800));
    let accumulated_debts = tracker.accumulated_debts(&commit_info);
    assert_eq!(accumulated_debts.len(), 1);
    assert_eq!(accumulated_debts[0], (shared_obj_0, 700));
}
1006
#[test]
fn test_accumulated_debts_empty() {
    let object_id_0 = ObjectID::random();
    let object_id_1 = ObjectID::random();
    let object_id_2 = ObjectID::random();

    // All three objects start at zero cost, so no debt should be reported.
    let zero_costs = [(object_id_0, 0), (object_id_1, 0), (object_id_2, 0)];
    let tracker = SharedObjectCongestionTracker::new(zero_costs, default_params(), false);

    let commit_info = ConsensusCommitInfo::new_for_congestion_test(0, 0, Duration::ZERO);
    assert!(tracker.accumulated_debts(&commit_info).is_empty());
}
1025}