1use moka::ops::compute::Op;
5use moka::sync::Cache;
6use std::collections::HashMap;
7use std::collections::hash_map::Entry;
8use sui_types::base_types::ObjectID;
9use sui_types::effects::{InputConsensusObject, TransactionEffects, TransactionEffectsAPI};
10use sui_types::execution_status::CongestedObjects;
11use sui_types::messages_checkpoint::{CheckpointTimestamp, VerifiedCheckpoint};
12use sui_types::transaction::{TransactionData, TransactionDataAPI};
13
14use crate::execution_cache::TransactionCacheRead;
15
16#[derive(Clone, Copy, Debug)]
17pub struct CongestionInfo {
18 pub last_cancellation_time: CheckpointTimestamp,
19
20 pub highest_cancelled_gas_price: u64,
21
22 pub last_success_time: Option<CheckpointTimestamp>,
23 pub lowest_executed_gas_price: Option<u64>,
24}
25
26impl CongestionInfo {
27 fn update_for_new_checkpoint(&mut self, new: &CongestionInfo) {
29 if new.last_cancellation_time > self.last_cancellation_time {
32 self.last_cancellation_time = new.last_cancellation_time;
33 self.highest_cancelled_gas_price = new.highest_cancelled_gas_price;
34 }
35 if new.last_success_time > self.last_success_time {
38 self.last_success_time = new.last_success_time;
39 self.lowest_executed_gas_price = new.lowest_executed_gas_price;
40 }
41 }
42
43 fn update_cancellation_gas_price(&mut self, gas_price: u64) {
44 self.highest_cancelled_gas_price =
45 std::cmp::max(self.highest_cancelled_gas_price, gas_price);
46 }
47
48 fn update_for_success(&mut self, now: CheckpointTimestamp, gas_price: u64) {
49 self.last_success_time = Some(now);
50 self.lowest_executed_gas_price = Some(match self.lowest_executed_gas_price {
51 Some(current_min) => std::cmp::min(current_min, gas_price),
52 None => gas_price,
53 });
54 }
55}
56
57pub struct CongestionTracker {
58 pub congestion_clearing_prices: Cache<ObjectID, CongestionInfo>,
59}
60
61impl Default for CongestionTracker {
62 fn default() -> Self {
63 Self::new()
64 }
65}
66
67impl CongestionTracker {
68 pub fn new() -> Self {
69 Self {
70 congestion_clearing_prices: Cache::new(10_000),
71 }
72 }
73
74 pub fn process_checkpoint_effects(
75 &self,
76 transaction_cache_reader: &dyn TransactionCacheRead,
77 checkpoint: &VerifiedCheckpoint,
78 effects: &[TransactionEffects],
79 ) {
80 let mut congestion_events = Vec::with_capacity(effects.len());
81 let mut cleared_events = Vec::with_capacity(effects.len());
82
83 for effect in effects {
84 let gas_price = transaction_cache_reader
85 .get_transaction_block(effect.transaction_digest())
86 .unwrap()
87 .transaction_data()
88 .gas_price();
89 if let Some(CongestedObjects(congested_objects)) =
90 effect.status().get_congested_objects()
91 {
92 congestion_events.push((gas_price, congested_objects.clone()));
93 } else {
94 cleared_events.push((
95 gas_price,
96 effect
97 .input_consensus_objects()
98 .into_iter()
99 .filter_map(|object| match object {
100 InputConsensusObject::Mutate((id, _, _)) => Some(id),
101 InputConsensusObject::Cancelled(_, _)
102 | InputConsensusObject::ReadOnly(_)
103 | InputConsensusObject::ReadConsensusStreamEnded(_, _)
104 | InputConsensusObject::MutateConsensusStreamEnded(_, _) => None,
105 })
106 .collect::<Vec<_>>(),
107 ));
108 }
109 }
110
111 self.process_per_checkpoint_events(
112 checkpoint.timestamp_ms,
113 &congestion_events,
114 &cleared_events,
115 );
116 }
117
118 pub fn get_suggested_gas_prices(&self, transaction: &TransactionData) -> Option<u64> {
121 self.get_suggested_gas_price_for_objects(
122 transaction
123 .shared_input_objects()
124 .into_iter()
125 .filter(|id| id.is_accessed_exclusively())
126 .map(|id| id.id),
127 )
128 }
129}
130
131impl CongestionTracker {
132 fn process_per_checkpoint_events(
133 &self,
134 now: CheckpointTimestamp,
135 congestion_events: &[(u64, Vec<ObjectID>)],
136 cleared_events: &[(u64, Vec<ObjectID>)],
137 ) {
138 let congestion_info_map =
139 self.compute_per_checkpoint_congestion_info(now, congestion_events, cleared_events);
140 self.process_checkpoint_congestion(congestion_info_map);
141 }
142
143 fn get_suggested_gas_price_for_objects(
144 &self,
145 objects: impl Iterator<Item = ObjectID>,
146 ) -> Option<u64> {
147 let mut clearing_price = None;
148 for object_id in objects {
149 if let Some(info) = self.get_congestion_info(object_id) {
150 let clearing_price_for_object = match info
151 .last_success_time
152 .cmp(&Some(info.last_cancellation_time))
153 {
154 std::cmp::Ordering::Greater => {
155 None
158 }
159 std::cmp::Ordering::Less => {
160 Some(info.highest_cancelled_gas_price)
163 }
164 std::cmp::Ordering::Equal => {
165 info.lowest_executed_gas_price
167 }
168 };
169 clearing_price = std::cmp::max(clearing_price, clearing_price_for_object);
170 }
171 }
172 clearing_price
173 }
174
175 fn compute_per_checkpoint_congestion_info(
176 &self,
177 now: CheckpointTimestamp,
178 congestion_events: &[(u64, Vec<ObjectID>)],
179 cleared_events: &[(u64, Vec<ObjectID>)],
180 ) -> HashMap<ObjectID, CongestionInfo> {
181 let mut congestion_info_map: HashMap<ObjectID, CongestionInfo> = HashMap::new();
182
183 for (gas_price, objects) in congestion_events {
184 for object in objects {
185 match congestion_info_map.entry(*object) {
186 Entry::Occupied(entry) => {
187 entry.into_mut().update_cancellation_gas_price(*gas_price);
188 }
189 Entry::Vacant(entry) => {
190 let info = CongestionInfo {
191 last_cancellation_time: now,
192 highest_cancelled_gas_price: *gas_price,
193 last_success_time: None,
194 lowest_executed_gas_price: None,
195 };
196
197 entry.insert(info);
198 }
199 }
200 }
201 }
202
203 for (gas_price, objects) in cleared_events {
204 for object in objects {
205 match congestion_info_map.entry(*object) {
207 Entry::Occupied(entry) => {
208 entry.into_mut().update_for_success(now, *gas_price);
209 }
210 Entry::Vacant(entry) => {
211 if let Some(prev) = self.get_congestion_info(*object) {
212 let info = CongestionInfo {
213 last_cancellation_time: prev.last_cancellation_time,
214 highest_cancelled_gas_price: prev.highest_cancelled_gas_price,
215 last_success_time: Some(now),
216 lowest_executed_gas_price: Some(*gas_price),
217 };
218 entry.insert(info);
219 }
220 }
221 }
222 }
223 }
224
225 congestion_info_map
226 }
227
228 fn process_checkpoint_congestion(
229 &self,
230 congestion_info_map: HashMap<ObjectID, CongestionInfo>,
231 ) {
232 for (object_id, info) in congestion_info_map {
233 self.congestion_clearing_prices
234 .entry(object_id)
235 .and_compute_with(|maybe_entry| {
236 if let Some(e) = maybe_entry {
237 let mut e = e.into_value();
238 e.update_for_new_checkpoint(&info);
239 Op::Put(e)
240 } else {
241 Op::Put(info)
242 }
243 });
244 }
245 }
246
247 fn get_congestion_info(&self, object_id: ObjectID) -> Option<CongestionInfo> {
248 self.congestion_clearing_prices.get(&object_id)
249 }
250}
251
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for querying the suggested price of a set of objects.
    fn suggested_price(tracker: &CongestionTracker, objects: &[ObjectID]) -> Option<u64> {
        tracker.get_suggested_gas_price_for_objects(objects.iter().copied())
    }

    #[test]
    fn test_process_events_new_congestion() {
        let tracker = CongestionTracker::new();
        let first = ObjectID::random();
        let second = ObjectID::random();
        let now = 1000;

        let congestion = [(100, vec![first]), (200, vec![second])];
        tracker.process_per_checkpoint_events(now, &congestion, &[]);

        // With only cancellations recorded, each object suggests its own
        // cancelled price.
        assert_eq!(suggested_price(&tracker, &[first]), Some(100));
        assert_eq!(suggested_price(&tracker, &[second]), Some(200));
    }

    #[test]
    fn test_process_events_congestion_then_success() {
        let tracker = CongestionTracker::new();
        let obj = ObjectID::random();

        // Two cancellations in one checkpoint: the higher price is kept.
        tracker.process_per_checkpoint_events(1000, &[(100, vec![obj]), (75, vec![obj])], &[]);
        assert_eq!(suggested_price(&tracker, &[obj]), Some(100));

        // A later checkpoint with only a success: no suggestion.
        tracker.process_per_checkpoint_events(2000, &[], &[(150, vec![obj])]);
        assert_eq!(suggested_price(&tracker, &[obj]), None);

        // Cancellation and successes in the same checkpoint: the lowest
        // executed price is suggested.
        let congestion = [(100, vec![obj])];
        let cleared = [(175, vec![obj]), (125, vec![obj])];
        tracker.process_per_checkpoint_events(3000, &congestion, &cleared);
        assert_eq!(suggested_price(&tracker, &[obj]), Some(125));
    }

    #[test]
    fn test_get_suggested_gas_price_multiple_objects() {
        let tracker = CongestionTracker::new();
        let first = ObjectID::random();
        let second = ObjectID::random();

        tracker.process_per_checkpoint_events(
            1000,
            &[(100, vec![first]), (200, vec![second])],
            &[],
        );

        // The maximum over both objects wins.
        assert_eq!(suggested_price(&tracker, &[first, second]), Some(200));

        let congestion = [(100, vec![first]), (200, vec![second])];
        let cleared = [(100, vec![first]), (150, vec![second])];
        tracker.process_per_checkpoint_events(2000, &congestion, &cleared);

        // Both objects now suggest their lowest executed price; the larger of
        // the two is returned.
        assert_eq!(suggested_price(&tracker, &[first, second]), Some(150));
    }
}