sui_core/authority/
shared_object_congestion_tracker.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::execution_time_estimator::ExecutionTimeEstimator;
5use crate::authority::transaction_deferral::DeferralKey;
6use crate::consensus_handler::{ConsensusCommitInfo, IndirectStateObserver};
7use mysten_common::fatal;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use sui_protocol_config::{
11    ExecutionTimeEstimateParams, PerObjectCongestionControlMode, ProtocolConfig,
12};
13use sui_types::base_types::{ObjectID, TransactionDigest};
14use sui_types::executable_transaction::VerifiedExecutableTransaction;
15use sui_types::messages_consensus::Round;
16use sui_types::transaction::SharedInputObject;
17use tracing::{debug, trace};
18
19#[derive(PartialEq, Eq, Clone, Debug)]
20struct Params {
21    params: ExecutionTimeEstimateParams,
22    for_randomness: bool,
23}
24
25impl Params {
26    // Get the target budget per commit. Over the long term, the scheduler will try to
27    // schedule no more than this much work per object per commit on average.
28    pub fn commit_budget(&self, commit_info: &ConsensusCommitInfo) -> u64 {
29        let estimated_commit_period = commit_info.estimated_commit_period();
30        let commit_period_micros = estimated_commit_period.as_micros() as u64;
31        let mut budget = commit_period_micros.saturating_mul(self.params.target_utilization) / 100;
32        if self.for_randomness {
33            budget = budget.saturating_mul(self.params.randomness_scalar) / 100;
34        }
35        budget
36    }
37
38    // The amount scheduled in a commit can "burst" up to this much over the target budget.
39    // The per-object debt limit will enforce the average limit over time.
40    pub fn max_burst(&self) -> u64 {
41        let mut burst = self.params.allowed_txn_cost_overage_burst_limit_us;
42        if self.for_randomness {
43            burst = burst.saturating_mul(self.params.randomness_scalar) / 100;
44        }
45        burst
46    }
47}
48
49// SharedObjectCongestionTracker stores the accumulated cost of executing transactions on an object, for
50// all transactions in a consensus commit.
51//
52// Cost is an indication of transaction execution latency. When transactions are scheduled by
53// the consensus handler, each scheduled transaction adds cost (execution latency) to all the objects it
54// reads or writes.
55//
56// The goal of this data structure is to capture the critical path of transaction execution latency on each
57// objects.
58//
59// The mode field determines how the cost is calculated. The cost can be calculated based on the total gas
60// budget, or total number of transaction count.
61#[derive(PartialEq, Eq, Clone, Debug)]
62pub struct SharedObjectCongestionTracker {
63    object_execution_cost: HashMap<ObjectID, u64>,
64    params: Params,
65    log_entries: Option<Vec<TransactionCostLogEntry>>,
66}
67
68impl SharedObjectCongestionTracker {
69    pub fn new(
70        initial_object_debts: impl IntoIterator<Item = (ObjectID, u64)>,
71        params: ExecutionTimeEstimateParams,
72        for_randomness: bool,
73        enable_logging: bool,
74    ) -> Self {
75        let object_execution_cost: HashMap<ObjectID, u64> =
76            initial_object_debts.into_iter().collect();
77        trace!(
78            "created SharedObjectCongestionTracker with
79             {} initial object debts,
80             params: {params:?},
81             for_randomness: {for_randomness},",
82            object_execution_cost.len(),
83        );
84        Self {
85            object_execution_cost,
86            params: Params {
87                params,
88                for_randomness,
89            },
90            log_entries: enable_logging.then(Vec::new),
91        }
92    }
93
94    pub fn from_protocol_config(
95        initial_object_debts: impl IntoIterator<Item = (ObjectID, u64)>,
96        protocol_config: &ProtocolConfig,
97        for_randomness: bool,
98        enable_logging: bool,
99    ) -> Self {
100        let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) =
101            protocol_config.per_object_congestion_control_mode()
102        else {
103            fatal!(
104                "support for congestion control modes other than PerObjectCongestionControlMode::ExecutionTimeEstimate has been removed"
105            );
106        };
107        Self::new(initial_object_debts, params, for_randomness, enable_logging)
108    }
109
110    // Given a list of shared input objects, returns the starting cost of a transaction that operates on
111    // these objects.
112    //
113    // Starting cost is a proxy for the starting time of the transaction. It is determined by all the input
114    // shared objects' last write.
115    pub fn compute_tx_start_at_cost(&self, shared_input_objects: &[SharedInputObject]) -> u64 {
116        shared_input_objects
117            .iter()
118            .map(|obj| *self.object_execution_cost.get(&obj.id).unwrap_or(&0))
119            .max()
120            .expect("There must be at least one object in shared_input_objects.")
121    }
122
123    pub fn get_tx_cost(
124        &self,
125        execution_time_estimator: &ExecutionTimeEstimator,
126        cert: &VerifiedExecutableTransaction,
127        indirect_state_observer: &mut IndirectStateObserver,
128    ) -> u64 {
129        let estimate_us = execution_time_estimator
130            .get_estimate(cert.transaction_data())
131            .as_micros()
132            .try_into()
133            .unwrap_or(u64::MAX);
134        if estimate_us >= 15_000 {
135            let digest = cert.digest();
136            debug!(
137                ?digest,
138                "expensive tx cost estimate detected: {estimate_us}us"
139            );
140        }
141
142        // Historically this was an Option<u64>, must keep it that way for consistency.
143        indirect_state_observer.observe_indirect_state(&Some(estimate_us));
144
145        estimate_us
146    }
147
148    // Given a transaction, returns the deferral key and the congested objects if the transaction should be deferred.
149    pub fn should_defer_due_to_object_congestion(
150        &self,
151        cert: &VerifiedExecutableTransaction,
152        previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
153        commit_info: &ConsensusCommitInfo,
154    ) -> Option<(DeferralKey, Vec<ObjectID>)> {
155        let commit_round = commit_info.round;
156
157        let shared_input_objects: Vec<_> = cert.shared_input_objects().collect();
158        if shared_input_objects.is_empty() {
159            // No shared object used by this transaction. No need to defer.
160            return None;
161        }
162
163        // Allow tx if it's within configured limits.
164        let start_cost = self.compute_tx_start_at_cost(&shared_input_objects);
165        let budget = self.params.commit_budget(commit_info);
166        let burst_limit = budget.saturating_add(self.params.max_burst());
167        if start_cost <= burst_limit {
168            return None;
169        }
170
171        // Finds out the congested objects.
172        //
173        // Note that the congested objects here may be caused by transaction dependency of other congested objects.
174        // Consider in a consensus commit, there are many transactions touching object A, and later in processing the
175        // consensus commit, there is a transaction touching both object A and B. Although there are fewer transactions
176        // touching object B, because it's starting execution is delayed due to dependency to other transactions on
177        // object A, it may be shown up as congested objects.
178        let mut congested_objects = vec![];
179        for obj in shared_input_objects {
180            // TODO: right now, we only return objects that are on the execution critical path in this consensus commit.
181            // However, for objects that are no on the critical path, they may potentially also be congested (e.g., an
182            // object has start cost == start_cost - 1, and adding the gas budget will exceed the limit). We don't
183            // return them for now because it's unclear how they can be used to return suggested gas price for the
184            // user. We need to revisit this later once we have a clear idea of how to determine the suggested gas price.
185            if &start_cost == self.object_execution_cost.get(&obj.id).unwrap_or(&0) {
186                congested_objects.push(obj.id);
187            }
188        }
189
190        assert!(!congested_objects.is_empty());
191
192        let deferral_key =
193            if let Some(previous_key) = previously_deferred_tx_digests.get(cert.digest()) {
194                // This transaction has been deferred in previous consensus commit. Use its previous deferred_from_round.
195                DeferralKey::new_for_consensus_round(
196                    commit_round + 1,
197                    previous_key.deferred_from_round(),
198                )
199            } else {
200                // This transaction has not been deferred before. Use the current commit round
201                // as the deferred_from_round.
202                DeferralKey::new_for_consensus_round(commit_round + 1, commit_round)
203            };
204        Some((deferral_key, congested_objects))
205    }
206
207    // Update shared objects' execution cost used in `cert` using `cert`'s execution cost.
208    // This is called when `cert` is scheduled for execution.
209    pub fn bump_object_execution_cost(
210        &mut self,
211        tx_cost: u64,
212        cert: &VerifiedExecutableTransaction,
213    ) {
214        let shared_input_objects: Vec<_> = cert.shared_input_objects().collect();
215        if shared_input_objects.is_empty() {
216            return;
217        }
218
219        let start_cost = self.compute_tx_start_at_cost(&shared_input_objects);
220        let end_cost = start_cost.saturating_add(tx_cost);
221
222        for obj in shared_input_objects {
223            if obj.is_accessed_exclusively() {
224                let old_end_cost = self.object_execution_cost.insert(obj.id, end_cost);
225                assert!(old_end_cost.is_none() || old_end_cost.unwrap() <= end_cost);
226            }
227        }
228
229        if let Some(entries) = &mut self.log_entries {
230            entries.push(TransactionCostLogEntry {
231                tx_digest: *cert.digest(),
232                start_cost,
233                end_cost,
234            });
235        }
236    }
237
238    // Returns end-of-commit data from congestion control:
239    // - Accumulated debts for objects whose budgets have been exceeded over the course of
240    //   the commit.
241    // - Detailed per-object congestion data for logging.
242    // Consumes the tracker object, since this should only be called once after
243    // all tx have been processed.
244    pub fn finish_commit(self, commit_info: &ConsensusCommitInfo) -> FinishedCommitData {
245        let commit_budget = self.params.commit_budget(commit_info);
246        let log_entries = self.log_entries.unwrap_or_default();
247        let final_object_execution_costs = self.object_execution_cost.clone();
248        let accumulated_debts = self
249            .object_execution_cost
250            .into_iter()
251            .filter_map(|(obj_id, cost)| {
252                let remaining_cost = cost.saturating_sub(commit_budget);
253                if remaining_cost > 0 {
254                    Some((obj_id, remaining_cost))
255                } else {
256                    None
257                }
258            })
259            .collect();
260        FinishedCommitData {
261            accumulated_debts,
262            log_entries,
263            final_object_execution_costs,
264            commit_budget,
265        }
266    }
267
268    // Returns the maximum cost of all objects.
269    pub fn max_cost(&self) -> u64 {
270        self.object_execution_cost
271            .values()
272            .max()
273            .copied()
274            .unwrap_or(0)
275    }
276}
277
278#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
279pub enum CongestionPerObjectDebt {
280    V1(Round, u64),
281}
282
283impl CongestionPerObjectDebt {
284    pub fn new(round: Round, debt: u64) -> Self {
285        Self::V1(round, debt)
286    }
287
288    pub fn into_v1(self) -> (Round, u64) {
289        match self {
290            Self::V1(round, debt) => (round, debt),
291        }
292    }
293}
294
295#[derive(PartialEq, Eq, Clone, Debug)]
296pub struct TransactionCostLogEntry {
297    pub tx_digest: TransactionDigest,
298    pub start_cost: u64,
299    pub end_cost: u64,
300}
301
302pub struct FinishedCommitData {
303    pub accumulated_debts: Vec<(ObjectID, u64)>,
304    pub log_entries: Vec<TransactionCostLogEntry>,
305    pub final_object_execution_costs: HashMap<ObjectID, u64>,
306    pub commit_budget: u64,
307}
308
309#[cfg(test)]
310mod object_cost_tests {
311    use super::*;
312
313    use std::time::Duration;
314    use sui_protocol_config::ExecutionTimeEstimateParams;
315    use sui_test_transaction_builder::TestTransactionBuilder;
316    use sui_types::Identifier;
317    use sui_types::base_types::{SequenceNumber, random_object_ref};
318    use sui_types::crypto::{AccountKeyPair, get_key_pair};
319    use sui_types::transaction::{CallArg, ObjectArg, SharedObjectMutability, VerifiedTransaction};
320
321    fn default_params() -> ExecutionTimeEstimateParams {
322        ExecutionTimeEstimateParams {
323            target_utilization: 0,
324            allowed_txn_cost_overage_burst_limit_us: 0,
325            max_estimate_us: u64::MAX,
326            randomness_scalar: 0,
327            stored_observations_num_included_checkpoints: 10,
328            stored_observations_limit: u64::MAX,
329            stake_weighted_median_threshold: 0,
330            default_none_duration_for_new_keys: false,
331            observations_chunk_size: None,
332        }
333    }
334
335    fn construct_shared_input_objects(objects: &[(ObjectID, bool)]) -> Vec<SharedInputObject> {
336        objects
337            .iter()
338            .map(|(id, mutable)| SharedInputObject {
339                id: *id,
340                initial_shared_version: SequenceNumber::new(),
341                mutability: if *mutable {
342                    SharedObjectMutability::Mutable
343                } else {
344                    SharedObjectMutability::Immutable
345                },
346            })
347            .collect()
348    }
349
350    #[test]
351    fn test_compute_tx_start_at_cost() {
352        let object_id_0 = ObjectID::random();
353        let object_id_1 = ObjectID::random();
354        let object_id_2 = ObjectID::random();
355
356        let shared_object_congestion_tracker = SharedObjectCongestionTracker::new(
357            [(object_id_0, 5), (object_id_1, 10)],
358            default_params(),
359            false,
360            false,
361        );
362
363        let shared_input_objects = construct_shared_input_objects(&[(object_id_0, false)]);
364        assert_eq!(
365            shared_object_congestion_tracker.compute_tx_start_at_cost(&shared_input_objects),
366            5
367        );
368
369        let shared_input_objects = construct_shared_input_objects(&[(object_id_1, true)]);
370        assert_eq!(
371            shared_object_congestion_tracker.compute_tx_start_at_cost(&shared_input_objects),
372            10
373        );
374
375        let shared_input_objects =
376            construct_shared_input_objects(&[(object_id_0, false), (object_id_1, false)]);
377        assert_eq!(
378            shared_object_congestion_tracker.compute_tx_start_at_cost(&shared_input_objects),
379            10
380        );
381
382        let shared_input_objects =
383            construct_shared_input_objects(&[(object_id_0, true), (object_id_1, true)]);
384        assert_eq!(
385            shared_object_congestion_tracker.compute_tx_start_at_cost(&shared_input_objects),
386            10
387        );
388
389        // Test tx that touch object for the first time, which should start from 0.
390        let shared_input_objects = construct_shared_input_objects(&[(object_id_2, true)]);
391        assert_eq!(
392            shared_object_congestion_tracker.compute_tx_start_at_cost(&shared_input_objects),
393            0
394        );
395    }
396
397    // Builds a certificate with a list of shared objects and their mutability. The certificate is only used to
398    // test the SharedObjectCongestionTracker functions, therefore the content other than shared inputs and gas budget
399    // are not important.
400    fn build_transaction(
401        objects: &[(ObjectID, bool)],
402        gas_budget: u64,
403    ) -> VerifiedExecutableTransaction {
404        let (sender, keypair): (_, AccountKeyPair) = get_key_pair();
405        let gas_object = random_object_ref();
406        VerifiedExecutableTransaction::new_system(
407            VerifiedTransaction::new_unchecked(
408                TestTransactionBuilder::new(sender, gas_object, 1000)
409                    .with_gas_budget(gas_budget)
410                    .move_call(
411                        ObjectID::random(),
412                        "unimportant_module",
413                        "unimportant_function",
414                        objects
415                            .iter()
416                            .map(|(id, mutable)| {
417                                CallArg::Object(ObjectArg::SharedObject {
418                                    id: *id,
419                                    initial_shared_version: SequenceNumber::new(),
420                                    mutability: if *mutable {
421                                        SharedObjectMutability::Mutable
422                                    } else {
423                                        SharedObjectMutability::Immutable
424                                    },
425                                })
426                            })
427                            .collect(),
428                    )
429                    .build_and_sign(&keypair),
430            ),
431            0,
432        )
433    }
434
435    fn build_programmable_transaction(
436        objects: &[(ObjectID, bool)],
437        number_of_commands: u64,
438        gas_budget: u64,
439    ) -> VerifiedExecutableTransaction {
440        let (sender, keypair): (_, AccountKeyPair) = get_key_pair();
441        let gas_object = random_object_ref();
442
443        let package_id = ObjectID::random();
444        let mut tx_builder =
445            TestTransactionBuilder::new(sender, gas_object, 1000).with_gas_budget(gas_budget);
446        {
447            let pt_builder = tx_builder.ptb_builder_mut();
448            let mut arguments = Vec::new();
449            for object in objects {
450                arguments.push(
451                    pt_builder
452                        .obj(ObjectArg::SharedObject {
453                            id: object.0,
454                            initial_shared_version: SequenceNumber::new(),
455                            mutability: if object.1 {
456                                SharedObjectMutability::Mutable
457                            } else {
458                                SharedObjectMutability::Immutable
459                            },
460                        })
461                        .unwrap(),
462                );
463            }
464            for _ in 0..number_of_commands {
465                pt_builder.programmable_move_call(
466                    package_id,
467                    Identifier::new("unimportant_module").unwrap(),
468                    Identifier::new("unimportant_function").unwrap(),
469                    vec![],
470                    arguments.clone(),
471                );
472            }
473        }
474
475        VerifiedExecutableTransaction::new_system(
476            VerifiedTransaction::new_unchecked(tx_builder.build_and_sign(&keypair)),
477            0,
478        )
479    }
480
481    #[test]
482    fn test_should_defer_return_correct_congested_objects() {
483        // Creates two shared objects and three transactions that operate on these objects.
484        let shared_obj_0 = ObjectID::random();
485        let shared_obj_1 = ObjectID::random();
486
487        let tx_gas_budget = 100;
488
489        // Construct object execution cost:
490        // object 0 has cost 750 (which exceeds burst limit)
491        // object 1 has cost 0 (within burst limit)
492        let shared_object_congestion_tracker = SharedObjectCongestionTracker::new(
493            [(shared_obj_0, 750), (shared_obj_1, 0)],
494            default_params(),
495            false,
496            false,
497        );
498
499        // Read/write to object 0 should be deferred.
500        for mutable in [true, false].iter() {
501            let tx = build_transaction(&[(shared_obj_0, *mutable)], tx_gas_budget);
502            if let Some((_, congested_objects)) = shared_object_congestion_tracker
503                .should_defer_due_to_object_congestion(
504                    &tx,
505                    &HashMap::new(),
506                    &ConsensusCommitInfo::new_for_congestion_test(
507                        0,
508                        0,
509                        Duration::from_micros(1_500),
510                    ),
511                )
512            {
513                assert_eq!(congested_objects.len(), 1);
514                assert_eq!(congested_objects[0], shared_obj_0);
515            } else {
516                panic!("should defer");
517            }
518        }
519
520        // Read/write to object 1 should go through.
521        for mutable in [true, false].iter() {
522            let tx = build_transaction(&[(shared_obj_1, *mutable)], tx_gas_budget);
523            assert!(
524                shared_object_congestion_tracker
525                    .should_defer_due_to_object_congestion(
526                        &tx,
527                        &HashMap::new(),
528                        &ConsensusCommitInfo::new_for_congestion_test(
529                            0,
530                            0,
531                            Duration::from_micros(1_500),
532                        ),
533                    )
534                    .is_none()
535            );
536        }
537
538        // Transactions touching both objects should be deferred, with object 0 as the congested object.
539        for mutable_0 in [true, false].iter() {
540            for mutable_1 in [true, false].iter() {
541                let tx = build_transaction(
542                    &[(shared_obj_0, *mutable_0), (shared_obj_1, *mutable_1)],
543                    tx_gas_budget,
544                );
545                if let Some((_, congested_objects)) = shared_object_congestion_tracker
546                    .should_defer_due_to_object_congestion(
547                        &tx,
548                        &HashMap::new(),
549                        &ConsensusCommitInfo::new_for_congestion_test(
550                            0,
551                            0,
552                            Duration::from_micros(1_500),
553                        ),
554                    )
555                {
556                    assert_eq!(congested_objects.len(), 1);
557                    assert_eq!(congested_objects[0], shared_obj_0);
558                } else {
559                    panic!("should defer");
560                }
561            }
562        }
563    }
564
565    #[test]
566    fn test_should_defer_return_correct_deferral_key() {
567        let shared_obj_0 = ObjectID::random();
568        let tx = build_transaction(&[(shared_obj_0, true)], 100);
569
570        // Set initial cost that exceeds 0 burst limit
571        let shared_object_congestion_tracker =
572            SharedObjectCongestionTracker::new([(shared_obj_0, 1)], default_params(), false, false);
573
574        // Insert a random pre-existing transaction.
575        let mut previously_deferred_tx_digests = HashMap::new();
576        previously_deferred_tx_digests.insert(
577            TransactionDigest::random(),
578            DeferralKey::ConsensusRound {
579                future_round: 10,
580                deferred_from_round: 5,
581            },
582        );
583
584        // Test deferral key for a transaction that has not been deferred before.
585        if let Some((
586            DeferralKey::ConsensusRound {
587                future_round,
588                deferred_from_round,
589            },
590            _,
591        )) = shared_object_congestion_tracker.should_defer_due_to_object_congestion(
592            &tx,
593            &previously_deferred_tx_digests,
594            &ConsensusCommitInfo::new_for_congestion_test(
595                10,
596                10,
597                Duration::from_micros(10_000_000),
598            ),
599        ) {
600            assert_eq!(future_round, 11);
601            assert_eq!(deferred_from_round, 10);
602        } else {
603            panic!("should defer");
604        }
605
606        // Insert `tx` as previously deferred transaction due to randomness.
607        previously_deferred_tx_digests.insert(
608            *tx.digest(),
609            DeferralKey::Randomness {
610                deferred_from_round: 4,
611            },
612        );
613
614        // New deferral key should have deferred_from_round equal to the deferred randomness round.
615        if let Some((
616            DeferralKey::ConsensusRound {
617                future_round,
618                deferred_from_round,
619            },
620            _,
621        )) = shared_object_congestion_tracker.should_defer_due_to_object_congestion(
622            &tx,
623            &previously_deferred_tx_digests,
624            &ConsensusCommitInfo::new_for_congestion_test(
625                10,
626                10,
627                Duration::from_micros(10_000_000),
628            ),
629        ) {
630            assert_eq!(future_round, 11);
631            assert_eq!(deferred_from_round, 4);
632        } else {
633            panic!("should defer");
634        }
635
636        // Insert `tx` as previously deferred consensus transaction.
637        previously_deferred_tx_digests.insert(
638            *tx.digest(),
639            DeferralKey::ConsensusRound {
640                future_round: 10,
641                deferred_from_round: 5,
642            },
643        );
644
645        // New deferral key should have deferred_from_round equal to the one in the old deferral key.
646        if let Some((
647            DeferralKey::ConsensusRound {
648                future_round,
649                deferred_from_round,
650            },
651            _,
652        )) = shared_object_congestion_tracker.should_defer_due_to_object_congestion(
653            &tx,
654            &previously_deferred_tx_digests,
655            &ConsensusCommitInfo::new_for_congestion_test(
656                10,
657                10,
658                Duration::from_micros(10_000_000),
659            ),
660        ) {
661            assert_eq!(future_round, 11);
662            assert_eq!(deferred_from_round, 5);
663        } else {
664            panic!("should defer");
665        }
666    }
667
668    #[test]
669    fn test_should_defer_allow_overage() {
670        telemetry_subscribers::init_for_testing();
671
672        // Creates two shared objects.
673        let shared_obj_0 = ObjectID::random();
674        let shared_obj_1 = ObjectID::random();
675
676        let tx_gas_budget = 100;
677
678        // Construct object execution cost:
679        // object 0 has cost 1.7M (exceeds burst limit of ~1.6M with 16% utilization on 10s commit)
680        // object 1 has cost 300K (within burst limit)
681        let shared_object_congestion_tracker = SharedObjectCongestionTracker::new(
682            [(shared_obj_0, 1_700_000), (shared_obj_1, 300_000)],
683            ExecutionTimeEstimateParams {
684                target_utilization: 16,
685                allowed_txn_cost_overage_burst_limit_us: 0,
686                randomness_scalar: 0,
687                max_estimate_us: u64::MAX,
688                stored_observations_num_included_checkpoints: 10,
689                stored_observations_limit: u64::MAX,
690                stake_weighted_median_threshold: 0,
691                default_none_duration_for_new_keys: false,
692                observations_chunk_size: None,
693            },
694            false,
695            false,
696        );
697
698        // Read/write to object 0 should be deferred.
699        for mutable in [true, false].iter() {
700            let tx = build_transaction(&[(shared_obj_0, *mutable)], tx_gas_budget);
701            if let Some((_, congested_objects)) = shared_object_congestion_tracker
702                .should_defer_due_to_object_congestion(
703                    &tx,
704                    &HashMap::new(),
705                    &ConsensusCommitInfo::new_for_congestion_test(
706                        0,
707                        0,
708                        Duration::from_micros(10_000_000),
709                    ),
710                )
711            {
712                assert_eq!(congested_objects.len(), 1);
713                assert_eq!(congested_objects[0], shared_obj_0);
714            } else {
715                panic!("should defer");
716            }
717        }
718
719        // Read/write to object 1 should go through even though the budget is exceeded.
720        for mutable in [true, false].iter() {
721            let tx = build_transaction(&[(shared_obj_1, *mutable)], tx_gas_budget);
722            assert!(
723                shared_object_congestion_tracker
724                    .should_defer_due_to_object_congestion(
725                        &tx,
726                        &HashMap::new(),
727                        &ConsensusCommitInfo::new_for_congestion_test(
728                            0,
729                            0,
730                            Duration::from_micros(10_000_000)
731                        ),
732                    )
733                    .is_none()
734            );
735        }
736
737        // Transactions touching both objects should be deferred, with object 0 as the congested object.
738        for mutable_0 in [true, false].iter() {
739            for mutable_1 in [true, false].iter() {
740                let tx = build_transaction(
741                    &[(shared_obj_0, *mutable_0), (shared_obj_1, *mutable_1)],
742                    tx_gas_budget,
743                );
744                if let Some((_, congested_objects)) = shared_object_congestion_tracker
745                    .should_defer_due_to_object_congestion(
746                        &tx,
747                        &HashMap::new(),
748                        &ConsensusCommitInfo::new_for_congestion_test(
749                            0,
750                            0,
751                            Duration::from_micros(10_000_000),
752                        ),
753                    )
754                {
755                    assert_eq!(congested_objects.len(), 1);
756                    assert_eq!(congested_objects[0], shared_obj_0);
757                } else {
758                    panic!("should defer");
759                }
760            }
761        }
762    }
763
764    #[test]
765    fn test_should_defer_allow_overage_with_burst() {
766        telemetry_subscribers::init_for_testing();
767
768        let shared_obj_0 = ObjectID::random();
769        let shared_obj_1 = ObjectID::random();
770
771        let tx_gas_budget = 100;
772
773        // Construct object execution cost:
774        // object 0 has cost 4M (exceeds burst limit of 1.6M + 1.5M = 3.1M)
775        // object 1 has cost 2M (within burst limit)
776        // tx cost is ~1M (default estimate for unknown command)
777        let shared_object_congestion_tracker = SharedObjectCongestionTracker::new(
778            [(shared_obj_0, 4_000_000), (shared_obj_1, 2_000_000)],
779            ExecutionTimeEstimateParams {
780                target_utilization: 16,
781                allowed_txn_cost_overage_burst_limit_us: 1_500_000,
782                randomness_scalar: 0,
783                max_estimate_us: u64::MAX,
784                stored_observations_num_included_checkpoints: 10,
785                stored_observations_limit: u64::MAX,
786                stake_weighted_median_threshold: 0,
787                default_none_duration_for_new_keys: false,
788                observations_chunk_size: None,
789            },
790            false,
791            false,
792        );
793
794        // Read/write to object 0 should be deferred.
795        for mutable in [true, false].iter() {
796            let tx = build_transaction(&[(shared_obj_0, *mutable)], tx_gas_budget);
797            if let Some((_, congested_objects)) = shared_object_congestion_tracker
798                .should_defer_due_to_object_congestion(
799                    &tx,
800                    &HashMap::new(),
801                    &ConsensusCommitInfo::new_for_congestion_test(
802                        0,
803                        0,
804                        Duration::from_micros(10_000_000),
805                    ),
806                )
807            {
808                assert_eq!(congested_objects.len(), 1);
809                assert_eq!(congested_objects[0], shared_obj_0);
810            } else {
811                panic!("should defer");
812            }
813        }
814
815        // Read/write to object 1 should go through even though the budget is exceeded
816        // even before the cost of this tx is considered.
817        for mutable in [true, false].iter() {
818            let tx = build_transaction(&[(shared_obj_1, *mutable)], tx_gas_budget);
819            assert!(
820                shared_object_congestion_tracker
821                    .should_defer_due_to_object_congestion(
822                        &tx,
823                        &HashMap::new(),
824                        &ConsensusCommitInfo::new_for_congestion_test(
825                            0,
826                            0,
827                            Duration::from_micros(10_000_000)
828                        ),
829                    )
830                    .is_none()
831            );
832        }
833
834        // Transactions touching both objects should be deferred, with object 0 as the congested object.
835        for mutable_0 in [true, false].iter() {
836            for mutable_1 in [true, false].iter() {
837                let tx = build_transaction(
838                    &[(shared_obj_0, *mutable_0), (shared_obj_1, *mutable_1)],
839                    tx_gas_budget,
840                );
841                if let Some((_, congested_objects)) = shared_object_congestion_tracker
842                    .should_defer_due_to_object_congestion(
843                        &tx,
844                        &HashMap::new(),
845                        &ConsensusCommitInfo::new_for_congestion_test(
846                            0,
847                            0,
848                            Duration::from_micros(10_000_000),
849                        ),
850                    )
851                {
852                    assert_eq!(congested_objects.len(), 1);
853                    assert_eq!(congested_objects[0], shared_obj_0);
854                } else {
855                    panic!("should defer");
856                }
857            }
858        }
859    }
860
861    #[test]
862    fn test_bump_object_execution_cost() {
863        telemetry_subscribers::init_for_testing();
864
865        let execution_time_estimator = ExecutionTimeEstimator::new_for_testing();
866
867        let object_id_0 = ObjectID::random();
868        let object_id_1 = ObjectID::random();
869        let object_id_2 = ObjectID::random();
870
871        let params = default_params();
872        let mut shared_object_congestion_tracker = SharedObjectCongestionTracker::new(
873            [(object_id_0, 5), (object_id_1, 10)],
874            params,
875            false,
876            false,
877        );
878        assert_eq!(shared_object_congestion_tracker.max_cost(), 10);
879
880        // Read two objects should not change the object execution cost.
881        let cert = build_transaction(&[(object_id_0, false), (object_id_1, false)], 10);
882        shared_object_congestion_tracker.bump_object_execution_cost(
883            shared_object_congestion_tracker.get_tx_cost(
884                &execution_time_estimator,
885                &cert,
886                &mut IndirectStateObserver::new(),
887            ),
888            &cert,
889        );
890        assert_eq!(
891            shared_object_congestion_tracker,
892            SharedObjectCongestionTracker::new(
893                [(object_id_0, 5), (object_id_1, 10)],
894                params,
895                false,
896                false,
897            )
898        );
899        assert_eq!(shared_object_congestion_tracker.max_cost(), 10);
900
901        // Write to object 0 should only bump object 0's execution cost. The start cost should be object 1's cost.
902        let cert = build_transaction(&[(object_id_0, true), (object_id_1, false)], 10);
903        shared_object_congestion_tracker.bump_object_execution_cost(
904            shared_object_congestion_tracker.get_tx_cost(
905                &execution_time_estimator,
906                &cert,
907                &mut IndirectStateObserver::new(),
908            ),
909            &cert,
910        );
911        // start cost (10) + tx cost (~1000 for unknown command)
912        let expected_object_0_cost = 1_010;
913        assert_eq!(
914            shared_object_congestion_tracker,
915            SharedObjectCongestionTracker::new(
916                [(object_id_0, expected_object_0_cost), (object_id_1, 10)],
917                params,
918                false,
919                false,
920            )
921        );
922        assert_eq!(
923            shared_object_congestion_tracker.max_cost(),
924            expected_object_0_cost
925        );
926
927        // Write to all objects should bump all objects' execution cost, including objects that are seen for the first time.
928        let cert = build_transaction(
929            &[
930                (object_id_0, true),
931                (object_id_1, true),
932                (object_id_2, true),
933            ],
934            10,
935        );
936        // previous cost (1_010) + tx cost (~1000 for unknown command)
937        let expected_object_cost = 2_010;
938        shared_object_congestion_tracker.bump_object_execution_cost(
939            shared_object_congestion_tracker.get_tx_cost(
940                &execution_time_estimator,
941                &cert,
942                &mut IndirectStateObserver::new(),
943            ),
944            &cert,
945        );
946        assert_eq!(
947            shared_object_congestion_tracker,
948            SharedObjectCongestionTracker::new(
949                [
950                    (object_id_0, expected_object_cost),
951                    (object_id_1, expected_object_cost),
952                    (object_id_2, expected_object_cost)
953                ],
954                params,
955                false,
956                false,
957            )
958        );
959        assert_eq!(
960            shared_object_congestion_tracker.max_cost(),
961            expected_object_cost
962        );
963
964        // Write to all objects with PTBs containing 7 commands.
965        let cert = build_programmable_transaction(
966            &[
967                (object_id_0, true),
968                (object_id_1, true),
969                (object_id_2, true),
970            ],
971            7,
972            30,
973        );
974        // previous cost 2_010 + (unknown-command default of 1000 * 7 commands)
975        let expected_object_cost = 9_010;
976        shared_object_congestion_tracker.bump_object_execution_cost(
977            shared_object_congestion_tracker.get_tx_cost(
978                &execution_time_estimator,
979                &cert,
980                &mut IndirectStateObserver::new(),
981            ),
982            &cert,
983        );
984        assert_eq!(
985            shared_object_congestion_tracker,
986            SharedObjectCongestionTracker::new(
987                [
988                    (object_id_0, expected_object_cost),
989                    (object_id_1, expected_object_cost),
990                    (object_id_2, expected_object_cost)
991                ],
992                params,
993                false,
994                false,
995            )
996        );
997        assert_eq!(
998            shared_object_congestion_tracker.max_cost(),
999            expected_object_cost
1000        );
1001    }
1002
1003    #[test]
1004    fn test_finish_commit_empty_debts() {
1005        let object_id_0 = ObjectID::random();
1006        let object_id_1 = ObjectID::random();
1007        let object_id_2 = ObjectID::random();
1008
1009        let tracker = SharedObjectCongestionTracker::new(
1010            [(object_id_0, 0), (object_id_1, 0), (object_id_2, 0)],
1011            default_params(),
1012            false,
1013            false,
1014        );
1015
1016        let data = tracker.finish_commit(&ConsensusCommitInfo::new_for_congestion_test(
1017            0,
1018            0,
1019            Duration::ZERO,
1020        ));
1021        assert!(data.accumulated_debts.is_empty());
1022    }
1023
1024    #[test]
1025    fn test_finish_commit_returns_correct_debts() {
1026        telemetry_subscribers::init_for_testing();
1027
1028        let execution_time_estimator = ExecutionTimeEstimator::new_for_testing();
1029
1030        let shared_obj_0 = ObjectID::random();
1031        let shared_obj_1 = ObjectID::random();
1032
1033        let params = ExecutionTimeEstimateParams {
1034            target_utilization: 100,
1035            allowed_txn_cost_overage_burst_limit_us: 1_600 * 5,
1036            randomness_scalar: 0,
1037            max_estimate_us: u64::MAX,
1038            stored_observations_num_included_checkpoints: 10,
1039            stored_observations_limit: u64::MAX,
1040            stake_weighted_median_threshold: 0,
1041            default_none_duration_for_new_keys: false,
1042            observations_chunk_size: None,
1043        };
1044        let mut tracker = SharedObjectCongestionTracker::new(
1045            [(shared_obj_0, 500), (shared_obj_1, 500)],
1046            params,
1047            false,
1048            false,
1049        );
1050
1051        let tx = build_transaction(&[(shared_obj_0, true)], 100);
1052        tracker.bump_object_execution_cost(
1053            tracker.get_tx_cost(
1054                &execution_time_estimator,
1055                &tx,
1056                &mut IndirectStateObserver::new(),
1057            ),
1058            &tx,
1059        );
1060
1061        let commit_info =
1062            ConsensusCommitInfo::new_for_congestion_test(0, 0, Duration::from_micros(800));
1063        let data = tracker.finish_commit(&commit_info);
1064
1065        assert_eq!(data.accumulated_debts.len(), 1);
1066        assert_eq!(data.accumulated_debts[0], (shared_obj_0, 700));
1067        assert_eq!(data.commit_budget, 800);
1068    }
1069
1070    #[test]
1071    fn test_finish_commit_with_logging() {
1072        telemetry_subscribers::init_for_testing();
1073
1074        let execution_time_estimator = ExecutionTimeEstimator::new_for_testing();
1075
1076        let shared_obj_0 = ObjectID::random();
1077        let shared_obj_1 = ObjectID::random();
1078
1079        let params = ExecutionTimeEstimateParams {
1080            target_utilization: 100,
1081            allowed_txn_cost_overage_burst_limit_us: 1_600 * 5,
1082            randomness_scalar: 0,
1083            max_estimate_us: u64::MAX,
1084            stored_observations_num_included_checkpoints: 10,
1085            stored_observations_limit: u64::MAX,
1086            stake_weighted_median_threshold: 0,
1087            default_none_duration_for_new_keys: false,
1088            observations_chunk_size: None,
1089        };
1090        let mut tracker = SharedObjectCongestionTracker::new(
1091            [(shared_obj_0, 0), (shared_obj_1, 0)],
1092            params,
1093            false,
1094            true,
1095        );
1096
1097        let tx = build_transaction(&[(shared_obj_0, true), (shared_obj_1, true)], 100);
1098        let tx_digest = *tx.digest();
1099        tracker.bump_object_execution_cost(
1100            tracker.get_tx_cost(
1101                &execution_time_estimator,
1102                &tx,
1103                &mut IndirectStateObserver::new(),
1104            ),
1105            &tx,
1106        );
1107
1108        let commit_info =
1109            ConsensusCommitInfo::new_for_congestion_test(0, 0, Duration::from_micros(800));
1110        let data = tracker.finish_commit(&commit_info);
1111
1112        assert_eq!(data.log_entries.len(), 1);
1113        assert_eq!(data.log_entries[0].tx_digest, tx_digest);
1114        assert_eq!(data.log_entries[0].start_cost, 0);
1115        assert!(
1116            data.final_object_execution_costs
1117                .contains_key(&shared_obj_0)
1118        );
1119        assert!(
1120            data.final_object_execution_costs
1121                .contains_key(&shared_obj_1)
1122        );
1123    }
1124}