sui_replay/
batch_replay.rs1use crate::replay::{ExecutionSandboxState, LocalExec};
5use crate::types::ReplayEngineError;
6use futures::FutureExt;
7use futures::future::join_all;
8use parking_lot::Mutex;
9use std::collections::VecDeque;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::sync::atomic::AtomicUsize;
13use sui_config::node::ExpensiveSafetyCheckConfig;
14use sui_types::base_types::TransactionDigest;
15use tokio::time::Instant;
16use tracing::{error, info};
17
18pub async fn batch_replay(
22 tx_digests: impl Iterator<Item = TransactionDigest>,
23 num_tasks: u64,
24 rpc_url: String,
25 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
26 use_authority: bool,
27 terminate_early: bool,
28 persist_path: Option<PathBuf>,
29) {
30 let provider = Arc::new(TransactionDigestProvider::new(tx_digests));
31 let cancel = tokio_util::sync::CancellationToken::new();
32 let mut tasks = vec![];
33 let cur_time = Instant::now();
34 for _ in 0..num_tasks {
35 let provider = provider.clone();
36 let expensive_safety_check_config = expensive_safety_check_config.clone();
37 let rpc_url_ref = rpc_url.as_ref();
38 let cancel = cancel.clone();
39 let persist_path_ref = persist_path.as_ref();
40 tasks.push(run_task(
41 provider,
42 rpc_url_ref,
43 expensive_safety_check_config,
44 use_authority,
45 terminate_early,
46 cancel,
47 persist_path_ref,
48 ));
49 }
50 let all_failed_transactions: Vec<_> = join_all(tasks).await.into_iter().flatten().collect();
51 info!(
52 "Finished replaying {} transactions, took {:?}",
53 provider.get_executed_count(),
54 cur_time.elapsed()
55 );
56 if all_failed_transactions.is_empty() {
57 info!("All transactions passed");
58 } else {
59 error!("Some transactions failed: {:?}", all_failed_transactions);
60 }
61}
62
63struct TransactionDigestProvider {
64 digests: Mutex<VecDeque<TransactionDigest>>,
65 total_count: usize,
66 executed_count: AtomicUsize,
67}
68
69impl TransactionDigestProvider {
70 pub fn new(digests: impl Iterator<Item = TransactionDigest>) -> Self {
71 let digests: VecDeque<_> = digests.collect();
72 let total_count = digests.len();
73 Self {
74 digests: Mutex::new(digests),
75 total_count,
76 executed_count: AtomicUsize::new(0),
77 }
78 }
79
80 pub fn get_total_count(&self) -> usize {
81 self.total_count
82 }
83
84 pub fn get_executed_count(&self) -> usize {
85 self.executed_count
86 .load(std::sync::atomic::Ordering::Relaxed)
87 }
88
89 pub fn next_digest(&self) -> Option<(usize, TransactionDigest)> {
91 let next_digest = self.digests.lock().pop_front();
92 next_digest.map(|digest| {
93 let executed_count = self
94 .executed_count
95 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
96 (executed_count + 1, digest)
97 })
98 }
99}
100
101async fn run_task(
102 tx_digest_provider: Arc<TransactionDigestProvider>,
103 http_url: &str,
104 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
105 use_authority: bool,
106 terminate_early: bool,
107 cancel: tokio_util::sync::CancellationToken,
108 persist_path: Option<&PathBuf>,
109) -> Vec<ReplayEngineError> {
110 let total_count = tx_digest_provider.get_total_count();
111 let mut failed_transactions = vec![];
112 let mut executor = LocalExec::new_from_fn_url(http_url).await.unwrap();
113 while let Some((index, digest)) = tx_digest_provider.next_digest() {
114 if cancel.is_cancelled() {
115 break;
116 }
117 info!(
118 "[{}/{}] Replaying transaction {:?}...",
119 index, total_count, digest
120 );
121 let sandbox_persist_path = persist_path.map(|path| path.join(format!("{}.json", digest,)));
122 if let Some(p) = sandbox_persist_path.as_ref()
123 && p.exists()
124 {
125 info!(
126 "Skipping transaction {:?} as it has been replayed before",
127 digest
128 );
129 continue;
130 }
131 let async_func = execute_transaction(
132 &mut executor,
133 &digest,
134 expensive_safety_check_config.clone(),
135 use_authority,
136 )
137 .fuse();
138 let result = tokio::select! {
139 result = async_func => result,
140 _ = cancel.cancelled() => {
141 break;
142 }
143 };
144 match result {
145 Err(err) => {
146 error!("Replaying transaction {:?} failed: {:?}", digest, err);
147 failed_transactions.push(err.clone());
148 if terminate_early {
149 cancel.cancel();
150 break;
151 }
152 }
153 Ok(sandbox_state) => {
154 info!("Replaying transaction {:?} succeeded", digest);
155 if let Some(p) = sandbox_persist_path {
156 let out = serde_json::to_string(&sandbox_state).unwrap();
157 std::fs::write(p, out).unwrap();
158 }
159 }
160 }
161 }
162 failed_transactions
163}
164
165async fn execute_transaction(
166 executor: &mut LocalExec,
167 digest: &TransactionDigest,
168 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
169 use_authority: bool,
170) -> Result<ExecutionSandboxState, ReplayEngineError> {
171 *executor = loop {
172 match executor.clone().reset_for_new_execution_with_client().await {
173 Ok(executor) => break executor,
174 Err(err) => {
175 error!("Failed to reset executor: {:?}. Retrying in 3s", err);
176 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
177 }
178 }
179 };
180 let sandbox_state = loop {
181 let result = executor
182 .execute_transaction(
183 digest,
184 expensive_safety_check_config.clone(),
185 use_authority,
186 None,
187 None,
188 None,
189 )
190 .await;
191 match result {
192 Ok(sandbox_state) => break sandbox_state,
193 err @ Err(ReplayEngineError::TransactionNotSupported { .. }) => {
194 return err;
195 }
196 Err(err) => {
197 error!("Failed to execute transaction: {:?}. Retrying in 3s", err);
198 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
199 }
200 }
201 };
202 sandbox_state.check_effects()?;
203 Ok(sandbox_state)
204}