sui_core/
congestion_tracker.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use moka::ops::compute::Op;
5use moka::sync::Cache;
6use std::collections::HashMap;
7use std::collections::hash_map::Entry;
8use sui_types::base_types::ObjectID;
9use sui_types::effects::{InputConsensusObject, TransactionEffects, TransactionEffectsAPI};
10use sui_types::execution_status::CongestedObjects;
11use sui_types::messages_checkpoint::{CheckpointTimestamp, VerifiedCheckpoint};
12use sui_types::transaction::{TransactionData, TransactionDataAPI};
13
14use crate::execution_cache::TransactionCacheRead;
15
16#[derive(Clone, Copy, Debug)]
17pub struct CongestionInfo {
18    pub last_cancellation_time: CheckpointTimestamp,
19
20    pub highest_cancelled_gas_price: u64,
21
22    pub last_success_time: Option<CheckpointTimestamp>,
23    pub lowest_executed_gas_price: Option<u64>,
24}
25
26impl CongestionInfo {
27    /// Update the congestion info with the latest congestion info from a new checkpoint
28    fn update_for_new_checkpoint(&mut self, new: &CongestionInfo) {
29        // If there are more recent cancellations, we need to know the latest highest
30        // cancelled price
31        if new.last_cancellation_time > self.last_cancellation_time {
32            self.last_cancellation_time = new.last_cancellation_time;
33            self.highest_cancelled_gas_price = new.highest_cancelled_gas_price;
34        }
35        // If there are more recent successful transactions, we need to know the latest lowest
36        // executed price
37        if new.last_success_time > self.last_success_time {
38            self.last_success_time = new.last_success_time;
39            self.lowest_executed_gas_price = new.lowest_executed_gas_price;
40        }
41    }
42
43    fn update_cancellation_gas_price(&mut self, gas_price: u64) {
44        self.highest_cancelled_gas_price =
45            std::cmp::max(self.highest_cancelled_gas_price, gas_price);
46    }
47
48    fn update_for_success(&mut self, now: CheckpointTimestamp, gas_price: u64) {
49        self.last_success_time = Some(now);
50        self.lowest_executed_gas_price = Some(match self.lowest_executed_gas_price {
51            Some(current_min) => std::cmp::min(current_min, gas_price),
52            None => gas_price,
53        });
54    }
55}
56
57pub struct CongestionTracker {
58    pub congestion_clearing_prices: Cache<ObjectID, CongestionInfo>,
59}
60
61impl Default for CongestionTracker {
62    fn default() -> Self {
63        Self::new()
64    }
65}
66
67impl CongestionTracker {
68    pub fn new() -> Self {
69        Self {
70            congestion_clearing_prices: Cache::new(10_000),
71        }
72    }
73
74    pub fn process_checkpoint_effects(
75        &self,
76        transaction_cache_reader: &dyn TransactionCacheRead,
77        checkpoint: &VerifiedCheckpoint,
78        effects: &[TransactionEffects],
79    ) {
80        let mut congestion_events = Vec::with_capacity(effects.len());
81        let mut cleared_events = Vec::with_capacity(effects.len());
82
83        for effect in effects {
84            let gas_price = transaction_cache_reader
85                .get_transaction_block(effect.transaction_digest())
86                .unwrap()
87                .transaction_data()
88                .gas_price();
89            if let Some(CongestedObjects(congested_objects)) =
90                effect.status().get_congested_objects()
91            {
92                congestion_events.push((gas_price, congested_objects.clone()));
93            } else {
94                cleared_events.push((
95                    gas_price,
96                    effect
97                        .input_consensus_objects()
98                        .into_iter()
99                        .filter_map(|object| match object {
100                            InputConsensusObject::Mutate((id, _, _)) => Some(id),
101                            InputConsensusObject::Cancelled(_, _)
102                            | InputConsensusObject::ReadOnly(_)
103                            | InputConsensusObject::ReadConsensusStreamEnded(_, _)
104                            | InputConsensusObject::MutateConsensusStreamEnded(_, _) => None,
105                        })
106                        .collect::<Vec<_>>(),
107                ));
108            }
109        }
110
111        self.process_per_checkpoint_events(
112            checkpoint.timestamp_ms,
113            &congestion_events,
114            &cleared_events,
115        );
116    }
117
118    /// For all the mutable shared inputs, get the highest minimum clearing price (if any exists)
119    /// and the lowest maximum cancelled price.
120    pub fn get_suggested_gas_prices(&self, transaction: &TransactionData) -> Option<u64> {
121        self.get_suggested_gas_price_for_objects(
122            transaction
123                .shared_input_objects()
124                .into_iter()
125                .filter(|id| id.is_accessed_exclusively())
126                .map(|id| id.id),
127        )
128    }
129}
130
131impl CongestionTracker {
132    fn process_per_checkpoint_events(
133        &self,
134        now: CheckpointTimestamp,
135        congestion_events: &[(u64, Vec<ObjectID>)],
136        cleared_events: &[(u64, Vec<ObjectID>)],
137    ) {
138        let congestion_info_map =
139            self.compute_per_checkpoint_congestion_info(now, congestion_events, cleared_events);
140        self.process_checkpoint_congestion(congestion_info_map);
141    }
142
143    fn get_suggested_gas_price_for_objects(
144        &self,
145        objects: impl Iterator<Item = ObjectID>,
146    ) -> Option<u64> {
147        let mut clearing_price = None;
148        for object_id in objects {
149            if let Some(info) = self.get_congestion_info(object_id) {
150                let clearing_price_for_object = match info
151                    .last_success_time
152                    .cmp(&Some(info.last_cancellation_time))
153                {
154                    std::cmp::Ordering::Greater => {
155                        // there were no cancellations in the most recent checkpoint,
156                        // so the object is probably not congested any more
157                        None
158                    }
159                    std::cmp::Ordering::Less => {
160                        // there were no successes in the most recent checkpoint. This should be a rare case,
161                        // but we know we will have to bid at least as much as the highest cancelled price.
162                        Some(info.highest_cancelled_gas_price)
163                    }
164                    std::cmp::Ordering::Equal => {
165                        // there were both successes and cancellations.
166                        info.lowest_executed_gas_price
167                    }
168                };
169                clearing_price = std::cmp::max(clearing_price, clearing_price_for_object);
170            }
171        }
172        clearing_price
173    }
174
175    fn compute_per_checkpoint_congestion_info(
176        &self,
177        now: CheckpointTimestamp,
178        congestion_events: &[(u64, Vec<ObjectID>)],
179        cleared_events: &[(u64, Vec<ObjectID>)],
180    ) -> HashMap<ObjectID, CongestionInfo> {
181        let mut congestion_info_map: HashMap<ObjectID, CongestionInfo> = HashMap::new();
182
183        for (gas_price, objects) in congestion_events {
184            for object in objects {
185                match congestion_info_map.entry(*object) {
186                    Entry::Occupied(entry) => {
187                        entry.into_mut().update_cancellation_gas_price(*gas_price);
188                    }
189                    Entry::Vacant(entry) => {
190                        let info = CongestionInfo {
191                            last_cancellation_time: now,
192                            highest_cancelled_gas_price: *gas_price,
193                            last_success_time: None,
194                            lowest_executed_gas_price: None,
195                        };
196
197                        entry.insert(info);
198                    }
199                }
200            }
201        }
202
203        for (gas_price, objects) in cleared_events {
204            for object in objects {
205                // We only record clearing prices if the object has observed cancellations recently
206                match congestion_info_map.entry(*object) {
207                    Entry::Occupied(entry) => {
208                        entry.into_mut().update_for_success(now, *gas_price);
209                    }
210                    Entry::Vacant(entry) => {
211                        if let Some(prev) = self.get_congestion_info(*object) {
212                            let info = CongestionInfo {
213                                last_cancellation_time: prev.last_cancellation_time,
214                                highest_cancelled_gas_price: prev.highest_cancelled_gas_price,
215                                last_success_time: Some(now),
216                                lowest_executed_gas_price: Some(*gas_price),
217                            };
218                            entry.insert(info);
219                        }
220                    }
221                }
222            }
223        }
224
225        congestion_info_map
226    }
227
228    fn process_checkpoint_congestion(
229        &self,
230        congestion_info_map: HashMap<ObjectID, CongestionInfo>,
231    ) {
232        for (object_id, info) in congestion_info_map {
233            self.congestion_clearing_prices
234                .entry(object_id)
235                .and_compute_with(|maybe_entry| {
236                    if let Some(e) = maybe_entry {
237                        let mut e = e.into_value();
238                        e.update_for_new_checkpoint(&info);
239                        Op::Put(e)
240                    } else {
241                        Op::Put(info)
242                    }
243                });
244        }
245    }
246
247    fn get_congestion_info(&self, object_id: ObjectID) -> Option<CongestionInfo> {
248        self.congestion_clearing_prices.get(&object_id)
249    }
250}
251
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for the price the tracker currently suggests for `objects`.
    fn suggested(tracker: &CongestionTracker, objects: &[ObjectID]) -> Option<u64> {
        tracker.get_suggested_gas_price_for_objects(objects.iter().copied())
    }

    #[test]
    fn test_process_events_new_congestion() {
        let tracker = CongestionTracker::new();
        let (obj1, obj2) = (ObjectID::random(), ObjectID::random());
        let now = 1000;

        let cancellations = [(100, vec![obj1]), (200, vec![obj2])];
        tracker.process_per_checkpoint_events(now, &cancellations, &[]);

        // With cancellations only, each object suggests its own cancelled price.
        assert_eq!(suggested(&tracker, &[obj1]), Some(100));
        assert_eq!(suggested(&tracker, &[obj2]), Some(200));
    }

    #[test]
    fn test_process_events_congestion_then_success() {
        let tracker = CongestionTracker::new();
        let obj = ObjectID::random();

        // Checkpoint with cancellations only: the highest cancelled price wins.
        tracker.process_per_checkpoint_events(1000, &[(100, vec![obj]), (75, vec![obj])], &[]);
        assert_eq!(suggested(&tracker, &[obj]), Some(100));

        // Checkpoint with successes only: the object no longer looks congested.
        tracker.process_per_checkpoint_events(2000, &[], &[(150, vec![obj])]);
        assert_eq!(suggested(&tracker, &[obj]), None);

        // Checkpoint with both: the lowest successful price is suggested.
        let cancellations = [(100, vec![obj])];
        let successes = [(175, vec![obj]), (125, vec![obj])];
        tracker.process_per_checkpoint_events(3000, &cancellations, &successes);
        assert_eq!(suggested(&tracker, &[obj]), Some(125));
    }

    #[test]
    fn test_get_suggested_gas_price_multiple_objects() {
        let tracker = CongestionTracker::new();
        let (obj1, obj2) = (ObjectID::random(), ObjectID::random());

        // Cancellations only, at a different price per object.
        let cancellations = [(100, vec![obj1]), (200, vec![obj2])];
        tracker.process_per_checkpoint_events(1000, &cancellations, &[]);

        // The suggestion is the highest cancelled price across the objects.
        assert_eq!(suggested(&tracker, &[obj1, obj2]), Some(200));

        // Next checkpoint: cancellations and successes for both objects.
        let successes = [(100, vec![obj1]), (150, vec![obj2])];
        tracker.process_per_checkpoint_events(2000, &cancellations, &successes);

        // The suggestion is the highest of the per-object lowest success prices.
        assert_eq!(suggested(&tracker, &[obj1, obj2]), Some(150));
    }
}