sui_core/accumulators/object_funds_checker/
mod.rs1use std::{
5 collections::{BTreeMap, BTreeSet},
6 sync::Arc,
7};
8
9use mysten_common::assert_reachable;
10use parking_lot::RwLock;
11use sui_types::{
12 SUI_ACCUMULATOR_ROOT_OBJECT_ID,
13 accumulator_root::AccumulatorObjId,
14 base_types::SequenceNumber,
15 effects::{TransactionEffects, TransactionEffectsAPI},
16 executable_transaction::VerifiedExecutableTransaction,
17 execution_params::FundsWithdrawStatus,
18 execution_status::ExecutionStatus,
19 transaction::TransactionDataAPI,
20};
21use tokio::{
22 sync::{oneshot, watch},
23 time::Instant,
24};
25use tracing::{debug, instrument};
26
27use crate::{
28 accumulators::funds_read::AccountFundsRead,
29 authority::{ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore},
30 execution_scheduler::ExecutionScheduler,
31};
32
33#[cfg(test)]
34mod integration_tests;
35pub mod metrics;
36#[cfg(test)]
37mod unit_tests;
38
39pub enum ObjectFundsWithdrawStatus {
43 SufficientFunds,
44 Pending(oneshot::Receiver<FundsWithdrawStatus>),
47}
48
49pub struct ObjectFundsChecker {
50 last_settled_version_sender: watch::Sender<SequenceNumber>,
53 last_settled_version_receiver: watch::Receiver<SequenceNumber>,
54 inner: RwLock<Inner>,
55 metrics: Arc<metrics::ObjectFundsCheckerMetrics>,
56}
57
58#[derive(Default)]
59struct Inner {
60 unsettled_withdraws: BTreeMap<AccumulatorObjId, BTreeMap<SequenceNumber, u128>>,
66 unsettled_accounts: BTreeMap<SequenceNumber, BTreeSet<AccumulatorObjId>>,
71}
72
73impl ObjectFundsChecker {
74 pub fn new(
75 starting_accumulator_version: SequenceNumber,
76 metrics: Arc<metrics::ObjectFundsCheckerMetrics>,
77 ) -> Self {
78 let (last_settled_version_sender, last_settled_version_receiver) =
79 watch::channel(starting_accumulator_version);
80 Self {
81 last_settled_version_sender,
82 last_settled_version_receiver,
83 inner: RwLock::new(Inner::default()),
84 metrics,
85 }
86 }
87
88 #[instrument(level = "debug", skip_all, fields(tx_digest = ?certificate.digest()))]
89 pub fn should_commit_object_funds_withdraws(
90 &self,
91 certificate: &VerifiedExecutableTransaction,
92 execution_status: &ExecutionStatus,
93 accumulator_running_max_withdraws: &BTreeMap<AccumulatorObjId, u128>,
94 execution_env: &ExecutionEnv,
95 funds_read: &Arc<dyn AccountFundsRead>,
96 execution_scheduler: &Arc<ExecutionScheduler>,
97 epoch_store: &Arc<AuthorityPerEpochStore>,
98 ) -> bool {
99 if execution_status.is_err() {
100 debug!("Transaction failed, committing effects");
103 return true;
104 }
105 let address_funds_reservations: BTreeSet<_> = certificate
106 .transaction_data()
107 .process_funds_withdrawals_for_execution(epoch_store.get_chain_identifier())
108 .into_keys()
109 .collect();
110 let object_withdraws: BTreeMap<_, _> = accumulator_running_max_withdraws
114 .clone()
115 .into_iter()
116 .filter(|(account, _)| !address_funds_reservations.contains(account))
117 .collect();
118 if object_withdraws.is_empty() {
120 debug!("No object withdraws, committing effects");
121 return true;
122 }
123 let Some(accumulator_version) = execution_env.assigned_versions.accumulator_version else {
124 return false;
132 };
133 match self.check_object_funds(object_withdraws, accumulator_version, funds_read.as_ref()) {
134 ObjectFundsWithdrawStatus::SufficientFunds => {
136 assert_reachable!("object funds sufficient");
137 debug!("Object funds sufficient, committing effects");
138 self.metrics
139 .check_result
140 .with_label_values(&["sufficient"])
141 .inc();
142 true
143 }
144 ObjectFundsWithdrawStatus::Pending(receiver) => {
149 self.metrics.pending_checks.inc();
150 let timer = self.metrics.pending_check_latency.start_timer();
151 let pending_metrics = self.metrics.clone();
152 let scheduler = execution_scheduler.clone();
153 let cert = certificate.clone();
154 let mut execution_env = execution_env.clone();
155 let epoch_store = epoch_store.clone();
156 tokio::task::spawn(async move {
157 let inner_metrics = pending_metrics.clone();
161 let _ = epoch_store
162 .within_alive_epoch(async move {
163 let tx_digest = cert.digest();
164 match receiver.await {
165 Ok(FundsWithdrawStatus::MaybeSufficient) => {
166 assert_reachable!("object funds maybe sufficient");
167 debug!(?tx_digest, "Object funds possibly sufficient");
172 }
173 Ok(FundsWithdrawStatus::Insufficient) => {
174 assert_reachable!("object funds insufficient");
175 execution_env = execution_env.with_insufficient_funds();
181 inner_metrics
182 .check_result
183 .with_label_values(&["insufficient"])
184 .inc();
185 debug!(?tx_digest, "Object funds insufficient");
186 }
187 Err(e) => {
188 tracing::error!(
189 "Error receiving funds withdraw status: {:?}",
190 e
191 );
192 }
193 }
194 scheduler.send_transaction_for_execution(
195 &cert,
196 execution_env,
197 Instant::now(),
200 );
201 })
202 .await;
203 timer.observe_duration();
204 pending_metrics.pending_checks.dec();
205 });
206 false
207 }
208 }
209 }
210
211 fn check_object_funds(
212 &self,
213 object_withdraws: BTreeMap<AccumulatorObjId, u128>,
214 accumulator_version: SequenceNumber,
215 funds_read: &dyn AccountFundsRead,
216 ) -> ObjectFundsWithdrawStatus {
217 let last_settled_version = *self.last_settled_version_receiver.borrow();
218 if accumulator_version <= last_settled_version {
219 if self.try_withdraw(funds_read, &object_withdraws, accumulator_version) {
222 return ObjectFundsWithdrawStatus::SufficientFunds;
223 } else {
224 let (sender, receiver) = oneshot::channel();
225 sender.send(FundsWithdrawStatus::Insufficient).unwrap();
227 return ObjectFundsWithdrawStatus::Pending(receiver);
228 }
229 }
230
231 let last_settled_version_sender = self.last_settled_version_sender.clone();
234 let (sender, receiver) = oneshot::channel();
235 tokio::spawn(async move {
236 let mut version_receiver = last_settled_version_sender.subscribe();
237 let res = version_receiver
240 .wait_for(|v| *v >= accumulator_version)
241 .await;
242 if res.is_err() {
243 tracing::error!("Last settled accumulator version receiver channel closed");
245 return;
246 }
247 let _ = sender.send(FundsWithdrawStatus::MaybeSufficient);
251 });
252 ObjectFundsWithdrawStatus::Pending(receiver)
253 }
254
255 fn try_withdraw(
256 &self,
257 funds_read: &dyn AccountFundsRead,
258 object_withdraws: &BTreeMap<AccumulatorObjId, u128>,
259 accumulator_version: SequenceNumber,
260 ) -> bool {
261 for (obj_id, amount) in object_withdraws {
262 let funds = funds_read.get_account_amount_at_version(obj_id, accumulator_version);
263 let unsettled_withdraw = self
266 .inner
267 .read()
268 .unsettled_withdraws
269 .get(obj_id)
270 .and_then(|withdraws| withdraws.get(&accumulator_version))
271 .copied()
272 .unwrap_or_default();
273 debug!(
274 ?obj_id,
275 ?funds,
276 ?accumulator_version,
277 ?unsettled_withdraw,
278 ?amount,
279 "Trying to withdraw"
280 );
281 assert!(funds >= unsettled_withdraw);
282 if funds - unsettled_withdraw < *amount {
283 return false;
284 }
285 }
286 let mut inner = self.inner.write();
287 for (obj_id, amount) in object_withdraws {
288 let entry = inner
289 .unsettled_withdraws
290 .entry(*obj_id)
291 .or_default()
292 .entry(accumulator_version)
293 .or_default();
294 debug!(?obj_id, ?amount, ?entry, "Updating unsettled withdraws");
295 *entry = entry.checked_add(*amount).unwrap();
296
297 inner
298 .unsettled_accounts
299 .entry(accumulator_version)
300 .or_default()
301 .insert(*obj_id);
302 }
303 self.metrics
304 .unsettled_accounts
305 .set(inner.unsettled_withdraws.len() as i64);
306 self.metrics
307 .unsettled_versions
308 .set(inner.unsettled_accounts.len() as i64);
309 true
310 }
311
312 pub fn settle_accumulator_version(&self, next_accumulator_version: SequenceNumber) {
313 self.last_settled_version_sender
315 .send(next_accumulator_version)
316 .unwrap();
317 self.metrics
318 .highest_settled_version
319 .set(next_accumulator_version.value() as i64);
320 }
321
322 pub fn commit_effects<'a>(
323 &self,
324 committed_effects: impl Iterator<Item = &'a TransactionEffects>,
325 ) {
326 let committed_accumulator_versions = committed_effects
327 .filter_map(|effects| {
328 effects.object_changes().into_iter().find_map(|o| {
329 if o.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
330 o.input_version
331 } else {
332 None
333 }
334 })
335 })
336 .collect::<Vec<_>>();
337 self.commit_accumulator_versions(committed_accumulator_versions);
338 }
339
340 fn commit_accumulator_versions(&self, committed_accumulator_versions: Vec<SequenceNumber>) {
341 let mut inner = self.inner.write();
342 for accumulator_version in committed_accumulator_versions {
343 let accounts = inner
344 .unsettled_accounts
345 .remove(&accumulator_version)
346 .unwrap_or_default();
347 for account in accounts {
348 let withdraws = inner.unsettled_withdraws.get_mut(&account);
349 if let Some(withdraws) = withdraws {
350 withdraws.remove(&accumulator_version);
351 if withdraws.is_empty() {
352 inner.unsettled_withdraws.remove(&account);
353 }
354 }
355 }
356 }
357 self.metrics
358 .unsettled_accounts
359 .set(inner.unsettled_withdraws.len() as i64);
360 self.metrics
361 .unsettled_versions
362 .set(inner.unsettled_accounts.len() as i64);
363 }
364
/// Test helper: returns the accumulator version most recently published as
/// settled.
#[cfg(test)]
pub fn get_current_accumulator_version(&self) -> SequenceNumber {
    let settled = self.last_settled_version_receiver.borrow();
    *settled
}
369
/// Test helper: reports whether no unsettled withdrawals and no per-version
/// account index entries remain.
#[cfg(test)]
pub fn is_empty(&self) -> bool {
    let state = self.inner.read();
    let no_withdraws = state.unsettled_withdraws.is_empty();
    let no_indexed_accounts = state.unsettled_accounts.is_empty();
    no_withdraws && no_indexed_accounts
}
375}