sui_replay/
batch_replay.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::replay::{ExecutionSandboxState, LocalExec};
5use crate::types::ReplayEngineError;
6use futures::FutureExt;
7use futures::future::join_all;
8use parking_lot::Mutex;
9use std::collections::VecDeque;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::sync::atomic::AtomicUsize;
13use sui_config::node::ExpensiveSafetyCheckConfig;
14use sui_types::base_types::TransactionDigest;
15use tokio::time::Instant;
16use tracing::{error, info};
17
18/// Given a list of transaction digests, replay them in parallel using `num_tasks` tasks.
19/// If `terminate_early` is true, the replay will terminate early if any transaction fails;
20/// otherwise it will try to finish all transactions.
21pub async fn batch_replay(
22    tx_digests: impl Iterator<Item = TransactionDigest>,
23    num_tasks: u64,
24    rpc_url: String,
25    expensive_safety_check_config: ExpensiveSafetyCheckConfig,
26    use_authority: bool,
27    terminate_early: bool,
28    persist_path: Option<PathBuf>,
29) {
30    let provider = Arc::new(TransactionDigestProvider::new(tx_digests));
31    let cancel = tokio_util::sync::CancellationToken::new();
32    let mut tasks = vec![];
33    let cur_time = Instant::now();
34    for _ in 0..num_tasks {
35        let provider = provider.clone();
36        let expensive_safety_check_config = expensive_safety_check_config.clone();
37        let rpc_url_ref = rpc_url.as_ref();
38        let cancel = cancel.clone();
39        let persist_path_ref = persist_path.as_ref();
40        tasks.push(run_task(
41            provider,
42            rpc_url_ref,
43            expensive_safety_check_config,
44            use_authority,
45            terminate_early,
46            cancel,
47            persist_path_ref,
48        ));
49    }
50    let all_failed_transactions: Vec<_> = join_all(tasks).await.into_iter().flatten().collect();
51    info!(
52        "Finished replaying {} transactions, took {:?}",
53        provider.get_executed_count(),
54        cur_time.elapsed()
55    );
56    if all_failed_transactions.is_empty() {
57        info!("All transactions passed");
58    } else {
59        error!("Some transactions failed: {:?}", all_failed_transactions);
60    }
61}
62
63struct TransactionDigestProvider {
64    digests: Mutex<VecDeque<TransactionDigest>>,
65    total_count: usize,
66    executed_count: AtomicUsize,
67}
68
69impl TransactionDigestProvider {
70    pub fn new(digests: impl Iterator<Item = TransactionDigest>) -> Self {
71        let digests: VecDeque<_> = digests.collect();
72        let total_count = digests.len();
73        Self {
74            digests: Mutex::new(digests),
75            total_count,
76            executed_count: AtomicUsize::new(0),
77        }
78    }
79
80    pub fn get_total_count(&self) -> usize {
81        self.total_count
82    }
83
84    pub fn get_executed_count(&self) -> usize {
85        self.executed_count
86            .load(std::sync::atomic::Ordering::Relaxed)
87    }
88
89    /// Returns the index and digest of the next transaction, if any.
90    pub fn next_digest(&self) -> Option<(usize, TransactionDigest)> {
91        let next_digest = self.digests.lock().pop_front();
92        next_digest.map(|digest| {
93            let executed_count = self
94                .executed_count
95                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
96            (executed_count + 1, digest)
97        })
98    }
99}
100
101async fn run_task(
102    tx_digest_provider: Arc<TransactionDigestProvider>,
103    http_url: &str,
104    expensive_safety_check_config: ExpensiveSafetyCheckConfig,
105    use_authority: bool,
106    terminate_early: bool,
107    cancel: tokio_util::sync::CancellationToken,
108    persist_path: Option<&PathBuf>,
109) -> Vec<ReplayEngineError> {
110    let total_count = tx_digest_provider.get_total_count();
111    let mut failed_transactions = vec![];
112    let mut executor = LocalExec::new_from_fn_url(http_url).await.unwrap();
113    while let Some((index, digest)) = tx_digest_provider.next_digest() {
114        if cancel.is_cancelled() {
115            break;
116        }
117        info!(
118            "[{}/{}] Replaying transaction {:?}...",
119            index, total_count, digest
120        );
121        let sandbox_persist_path = persist_path.map(|path| path.join(format!("{}.json", digest,)));
122        if let Some(p) = sandbox_persist_path.as_ref()
123            && p.exists()
124        {
125            info!(
126                "Skipping transaction {:?} as it has been replayed before",
127                digest
128            );
129            continue;
130        }
131        let async_func = execute_transaction(
132            &mut executor,
133            &digest,
134            expensive_safety_check_config.clone(),
135            use_authority,
136        )
137        .fuse();
138        let result = tokio::select! {
139            result = async_func => result,
140            _ = cancel.cancelled() => {
141                break;
142            }
143        };
144        match result {
145            Err(err) => {
146                error!("Replaying transaction {:?} failed: {:?}", digest, err);
147                failed_transactions.push(err.clone());
148                if terminate_early {
149                    cancel.cancel();
150                    break;
151                }
152            }
153            Ok(sandbox_state) => {
154                info!("Replaying transaction {:?} succeeded", digest);
155                if let Some(p) = sandbox_persist_path {
156                    let out = serde_json::to_string(&sandbox_state).unwrap();
157                    std::fs::write(p, out).unwrap();
158                }
159            }
160        }
161    }
162    failed_transactions
163}
164
165async fn execute_transaction(
166    executor: &mut LocalExec,
167    digest: &TransactionDigest,
168    expensive_safety_check_config: ExpensiveSafetyCheckConfig,
169    use_authority: bool,
170) -> Result<ExecutionSandboxState, ReplayEngineError> {
171    *executor = loop {
172        match executor.clone().reset_for_new_execution_with_client().await {
173            Ok(executor) => break executor,
174            Err(err) => {
175                error!("Failed to reset executor: {:?}. Retrying in 3s", err);
176                tokio::time::sleep(std::time::Duration::from_secs(3)).await;
177            }
178        }
179    };
180    let sandbox_state = loop {
181        let result = executor
182            .execute_transaction(
183                digest,
184                expensive_safety_check_config.clone(),
185                use_authority,
186                None,
187                None,
188                None,
189            )
190            .await;
191        match result {
192            Ok(sandbox_state) => break sandbox_state,
193            err @ Err(ReplayEngineError::TransactionNotSupported { .. }) => {
194                return err;
195            }
196            Err(err) => {
197                error!("Failed to execute transaction: {:?}. Retrying in 3s", err);
198                tokio::time::sleep(std::time::Duration::from_secs(3)).await;
199            }
200        }
201    };
202    sandbox_state.check_effects()?;
203    Ok(sandbox_state)
204}