sui_core/accumulators/object_funds_checker/
mod.rs1use std::{
5 collections::{BTreeMap, BTreeSet},
6 sync::Arc,
7};
8
9use mysten_common::{assert_reachable, debug_fatal};
10use parking_lot::RwLock;
11use sui_types::{
12 SUI_ACCUMULATOR_ROOT_OBJECT_ID,
13 accumulator_root::AccumulatorObjId,
14 base_types::SequenceNumber,
15 effects::{TransactionEffects, TransactionEffectsAPI},
16 executable_transaction::VerifiedExecutableTransaction,
17 execution_params::FundsWithdrawStatus,
18 execution_status::ExecutionStatus,
19 transaction::TransactionDataAPI,
20};
21use tokio::{
22 sync::{oneshot, watch},
23 time::Instant,
24};
25use tracing::{debug, instrument};
26
27use crate::{
28 accumulators::funds_read::AccountFundsRead,
29 authority::{ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore},
30 execution_scheduler::ExecutionScheduler,
31};
32
33#[cfg(test)]
34mod integration_tests;
35pub mod metrics;
36#[cfg(test)]
37mod unit_tests;
38
39pub enum ObjectFundsWithdrawStatus {
43 SufficientFunds,
44 Pending(oneshot::Receiver<FundsWithdrawStatus>),
47}
48
49pub struct ObjectFundsChecker {
50 last_settled_version_sender: watch::Sender<SequenceNumber>,
53 last_settled_version_receiver: watch::Receiver<SequenceNumber>,
54 inner: RwLock<Inner>,
55 metrics: Arc<metrics::ObjectFundsCheckerMetrics>,
56}
57
58#[derive(Default)]
59struct Inner {
60 unsettled_withdraws: BTreeMap<AccumulatorObjId, BTreeMap<SequenceNumber, u128>>,
66 unsettled_accounts: BTreeMap<SequenceNumber, BTreeSet<AccumulatorObjId>>,
71}
72
73impl ObjectFundsChecker {
74 pub fn new(
75 starting_accumulator_version: SequenceNumber,
76 metrics: Arc<metrics::ObjectFundsCheckerMetrics>,
77 ) -> Self {
78 let (last_settled_version_sender, last_settled_version_receiver) =
79 watch::channel(starting_accumulator_version);
80 Self {
81 last_settled_version_sender,
82 last_settled_version_receiver,
83 inner: RwLock::new(Inner::default()),
84 metrics,
85 }
86 }
87
88 #[instrument(level = "debug", skip_all, fields(tx_digest = ?certificate.digest()))]
89 pub fn should_commit_object_funds_withdraws(
90 &self,
91 certificate: &VerifiedExecutableTransaction,
92 execution_status: &ExecutionStatus,
93 accumulator_running_max_withdraws: &BTreeMap<AccumulatorObjId, u128>,
94 execution_env: &ExecutionEnv,
95 funds_read: &Arc<dyn AccountFundsRead>,
96 execution_scheduler: &Arc<ExecutionScheduler>,
97 epoch_store: &Arc<AuthorityPerEpochStore>,
98 ) -> bool {
99 if execution_status.is_err() {
100 debug!("Transaction failed, committing effects");
103 return true;
104 }
105 let address_funds_reservations: BTreeSet<_> = certificate
106 .transaction_data()
107 .process_funds_withdrawals_for_execution(epoch_store.get_chain_identifier())
108 .into_keys()
109 .collect();
110 let object_withdraws: BTreeMap<_, _> = accumulator_running_max_withdraws
114 .clone()
115 .into_iter()
116 .filter(|(account, _)| !address_funds_reservations.contains(account))
117 .collect();
118 if object_withdraws.is_empty() {
120 debug!("No object withdraws, committing effects");
121 return true;
122 }
123 let Some(accumulator_version) = execution_env.assigned_versions.accumulator_version else {
128 debug_fatal!("accumulator_version must be set for a tx with object withdraws");
129 return false;
130 };
131 match self.check_object_funds(object_withdraws, accumulator_version, funds_read.as_ref()) {
132 ObjectFundsWithdrawStatus::SufficientFunds => {
134 assert_reachable!("object funds sufficient");
135 debug!("Object funds sufficient, committing effects");
136 self.metrics
137 .check_result
138 .with_label_values(&["sufficient"])
139 .inc();
140 true
141 }
142 ObjectFundsWithdrawStatus::Pending(receiver) => {
147 self.metrics.pending_checks.inc();
148 let timer = self.metrics.pending_check_latency.start_timer();
149 let pending_metrics = self.metrics.clone();
150 let scheduler = execution_scheduler.clone();
151 let cert = certificate.clone();
152 let mut execution_env = execution_env.clone();
153 let epoch_store = epoch_store.clone();
154 tokio::task::spawn(async move {
155 let inner_metrics = pending_metrics.clone();
159 let _ = epoch_store
160 .within_alive_epoch(async move {
161 let tx_digest = cert.digest();
162 match receiver.await {
163 Ok(FundsWithdrawStatus::MaybeSufficient) => {
164 assert_reachable!("object funds maybe sufficient");
165 debug!(?tx_digest, "Object funds possibly sufficient");
170 }
171 Ok(FundsWithdrawStatus::Insufficient) => {
172 assert_reachable!("object funds insufficient");
173 execution_env = execution_env.with_insufficient_funds();
179 inner_metrics
180 .check_result
181 .with_label_values(&["insufficient"])
182 .inc();
183 debug!(?tx_digest, "Object funds insufficient");
184 }
185 Err(e) => {
186 tracing::error!(
187 "Error receiving funds withdraw status: {:?}",
188 e
189 );
190 }
191 }
192 scheduler.send_transaction_for_execution(
193 &cert,
194 execution_env,
195 Instant::now(),
198 );
199 })
200 .await;
201 timer.observe_duration();
202 pending_metrics.pending_checks.dec();
203 });
204 false
205 }
206 }
207 }
208
209 fn check_object_funds(
210 &self,
211 object_withdraws: BTreeMap<AccumulatorObjId, u128>,
212 accumulator_version: SequenceNumber,
213 funds_read: &dyn AccountFundsRead,
214 ) -> ObjectFundsWithdrawStatus {
215 let last_settled_version = *self.last_settled_version_receiver.borrow();
216 if accumulator_version <= last_settled_version {
217 if self.try_withdraw(funds_read, &object_withdraws, accumulator_version) {
220 return ObjectFundsWithdrawStatus::SufficientFunds;
221 } else {
222 let (sender, receiver) = oneshot::channel();
223 sender.send(FundsWithdrawStatus::Insufficient).unwrap();
225 return ObjectFundsWithdrawStatus::Pending(receiver);
226 }
227 }
228
229 let last_settled_version_sender = self.last_settled_version_sender.clone();
232 let (sender, receiver) = oneshot::channel();
233 tokio::spawn(async move {
234 let mut version_receiver = last_settled_version_sender.subscribe();
235 let res = version_receiver
238 .wait_for(|v| *v >= accumulator_version)
239 .await;
240 if res.is_err() {
241 tracing::error!("Last settled accumulator version receiver channel closed");
243 return;
244 }
245 let _ = sender.send(FundsWithdrawStatus::MaybeSufficient);
249 });
250 ObjectFundsWithdrawStatus::Pending(receiver)
251 }
252
253 fn try_withdraw(
254 &self,
255 funds_read: &dyn AccountFundsRead,
256 object_withdraws: &BTreeMap<AccumulatorObjId, u128>,
257 accumulator_version: SequenceNumber,
258 ) -> bool {
259 for (obj_id, amount) in object_withdraws {
260 let funds = funds_read.get_account_amount_at_version(obj_id, accumulator_version);
261 let unsettled_withdraw = self
264 .inner
265 .read()
266 .unsettled_withdraws
267 .get(obj_id)
268 .and_then(|withdraws| withdraws.get(&accumulator_version))
269 .copied()
270 .unwrap_or_default();
271 debug!(
272 ?obj_id,
273 ?funds,
274 ?accumulator_version,
275 ?unsettled_withdraw,
276 ?amount,
277 "Trying to withdraw"
278 );
279 assert!(funds >= unsettled_withdraw);
280 if funds - unsettled_withdraw < *amount {
281 return false;
282 }
283 }
284 let mut inner = self.inner.write();
285 for (obj_id, amount) in object_withdraws {
286 let entry = inner
287 .unsettled_withdraws
288 .entry(*obj_id)
289 .or_default()
290 .entry(accumulator_version)
291 .or_default();
292 debug!(?obj_id, ?amount, ?entry, "Updating unsettled withdraws");
293 *entry = entry.checked_add(*amount).unwrap();
294
295 inner
296 .unsettled_accounts
297 .entry(accumulator_version)
298 .or_default()
299 .insert(*obj_id);
300 }
301 self.metrics
302 .unsettled_accounts
303 .set(inner.unsettled_withdraws.len() as i64);
304 self.metrics
305 .unsettled_versions
306 .set(inner.unsettled_accounts.len() as i64);
307 true
308 }
309
310 pub fn settle_accumulator_version(&self, next_accumulator_version: SequenceNumber) {
311 self.last_settled_version_sender
313 .send(next_accumulator_version)
314 .unwrap();
315 self.metrics
316 .highest_settled_version
317 .set(next_accumulator_version.value() as i64);
318 }
319
320 pub fn commit_effects<'a>(
321 &self,
322 committed_effects: impl Iterator<Item = &'a TransactionEffects>,
323 ) {
324 let committed_accumulator_versions = committed_effects
325 .filter_map(|effects| {
326 effects.object_changes().into_iter().find_map(|o| {
327 if o.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
328 o.input_version
329 } else {
330 None
331 }
332 })
333 })
334 .collect::<Vec<_>>();
335 self.commit_accumulator_versions(committed_accumulator_versions);
336 }
337
338 fn commit_accumulator_versions(&self, committed_accumulator_versions: Vec<SequenceNumber>) {
339 let mut inner = self.inner.write();
340 for accumulator_version in committed_accumulator_versions {
341 let accounts = inner
342 .unsettled_accounts
343 .remove(&accumulator_version)
344 .unwrap_or_default();
345 for account in accounts {
346 let withdraws = inner.unsettled_withdraws.get_mut(&account);
347 if let Some(withdraws) = withdraws {
348 withdraws.remove(&accumulator_version);
349 if withdraws.is_empty() {
350 inner.unsettled_withdraws.remove(&account);
351 }
352 }
353 }
354 }
355 self.metrics
356 .unsettled_accounts
357 .set(inner.unsettled_withdraws.len() as i64);
358 self.metrics
359 .unsettled_versions
360 .set(inner.unsettled_accounts.len() as i64);
361 }
362
/// Test helper: returns the last settled accumulator version currently held by
/// the watch channel.
#[cfg(test)]
pub fn get_current_accumulator_version(&self) -> SequenceNumber {
    let current = self.last_settled_version_receiver.borrow();
    *current
}
367
/// Test helper: returns `true` when no unsettled withdrawals and no unsettled
/// accounts are being tracked.
#[cfg(test)]
pub fn is_empty(&self) -> bool {
    let state = self.inner.read();
    let no_withdraws = state.unsettled_withdraws.is_empty();
    let no_accounts = state.unsettled_accounts.is_empty();
    no_withdraws && no_accounts
}
373}