sui_core/accumulators/object_funds_checker/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    sync::Arc,
7};
8
9use mysten_common::assert_reachable;
10use parking_lot::RwLock;
11use sui_types::{
12    SUI_ACCUMULATOR_ROOT_OBJECT_ID,
13    accumulator_root::AccumulatorObjId,
14    base_types::SequenceNumber,
15    effects::{TransactionEffects, TransactionEffectsAPI},
16    executable_transaction::VerifiedExecutableTransaction,
17    execution_params::FundsWithdrawStatus,
18    execution_status::ExecutionStatus,
19    transaction::TransactionDataAPI,
20};
21use tokio::{
22    sync::{oneshot, watch},
23    time::Instant,
24};
25use tracing::{debug, instrument};
26
27use crate::{
28    accumulators::funds_read::AccountFundsRead,
29    authority::{ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore},
30    execution_scheduler::ExecutionScheduler,
31};
32
33#[cfg(test)]
34mod integration_tests;
35pub mod metrics;
36#[cfg(test)]
37mod unit_tests;
38
/// Outcome of an object funds sufficiency check.
///
/// Note that there is no need to have a separate InsufficientFunds variant.
/// If the funds are insufficient, the execution would still have to abort and rely on
/// a rescheduling to be able to execute again.
pub enum ObjectFundsWithdrawStatus {
    /// The withdraws were checked against an already settled accumulator version and the
    /// funds cover them.
    SufficientFunds,
    /// The outcome is delivered through the receiver. It yields
    /// `FundsWithdrawStatus::Insufficient` when the funds are determined to be insufficient,
    /// or `FundsWithdrawStatus::MaybeSufficient` once the accumulator version has settled and
    /// the check has to be repeated during the next execution.
    Pending(oneshot::Receiver<FundsWithdrawStatus>),
}
48
/// Checks whether object funds withdraws can be satisfied, tracking withdraws that have
/// been accepted but whose accumulator version has not been committed yet.
pub struct ObjectFundsChecker {
    /// Watch channel that keeps track of the last settled accumulator version.
    /// This is updated whenever the settlement barrier transaction is executed.
    last_settled_version_sender: watch::Sender<SequenceNumber>,
    /// Receiver side of the same channel; keeping it in `self` guarantees the channel
    /// always has at least one live receiver.
    last_settled_version_receiver: watch::Receiver<SequenceNumber>,
    /// Bookkeeping of unsettled withdraws, guarded by a read/write lock.
    inner: RwLock<Inner>,
    /// Metrics reported by the checker (check results, pending checks, tracked entries).
    metrics: Arc<metrics::ObjectFundsCheckerMetrics>,
}
57
/// Mutable bookkeeping of `ObjectFundsChecker`, kept behind its lock.
#[derive(Default)]
struct Inner {
    /// Tracks the amount of pending unsettled withdraws for each account at each accumulator version.
    /// When we check object funds sufficiency, we read the balance bounded by the withdraw accumulator version.
    /// Balances are updated only by settlement transactions, not when we withdraw funds.
    /// Hence when we are checking object funds, on top of the settled balance, we also need to account for
    /// the amount of withdraws from the same consensus commit (which all read from the same accumulator version).
    unsettled_withdraws: BTreeMap<AccumulatorObjId, BTreeMap<SequenceNumber, u128>>,
    /// Tracks the accounts that have pending withdraws at each accumulator version.
    /// This information is not required for functional correctness, but is needed to garbage collect
    /// unused entries in unsettled_withdraws that are now fully committed. Without doing so, unsettled_withdraws
    /// may grow unbounded.
    unsettled_accounts: BTreeMap<SequenceNumber, BTreeSet<AccumulatorObjId>>,
}
72
73impl ObjectFundsChecker {
74    pub fn new(
75        starting_accumulator_version: SequenceNumber,
76        metrics: Arc<metrics::ObjectFundsCheckerMetrics>,
77    ) -> Self {
78        let (last_settled_version_sender, last_settled_version_receiver) =
79            watch::channel(starting_accumulator_version);
80        Self {
81            last_settled_version_sender,
82            last_settled_version_receiver,
83            inner: RwLock::new(Inner::default()),
84            metrics,
85        }
86    }
87
88    #[instrument(level = "debug", skip_all, fields(tx_digest = ?certificate.digest()))]
89    pub fn should_commit_object_funds_withdraws(
90        &self,
91        certificate: &VerifiedExecutableTransaction,
92        execution_status: &ExecutionStatus,
93        accumulator_running_max_withdraws: &BTreeMap<AccumulatorObjId, u128>,
94        execution_env: &ExecutionEnv,
95        funds_read: &Arc<dyn AccountFundsRead>,
96        execution_scheduler: &Arc<ExecutionScheduler>,
97        epoch_store: &Arc<AuthorityPerEpochStore>,
98    ) -> bool {
99        if execution_status.is_err() {
100            // This transaction already failed. It does not matter any more
101            // whether it has sufficient object funds or not.
102            debug!("Transaction failed, committing effects");
103            return true;
104        }
105        let address_funds_reservations: BTreeSet<_> = certificate
106            .transaction_data()
107            .process_funds_withdrawals_for_execution(epoch_store.get_chain_identifier())
108            .into_keys()
109            .collect();
110        // All withdraws will show up as accumulator events with integer values.
111        // Among them, addresses that do not have funds reservations are object
112        // withdraws.
113        let object_withdraws: BTreeMap<_, _> = accumulator_running_max_withdraws
114            .clone()
115            .into_iter()
116            .filter(|(account, _)| !address_funds_reservations.contains(account))
117            .collect();
118        // If there are no object withdraws, we can skip checking object funds.
119        if object_withdraws.is_empty() {
120            debug!("No object withdraws, committing effects");
121            return true;
122        }
123        let Some(accumulator_version) = execution_env.assigned_versions.accumulator_version else {
124            // Fastpath transactions that perform object funds withdraws
125            // must wait for consensus to assign the accumulator version.
126            // We cannot optimize the scheduling by processing fastpath object withdraws
127            // sooner because these may get reverted, and we don't want them
128            // pollute the scheduler tracking state.
129            // TODO: We could however optimize execution by caching
130            // the execution state to avoid re-execution.
131            return false;
132        };
133        match self.check_object_funds(object_withdraws, accumulator_version, funds_read.as_ref()) {
134            // Sufficient funds, we can go ahead and commit the execution results as it is.
135            ObjectFundsWithdrawStatus::SufficientFunds => {
136                assert_reachable!("object funds sufficient");
137                debug!("Object funds sufficient, committing effects");
138                self.metrics
139                    .check_result
140                    .with_label_values(&["sufficient"])
141                    .inc();
142                true
143            }
144            // Currently insufficient funds. We need to wait until it reach a deterministic state
145            // before we can determine if it is really insufficient (to include potential deposits)
146            // At that time we will have to re-enqueue the transaction for execution again.
147            // Re-enqueue is handled here so the caller does not need to worry about it.
148            ObjectFundsWithdrawStatus::Pending(receiver) => {
149                self.metrics.pending_checks.inc();
150                let timer = self.metrics.pending_check_latency.start_timer();
151                let pending_metrics = self.metrics.clone();
152                let scheduler = execution_scheduler.clone();
153                let cert = certificate.clone();
154                let mut execution_env = execution_env.clone();
155                let epoch_store = epoch_store.clone();
156                tokio::task::spawn(async move {
157                    // It is possible that checkpoint executor finished executing
158                    // the current epoch and went ahead with epoch change asynchronously,
159                    // while this is still waiting.
160                    let inner_metrics = pending_metrics.clone();
161                    let _ = epoch_store
162                        .within_alive_epoch(async move {
163                            let tx_digest = cert.digest();
164                            match receiver.await {
165                                Ok(FundsWithdrawStatus::MaybeSufficient) => {
166                                    assert_reachable!("object funds maybe sufficient");
167                                    // The withdraw state is now deterministically known,
168                                    // so we can enqueue the transaction again and it will check again
169                                    // whether it is sufficient or not in the next execution.
170                                    // TODO: We should be able to optimize this by avoiding re-execution.
171                                    debug!(?tx_digest, "Object funds possibly sufficient");
172                                }
173                                Ok(FundsWithdrawStatus::Insufficient) => {
174                                    assert_reachable!("object funds insufficient");
175                                    // Re-enqueue with insufficient funds status, so it will be executed
176                                    // in the next execution and fail through early error.
177                                    // FIXME: We need to also track the amount of gas that was used,
178                                    // so that we could charge properly in the next execution when we
179                                    // go through early error. Otherwise we would undercharge.
180                                    execution_env = execution_env.with_insufficient_funds();
181                                    inner_metrics
182                                        .check_result
183                                        .with_label_values(&["insufficient"])
184                                        .inc();
185                                    debug!(?tx_digest, "Object funds insufficient");
186                                }
187                                Err(e) => {
188                                    tracing::error!(
189                                        "Error receiving funds withdraw status: {:?}",
190                                        e
191                                    );
192                                }
193                            }
194                            scheduler.send_transaction_for_execution(
195                                &cert,
196                                execution_env,
197                                // TODO: Should the enqueue_time be the original enqueue time
198                                // of this transaction?
199                                Instant::now(),
200                            );
201                        })
202                        .await;
203                    timer.observe_duration();
204                    pending_metrics.pending_checks.dec();
205                });
206                false
207            }
208        }
209    }
210
211    fn check_object_funds(
212        &self,
213        object_withdraws: BTreeMap<AccumulatorObjId, u128>,
214        accumulator_version: SequenceNumber,
215        funds_read: &dyn AccountFundsRead,
216    ) -> ObjectFundsWithdrawStatus {
217        let last_settled_version = *self.last_settled_version_receiver.borrow();
218        if accumulator_version <= last_settled_version {
219            // If the version we are withdrawing from is already settled, we have all the information
220            // we need to determine if the funds are sufficient or not.
221            if self.try_withdraw(funds_read, &object_withdraws, accumulator_version) {
222                return ObjectFundsWithdrawStatus::SufficientFunds;
223            } else {
224                let (sender, receiver) = oneshot::channel();
225                // unwrap is safe because the receiver is defined right above.
226                sender.send(FundsWithdrawStatus::Insufficient).unwrap();
227                return ObjectFundsWithdrawStatus::Pending(receiver);
228            }
229        }
230
231        // Spawn a task to wait for the last settled version to become accumulator_version,
232        // before we could check again.
233        let last_settled_version_sender = self.last_settled_version_sender.clone();
234        let (sender, receiver) = oneshot::channel();
235        tokio::spawn(async move {
236            let mut version_receiver = last_settled_version_sender.subscribe();
237            // The wait is guaranteed to be notified because we update version after executing each settlement transaction,
238            // and every settlement transaction must eventually be executed.
239            let res = version_receiver
240                .wait_for(|v| *v >= accumulator_version)
241                .await;
242            if res.is_err() {
243                // This shouldn't happen, but just to be safe.
244                tracing::error!("Last settled accumulator version receiver channel closed");
245                return;
246            }
247            // We notify the waiter that the funds are now deterministically known,
248            // but we don't need to check here whether they are sufficient or not.
249            // Next time during execution we will check again.
250            let _ = sender.send(FundsWithdrawStatus::MaybeSufficient);
251        });
252        ObjectFundsWithdrawStatus::Pending(receiver)
253    }
254
255    fn try_withdraw(
256        &self,
257        funds_read: &dyn AccountFundsRead,
258        object_withdraws: &BTreeMap<AccumulatorObjId, u128>,
259        accumulator_version: SequenceNumber,
260    ) -> bool {
261        for (obj_id, amount) in object_withdraws {
262            let funds = funds_read.get_account_amount_at_version(obj_id, accumulator_version);
263            // Reading inner without a top-level lock is safe because no two transactions can be withdrawing
264            // from the same account at the same time.
265            let unsettled_withdraw = self
266                .inner
267                .read()
268                .unsettled_withdraws
269                .get(obj_id)
270                .and_then(|withdraws| withdraws.get(&accumulator_version))
271                .copied()
272                .unwrap_or_default();
273            debug!(
274                ?obj_id,
275                ?funds,
276                ?accumulator_version,
277                ?unsettled_withdraw,
278                ?amount,
279                "Trying to withdraw"
280            );
281            assert!(funds >= unsettled_withdraw);
282            if funds - unsettled_withdraw < *amount {
283                return false;
284            }
285        }
286        let mut inner = self.inner.write();
287        for (obj_id, amount) in object_withdraws {
288            let entry = inner
289                .unsettled_withdraws
290                .entry(*obj_id)
291                .or_default()
292                .entry(accumulator_version)
293                .or_default();
294            debug!(?obj_id, ?amount, ?entry, "Updating unsettled withdraws");
295            *entry = entry.checked_add(*amount).unwrap();
296
297            inner
298                .unsettled_accounts
299                .entry(accumulator_version)
300                .or_default()
301                .insert(*obj_id);
302        }
303        self.metrics
304            .unsettled_accounts
305            .set(inner.unsettled_withdraws.len() as i64);
306        self.metrics
307            .unsettled_versions
308            .set(inner.unsettled_accounts.len() as i64);
309        true
310    }
311
312    pub fn settle_accumulator_version(&self, next_accumulator_version: SequenceNumber) {
313        // unwrap is safe because a receiver is always alive as part of self.
314        self.last_settled_version_sender
315            .send(next_accumulator_version)
316            .unwrap();
317        self.metrics
318            .highest_settled_version
319            .set(next_accumulator_version.value() as i64);
320    }
321
322    pub fn commit_effects<'a>(
323        &self,
324        committed_effects: impl Iterator<Item = &'a TransactionEffects>,
325    ) {
326        let committed_accumulator_versions = committed_effects
327            .filter_map(|effects| {
328                effects.object_changes().into_iter().find_map(|o| {
329                    if o.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
330                        o.input_version
331                    } else {
332                        None
333                    }
334                })
335            })
336            .collect::<Vec<_>>();
337        self.commit_accumulator_versions(committed_accumulator_versions);
338    }
339
340    fn commit_accumulator_versions(&self, committed_accumulator_versions: Vec<SequenceNumber>) {
341        let mut inner = self.inner.write();
342        for accumulator_version in committed_accumulator_versions {
343            let accounts = inner
344                .unsettled_accounts
345                .remove(&accumulator_version)
346                .unwrap_or_default();
347            for account in accounts {
348                let withdraws = inner.unsettled_withdraws.get_mut(&account);
349                if let Some(withdraws) = withdraws {
350                    withdraws.remove(&accumulator_version);
351                    if withdraws.is_empty() {
352                        inner.unsettled_withdraws.remove(&account);
353                    }
354                }
355            }
356        }
357        self.metrics
358            .unsettled_accounts
359            .set(inner.unsettled_withdraws.len() as i64);
360        self.metrics
361            .unsettled_versions
362            .set(inner.unsettled_accounts.len() as i64);
363    }
364
365    #[cfg(test)]
366    pub fn get_current_accumulator_version(&self) -> SequenceNumber {
367        *self.last_settled_version_receiver.borrow()
368    }
369
370    #[cfg(test)]
371    pub fn is_empty(&self) -> bool {
372        let inner = self.inner.read();
373        inner.unsettled_withdraws.is_empty() && inner.unsettled_accounts.is_empty()
374    }
375}