sui_core/accumulators/object_funds_checker/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    sync::Arc,
7};
8
9use mysten_common::{assert_reachable, debug_fatal};
10use parking_lot::RwLock;
11use sui_types::{
12    SUI_ACCUMULATOR_ROOT_OBJECT_ID,
13    accumulator_root::AccumulatorObjId,
14    base_types::SequenceNumber,
15    effects::{TransactionEffects, TransactionEffectsAPI},
16    executable_transaction::VerifiedExecutableTransaction,
17    execution_params::FundsWithdrawStatus,
18    execution_status::ExecutionStatus,
19    transaction::TransactionDataAPI,
20};
21use tokio::{
22    sync::{oneshot, watch},
23    time::Instant,
24};
25use tracing::{debug, instrument};
26
27use crate::{
28    accumulators::funds_read::AccountFundsRead,
29    authority::{ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore},
30    execution_scheduler::ExecutionScheduler,
31};
32
33#[cfg(test)]
34mod integration_tests;
35pub mod metrics;
36#[cfg(test)]
37mod unit_tests;
38
39/// Note that there is no need to have a separate InsufficientFunds variant.
40/// If the funds are insufficient, the execution would still have to abort and rely on
41/// a rescheduling to be able to execute again.
42pub enum ObjectFundsWithdrawStatus {
43    SufficientFunds,
44    // The receiver will be notified when the funds are determined to be sufficient or insufficient.
45    // The bool is true if the funds are sufficient, false if the funds are insufficient.
46    Pending(oneshot::Receiver<FundsWithdrawStatus>),
47}
48
49pub struct ObjectFundsChecker {
50    /// Watchers to keep track the last settled accumulator version.
51    /// This is updated whenever the settlement barrier transaction is executed.
52    last_settled_version_sender: watch::Sender<SequenceNumber>,
53    last_settled_version_receiver: watch::Receiver<SequenceNumber>,
54    inner: RwLock<Inner>,
55    metrics: Arc<metrics::ObjectFundsCheckerMetrics>,
56}
57
58#[derive(Default)]
59struct Inner {
60    /// Tracks the amount of pending unsettled withdraws for each account at each accumulator version.
61    /// When we check object funds sufficiency, we read the balance bounded by the withdraw accumulator version.
62    /// Balance are updated only by settlement transactions, not when we withdraw funds.
63    /// Hence when we are checking object funds, on top of the settled balance, we also need to account for
64    /// the amount of withdraws from the same consensus commit (that all reads from the same accumulator version).
65    unsettled_withdraws: BTreeMap<AccumulatorObjId, BTreeMap<SequenceNumber, u128>>,
66    /// Tracks the accounts that have pending withdraws at each accumulator version.
67    /// This information is not required for functional correctness, but needed to garbage collect
68    /// unused entries in unsettled_withdraws that are now fully committed. Without doing so unsettled_withdraws
69    /// may grow unbounded.
70    unsettled_accounts: BTreeMap<SequenceNumber, BTreeSet<AccumulatorObjId>>,
71}
72
73impl ObjectFundsChecker {
74    pub fn new(
75        starting_accumulator_version: SequenceNumber,
76        metrics: Arc<metrics::ObjectFundsCheckerMetrics>,
77    ) -> Self {
78        let (last_settled_version_sender, last_settled_version_receiver) =
79            watch::channel(starting_accumulator_version);
80        Self {
81            last_settled_version_sender,
82            last_settled_version_receiver,
83            inner: RwLock::new(Inner::default()),
84            metrics,
85        }
86    }
87
88    #[instrument(level = "debug", skip_all, fields(tx_digest = ?certificate.digest()))]
89    pub fn should_commit_object_funds_withdraws(
90        &self,
91        certificate: &VerifiedExecutableTransaction,
92        execution_status: &ExecutionStatus,
93        accumulator_running_max_withdraws: &BTreeMap<AccumulatorObjId, u128>,
94        execution_env: &ExecutionEnv,
95        funds_read: &Arc<dyn AccountFundsRead>,
96        execution_scheduler: &Arc<ExecutionScheduler>,
97        epoch_store: &Arc<AuthorityPerEpochStore>,
98    ) -> bool {
99        if execution_status.is_err() {
100            // This transaction already failed. It does not matter any more
101            // whether it has sufficient object funds or not.
102            debug!("Transaction failed, committing effects");
103            return true;
104        }
105        let address_funds_reservations: BTreeSet<_> = certificate
106            .transaction_data()
107            .process_funds_withdrawals_for_execution(epoch_store.get_chain_identifier())
108            .into_keys()
109            .collect();
110        // All withdraws will show up as accumulator events with integer values.
111        // Among them, addresses that do not have funds reservations are object
112        // withdraws.
113        let object_withdraws: BTreeMap<_, _> = accumulator_running_max_withdraws
114            .clone()
115            .into_iter()
116            .filter(|(account, _)| !address_funds_reservations.contains(account))
117            .collect();
118        // If there are no object withdraws, we can skip checking object funds.
119        if object_withdraws.is_empty() {
120            debug!("No object withdraws, committing effects");
121            return true;
122        }
123        // A tx with object withdraws can only exist when accumulators are enabled
124        // for the epoch, and every production path that produces such a tx also
125        // assigns an accumulator version. The `None` paths (accumulator-disabled
126        // epoch, end-of-epoch tx) never produce withdraws and so never reach here.
127        let Some(accumulator_version) = execution_env.assigned_versions.accumulator_version else {
128            debug_fatal!("accumulator_version must be set for a tx with object withdraws");
129            return false;
130        };
131        match self.check_object_funds(object_withdraws, accumulator_version, funds_read.as_ref()) {
132            // Sufficient funds, we can go ahead and commit the execution results as it is.
133            ObjectFundsWithdrawStatus::SufficientFunds => {
134                assert_reachable!("object funds sufficient");
135                debug!("Object funds sufficient, committing effects");
136                self.metrics
137                    .check_result
138                    .with_label_values(&["sufficient"])
139                    .inc();
140                true
141            }
142            // Currently insufficient funds. We need to wait until it reach a deterministic state
143            // before we can determine if it is really insufficient (to include potential deposits)
144            // At that time we will have to re-enqueue the transaction for execution again.
145            // Re-enqueue is handled here so the caller does not need to worry about it.
146            ObjectFundsWithdrawStatus::Pending(receiver) => {
147                self.metrics.pending_checks.inc();
148                let timer = self.metrics.pending_check_latency.start_timer();
149                let pending_metrics = self.metrics.clone();
150                let scheduler = execution_scheduler.clone();
151                let cert = certificate.clone();
152                let mut execution_env = execution_env.clone();
153                let epoch_store = epoch_store.clone();
154                tokio::task::spawn(async move {
155                    // It is possible that checkpoint executor finished executing
156                    // the current epoch and went ahead with epoch change asynchronously,
157                    // while this is still waiting.
158                    let inner_metrics = pending_metrics.clone();
159                    let _ = epoch_store
160                        .within_alive_epoch(async move {
161                            let tx_digest = cert.digest();
162                            match receiver.await {
163                                Ok(FundsWithdrawStatus::MaybeSufficient) => {
164                                    assert_reachable!("object funds maybe sufficient");
165                                    // The withdraw state is now deterministically known,
166                                    // so we can enqueue the transaction again and it will check again
167                                    // whether it is sufficient or not in the next execution.
168                                    // TODO: We should be able to optimize this by avoiding re-execution.
169                                    debug!(?tx_digest, "Object funds possibly sufficient");
170                                }
171                                Ok(FundsWithdrawStatus::Insufficient) => {
172                                    assert_reachable!("object funds insufficient");
173                                    // Re-enqueue with insufficient funds status, so it will be executed
174                                    // in the next execution and fail through early error.
175                                    // FIXME: We need to also track the amount of gas that was used,
176                                    // so that we could charge properly in the next execution when we
177                                    // go through early error. Otherwise we would undercharge.
178                                    execution_env = execution_env.with_insufficient_funds();
179                                    inner_metrics
180                                        .check_result
181                                        .with_label_values(&["insufficient"])
182                                        .inc();
183                                    debug!(?tx_digest, "Object funds insufficient");
184                                }
185                                Err(e) => {
186                                    tracing::error!(
187                                        "Error receiving funds withdraw status: {:?}",
188                                        e
189                                    );
190                                }
191                            }
192                            scheduler.send_transaction_for_execution(
193                                &cert,
194                                execution_env,
195                                // TODO: Should the enqueue_time be the original enqueue time
196                                // of this transaction?
197                                Instant::now(),
198                            );
199                        })
200                        .await;
201                    timer.observe_duration();
202                    pending_metrics.pending_checks.dec();
203                });
204                false
205            }
206        }
207    }
208
209    fn check_object_funds(
210        &self,
211        object_withdraws: BTreeMap<AccumulatorObjId, u128>,
212        accumulator_version: SequenceNumber,
213        funds_read: &dyn AccountFundsRead,
214    ) -> ObjectFundsWithdrawStatus {
215        let last_settled_version = *self.last_settled_version_receiver.borrow();
216        if accumulator_version <= last_settled_version {
217            // If the version we are withdrawing from is already settled, we have all the information
218            // we need to determine if the funds are sufficient or not.
219            if self.try_withdraw(funds_read, &object_withdraws, accumulator_version) {
220                return ObjectFundsWithdrawStatus::SufficientFunds;
221            } else {
222                let (sender, receiver) = oneshot::channel();
223                // unwrap is safe because the receiver is defined right above.
224                sender.send(FundsWithdrawStatus::Insufficient).unwrap();
225                return ObjectFundsWithdrawStatus::Pending(receiver);
226            }
227        }
228
229        // Spawn a task to wait for the last settled version to become accumulator_version,
230        // before we could check again.
231        let last_settled_version_sender = self.last_settled_version_sender.clone();
232        let (sender, receiver) = oneshot::channel();
233        tokio::spawn(async move {
234            let mut version_receiver = last_settled_version_sender.subscribe();
235            // The wait is guaranteed to be notified because we update version after executing each settlement transaction,
236            // and every settlement transaction must eventually be executed.
237            let res = version_receiver
238                .wait_for(|v| *v >= accumulator_version)
239                .await;
240            if res.is_err() {
241                // This shouldn't happen, but just to be safe.
242                tracing::error!("Last settled accumulator version receiver channel closed");
243                return;
244            }
245            // We notify the waiter that the funds are now deterministically known,
246            // but we don't need to check here whether they are sufficient or not.
247            // Next time during execution we will check again.
248            let _ = sender.send(FundsWithdrawStatus::MaybeSufficient);
249        });
250        ObjectFundsWithdrawStatus::Pending(receiver)
251    }
252
253    fn try_withdraw(
254        &self,
255        funds_read: &dyn AccountFundsRead,
256        object_withdraws: &BTreeMap<AccumulatorObjId, u128>,
257        accumulator_version: SequenceNumber,
258    ) -> bool {
259        for (obj_id, amount) in object_withdraws {
260            let funds = funds_read.get_account_amount_at_version(obj_id, accumulator_version);
261            // Reading inner without a top-level lock is safe because no two transactions can be withdrawing
262            // from the same account at the same time.
263            let unsettled_withdraw = self
264                .inner
265                .read()
266                .unsettled_withdraws
267                .get(obj_id)
268                .and_then(|withdraws| withdraws.get(&accumulator_version))
269                .copied()
270                .unwrap_or_default();
271            debug!(
272                ?obj_id,
273                ?funds,
274                ?accumulator_version,
275                ?unsettled_withdraw,
276                ?amount,
277                "Trying to withdraw"
278            );
279            assert!(funds >= unsettled_withdraw);
280            if funds - unsettled_withdraw < *amount {
281                return false;
282            }
283        }
284        let mut inner = self.inner.write();
285        for (obj_id, amount) in object_withdraws {
286            let entry = inner
287                .unsettled_withdraws
288                .entry(*obj_id)
289                .or_default()
290                .entry(accumulator_version)
291                .or_default();
292            debug!(?obj_id, ?amount, ?entry, "Updating unsettled withdraws");
293            *entry = entry.checked_add(*amount).unwrap();
294
295            inner
296                .unsettled_accounts
297                .entry(accumulator_version)
298                .or_default()
299                .insert(*obj_id);
300        }
301        self.metrics
302            .unsettled_accounts
303            .set(inner.unsettled_withdraws.len() as i64);
304        self.metrics
305            .unsettled_versions
306            .set(inner.unsettled_accounts.len() as i64);
307        true
308    }
309
310    pub fn settle_accumulator_version(&self, next_accumulator_version: SequenceNumber) {
311        // unwrap is safe because a receiver is always alive as part of self.
312        self.last_settled_version_sender
313            .send(next_accumulator_version)
314            .unwrap();
315        self.metrics
316            .highest_settled_version
317            .set(next_accumulator_version.value() as i64);
318    }
319
320    pub fn commit_effects<'a>(
321        &self,
322        committed_effects: impl Iterator<Item = &'a TransactionEffects>,
323    ) {
324        let committed_accumulator_versions = committed_effects
325            .filter_map(|effects| {
326                effects.object_changes().into_iter().find_map(|o| {
327                    if o.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
328                        o.input_version
329                    } else {
330                        None
331                    }
332                })
333            })
334            .collect::<Vec<_>>();
335        self.commit_accumulator_versions(committed_accumulator_versions);
336    }
337
338    fn commit_accumulator_versions(&self, committed_accumulator_versions: Vec<SequenceNumber>) {
339        let mut inner = self.inner.write();
340        for accumulator_version in committed_accumulator_versions {
341            let accounts = inner
342                .unsettled_accounts
343                .remove(&accumulator_version)
344                .unwrap_or_default();
345            for account in accounts {
346                let withdraws = inner.unsettled_withdraws.get_mut(&account);
347                if let Some(withdraws) = withdraws {
348                    withdraws.remove(&accumulator_version);
349                    if withdraws.is_empty() {
350                        inner.unsettled_withdraws.remove(&account);
351                    }
352                }
353            }
354        }
355        self.metrics
356            .unsettled_accounts
357            .set(inner.unsettled_withdraws.len() as i64);
358        self.metrics
359            .unsettled_versions
360            .set(inner.unsettled_accounts.len() as i64);
361    }
362
363    #[cfg(test)]
364    pub fn get_current_accumulator_version(&self) -> SequenceNumber {
365        *self.last_settled_version_receiver.borrow()
366    }
367
368    #[cfg(test)]
369    pub fn is_empty(&self) -> bool {
370        let inner = self.inner.read();
371        inner.unsettled_withdraws.is_empty() && inner.unsettled_accounts.is_empty()
372    }
373}