1use async_recursion::async_recursion;
5use clap::Parser;
6use config::ReplayableNetworkConfigSet;
7use fuzz::ReplayFuzzer;
8use fuzz::ReplayFuzzerConfig;
9use fuzz_mutations::base_fuzzers;
10use std::cmp::max;
11use sui_types::base_types::ObjectID;
12use sui_types::base_types::SequenceNumber;
13use sui_types::digests::get_mainnet_chain_identifier;
14use sui_types::digests::get_testnet_chain_identifier;
15use sui_types::message_envelope::Message;
16use tracing::warn;
17use transaction_provider::{FuzzStartPoint, TransactionSource};
18
19use crate::config::get_rpc_url;
20use crate::replay::ExecutionSandboxState;
21use crate::replay::LocalExec;
22use crate::replay::ProtocolVersionSummary;
23use std::env;
24use std::io::BufRead;
25use std::path::PathBuf;
26use std::str::FromStr;
27use sui_config::node::ExpensiveSafetyCheckConfig;
28use sui_protocol_config::Chain;
29use sui_types::digests::TransactionDigest;
30use tracing::{error, info};
31
32pub mod batch_replay;
33pub mod config;
34mod data_fetcher;
35mod displays;
36pub mod fuzz;
37pub mod fuzz_mutations;
38mod replay;
39#[cfg(test)]
40mod tests;
41pub mod transaction_provider;
42pub mod types;
43
44static DEFAULT_SANDBOX_BASE_PATH: &str =
45 concat!(env!("CARGO_MANIFEST_DIR"), "/tests/sandbox_snapshots");
46
47#[derive(Parser, Clone)]
48#[command(rename_all = "kebab-case")]
49pub enum ReplayToolCommand {
50 #[command(name = "gen")]
52 GenerateDefaultConfig,
53
54 #[command(name = "ps")]
56 PersistSandbox {
57 #[arg(long, short)]
58 tx_digest: String,
59 #[arg(long, short, default_value = DEFAULT_SANDBOX_BASE_PATH)]
60 base_path: PathBuf,
61 },
62
63 #[command(name = "rs")]
66 ReplaySandbox {
67 #[arg(long, short)]
68 path: PathBuf,
69 },
70
71 #[command(name = "rp")]
73 ProfileTransaction {
74 #[arg(long, short)]
75 tx_digest: String,
76 #[arg(long, short, allow_hyphen_values = true)]
78 executor_version: Option<i64>,
79 #[arg(long, short, allow_hyphen_values = true)]
81 protocol_version: Option<i64>,
82 #[arg(long, short, allow_hyphen_values = true)]
84 profile_output: Option<PathBuf>,
85 #[arg(long, num_args = 2..)]
89 config_objects: Option<Vec<String>>,
90 },
91
92 #[command(name = "tx")]
94 ReplayTransaction {
95 #[arg(long, short)]
96 tx_digest: String,
97 #[arg(long, short)]
98 show_effects: bool,
99 #[arg(long, short, allow_hyphen_values = true)]
101 executor_version: Option<i64>,
102 #[arg(long, short, allow_hyphen_values = true)]
104 protocol_version: Option<i64>,
105 #[arg(long, num_args = 2..)]
109 config_objects: Option<Vec<String>>,
110 },
111
112 #[command(name = "rb")]
114 ReplayBatch {
115 #[arg(long, short)]
116 path: PathBuf,
117 #[arg(long, short)]
118 terminate_early: bool,
119 #[arg(
120 long,
121 short,
122 default_value = "16",
123 help = "Number of tasks to run in parallel"
124 )]
125 num_tasks: u64,
126 #[arg(
127 long,
128 help = "If provided, dump the state of the execution to a file in the given directory. \
129 This will allow faster replay next time."
130 )]
131 persist_path: Option<PathBuf>,
132 },
133
134 #[command(name = "rd")]
136 ReplayDump {
137 #[arg(long, short)]
138 path: String,
139 #[arg(long, short)]
140 show_effects: bool,
141 },
142
143 #[command(name = "brd")]
145 BatchReplayFromSandbox {
146 #[arg(
147 help = "The path to the directory that contains many JSON files, each representing a persisted sandbox.\
148 These files are typically generated by running the ReplayBatch command with --persist-path specified."
149 )]
150 path: String,
151 #[arg(
152 long,
153 short,
154 default_value = "64",
155 help = "Number of tasks to run in parallel"
156 )]
157 num_tasks: usize,
158 },
159
160 #[command(name = "ch")]
162 ReplayCheckpoints {
163 #[arg(long, short)]
164 start: u64,
165 #[arg(long, short)]
166 end: u64,
167 #[arg(long, short)]
168 terminate_early: bool,
169 #[arg(long, short, default_value = "16")]
170 max_tasks: u64,
171 },
172
173 #[command(name = "ep")]
175 ReplayEpoch {
176 #[arg(long, short)]
177 epoch: u64,
178 #[arg(long, short)]
179 terminate_early: bool,
180 #[arg(long, short, default_value = "16")]
181 max_tasks: u64,
182 },
183
184 #[command(name = "fz")]
186 Fuzz {
187 #[arg(long, short)]
188 start: Option<FuzzStartPoint>,
189 #[arg(long, short)]
190 num_mutations_per_base: u64,
191 #[arg(long, short = 'b', default_value = "18446744073709551614")]
192 num_base_transactions: u64,
193 },
194
195 #[command(name = "report")]
196 Report,
197}
198
199#[async_recursion]
200pub async fn execute_replay_command(
201 rpc_url: Option<String>,
202 safety_checks: bool,
203 use_authority: bool,
204 cfg_path: Option<PathBuf>,
205 chain: Option<String>,
206 cmd: ReplayToolCommand,
207) -> anyhow::Result<Option<(u64, u64)>> {
208 let safety = if safety_checks {
209 ExpensiveSafetyCheckConfig::new_enable_all()
210 } else {
211 ExpensiveSafetyCheckConfig::default()
212 };
213 Ok(match cmd {
214 ReplayToolCommand::ReplaySandbox { path } => {
215 let contents = std::fs::read_to_string(path)?;
216 let sandbox_state: ExecutionSandboxState = serde_json::from_str(&contents)?;
217 info!("Executing tx: {}", sandbox_state.transaction_info.tx_digest);
218 let sandbox_state =
219 LocalExec::certificate_execute_with_sandbox_state(&sandbox_state).await?;
220 sandbox_state.check_effects()?;
221 info!("Execution finished successfully. Local and on-chain effects match.");
222 None
223 }
224 ReplayToolCommand::PersistSandbox {
225 tx_digest,
226 base_path,
227 } => {
228 let tx_digest = TransactionDigest::from_str(&tx_digest)?;
229 info!("Executing tx: {}", tx_digest);
230 let sandbox_state = LocalExec::replay_with_network_config(
231 get_rpc_url(rpc_url, cfg_path, chain)?,
232 tx_digest,
233 safety,
234 use_authority,
235 None,
236 None,
237 None,
238 )
239 .await?;
240
241 let out = serde_json::to_string(&sandbox_state).unwrap();
242 let path = base_path.join(format!("{}.json", tx_digest));
243 std::fs::write(path, out)?;
244 None
245 }
246 ReplayToolCommand::GenerateDefaultConfig => {
247 let set = ReplayableNetworkConfigSet::default();
248 let path = set.save_config(None).unwrap();
249 println!("Default config saved to: {}", path.to_str().unwrap());
250 warn!("Note: default config nodes might prune epochs/objects");
251 None
252 }
253 ReplayToolCommand::Fuzz {
254 start,
255 num_mutations_per_base,
256 num_base_transactions,
257 } => {
258 let config = ReplayFuzzerConfig {
259 num_mutations_per_base,
260 mutator: Box::new(base_fuzzers(num_mutations_per_base)),
261 tx_source: TransactionSource::TailLatest { start },
262 fail_over_on_err: false,
263 expensive_safety_check_config: Default::default(),
264 };
265 let fuzzer = ReplayFuzzer::new(get_rpc_url(rpc_url, cfg_path, chain)?, config)
266 .await
267 .unwrap();
268 fuzzer.run(num_base_transactions).await.unwrap();
269 None
270 }
271 ReplayToolCommand::ReplayDump { path, show_effects } => {
272 let mut lx = LocalExec::new_for_state_dump(&path, rpc_url).await?;
273 let (sandbox_state, node_dump_state) = lx.execute_state_dump(safety).await?;
274 if show_effects {
275 println!("{:#?}", sandbox_state.local_exec_effects);
276 }
277
278 sandbox_state.check_effects()?;
279
280 let effects = node_dump_state.computed_effects.digest();
281 if effects != node_dump_state.expected_effects_digest {
282 error!(
283 "Effects digest mismatch for {}: expected: {:?}, got: {:?}",
284 node_dump_state.tx_digest, node_dump_state.expected_effects_digest, effects,
285 );
286 anyhow::bail!("Effects mismatch");
287 }
288
289 info!("Execution finished successfully. Local and on-chain effects match.");
290 Some((1u64, 1u64))
291 }
292 ReplayToolCommand::ReplayBatch {
293 path,
294 terminate_early,
295 num_tasks,
296 persist_path,
297 } => {
298 let file = std::fs::File::open(path).unwrap();
299 let buf_reader = std::io::BufReader::new(file);
300 let digests = buf_reader.lines().map(|line| {
301 let line = line.unwrap();
302 TransactionDigest::from_str(&line).unwrap_or_else(|err| {
303 panic!("Error parsing tx digest {:?}: {:?}", line, err);
304 })
305 });
306 batch_replay::batch_replay(
307 digests,
308 num_tasks,
309 get_rpc_url(rpc_url, cfg_path, chain)?,
310 safety,
311 use_authority,
312 terminate_early,
313 persist_path,
314 )
315 .await;
316
317 Some((0u64, 0u64))
319 }
320 ReplayToolCommand::BatchReplayFromSandbox { path, num_tasks } => {
321 let files: Vec<_> = std::fs::read_dir(path)?
322 .filter_map(|entry| {
323 let path = entry.ok()?.path();
324 if path.is_file() {
325 path.to_str().map(|p| p.to_owned())
326 } else {
327 None
328 }
329 })
330 .collect();
331 info!("Replaying {} files", files.len());
332 let chunks = files.chunks(max(files.len() / num_tasks, 1));
333 let tasks = chunks.into_iter().map(|chunk| async move {
334 for file in chunk {
335 info!("Replaying from state dump file {}", file);
336 let contents = std::fs::read_to_string(file).unwrap();
337 let sandbox_state: ExecutionSandboxState =
338 serde_json::from_str(&contents).unwrap();
339 let sandbox_state =
340 LocalExec::certificate_execute_with_sandbox_state(&sandbox_state)
341 .await
342 .unwrap();
343 sandbox_state.check_effects().unwrap();
344 }
345 });
346 futures::future::join_all(tasks).await;
347
348 Some((0u64, 0u64))
350 }
351 ReplayToolCommand::ProfileTransaction {
352 tx_digest,
353 executor_version,
354 protocol_version,
355 profile_output: _,
356 config_objects,
357 } => {
358 let tx_digest = TransactionDigest::from_str(&tx_digest)?;
359 info!("Executing tx: {}", tx_digest);
360 let _sandbox_state = LocalExec::replay_with_network_config(
361 get_rpc_url(rpc_url, cfg_path, chain)?,
362 tx_digest,
363 safety,
364 use_authority,
365 executor_version,
366 protocol_version,
367 parse_configs_versions(config_objects),
368 )
369 .await?;
370
371 println!("Execution finished successfully.");
372 Some((1u64, 1u64))
373 }
374
375 ReplayToolCommand::ReplayTransaction {
376 tx_digest,
377 show_effects,
378 executor_version,
379 protocol_version,
380 config_objects,
381 } => {
382 let tx_digest = TransactionDigest::from_str(&tx_digest)?;
383 info!("Executing tx: {}", tx_digest);
384 let sandbox_state = LocalExec::replay_with_network_config(
385 get_rpc_url(rpc_url, cfg_path, chain)?,
386 tx_digest,
387 safety,
388 use_authority,
389 executor_version,
390 protocol_version,
391 parse_configs_versions(config_objects),
392 )
393 .await?;
394
395 if show_effects {
396 println!("{}", sandbox_state.local_exec_effects);
397 }
398
399 sandbox_state.check_effects()?;
400
401 println!("Execution finished successfully. Local and on-chain effects match.");
402 Some((1u64, 1u64))
403 }
404
405 ReplayToolCommand::Report => {
406 let mut lx =
407 LocalExec::new_from_fn_url(&rpc_url.expect("Url must be provided")).await?;
408 let epoch_table = lx.protocol_ver_to_epoch_map().await?;
409
410 lx.current_protocol_version = *epoch_table.keys().peekable().last().unwrap();
412
413 println!(
414 " Protocol Version | Epoch Change TX | Epoch Range | Checkpoint Range "
415 );
416 println!(
417 "---------------------------------------------------------------------------------------------------------------"
418 );
419
420 for (
421 protocol_version,
422 ProtocolVersionSummary {
423 epoch_change_tx: tx_digest,
424 epoch_start: start_epoch,
425 epoch_end: end_epoch,
426 checkpoint_start,
427 checkpoint_end,
428 ..
429 },
430 ) in epoch_table
431 {
432 println!(
433 " {:^16} | {:^43} | {:^10}-{:^10}| {:^10}-{:^10} ",
434 protocol_version,
435 tx_digest,
436 start_epoch,
437 end_epoch,
438 checkpoint_start.unwrap_or(u64::MAX),
439 checkpoint_end.unwrap_or(u64::MAX)
440 );
441 }
442
443 lx.populate_protocol_version_tables().await?;
444 for x in lx.protocol_version_system_package_table {
445 println!("Protocol version: {}", x.0);
446 for (package_id, seq_num) in x.1 {
447 println!("Package: {} Seq: {}", package_id, seq_num);
448 }
449 }
450 None
451 }
452
453 ReplayToolCommand::ReplayCheckpoints {
454 start,
455 end,
456 terminate_early,
457 max_tasks,
458 } => {
459 assert!(start <= end, "Start checkpoint must be <= end checkpoint");
460 assert!(max_tasks > 0, "Max tasks must be > 0");
461 let checkpoints_per_task = ((end - start + max_tasks) / max_tasks) as usize;
462 let mut handles = vec![];
463 info!(
464 "Executing checkpoints {} to {} with at most {} tasks and at most {} checkpoints per task",
465 start, end, max_tasks, checkpoints_per_task
466 );
467
468 let range: Vec<_> = (start..=end).collect();
469 for (task_count, checkpoints) in range.chunks(checkpoints_per_task).enumerate() {
470 let checkpoints = checkpoints.to_vec();
471 let rpc_url = rpc_url.clone();
472 let safety = safety.clone();
473 handles.push(tokio::spawn(async move {
474 info!("Spawning task {task_count} for checkpoints {checkpoints:?}");
475 let time = std::time::Instant::now();
476 let (succeeded, total) = LocalExec::new_from_fn_url(&rpc_url.expect("Url must be provided"))
477 .await
478 .unwrap()
479 .init_for_execution()
480 .await
481 .unwrap()
482 .execute_all_in_checkpoints(&checkpoints, &safety, terminate_early, use_authority)
483 .await
484 .unwrap();
485 let time = time.elapsed();
486 info!(
487 "Task {task_count}: executed checkpoints {:?} @ {} total transactions, {} succeeded",
488 checkpoints, total, succeeded
489 );
490 (succeeded, total, time)
491 }));
492 }
493
494 let mut total_tx = 0;
495 let mut total_time_ms = 0;
496 let mut total_succeeded = 0;
497 futures::future::join_all(handles)
498 .await
499 .into_iter()
500 .for_each(|x| match x {
501 Ok((succeeded, total, time)) => {
502 total_tx += total;
503 total_time_ms += time.as_millis() as u64;
504 total_succeeded += succeeded;
505 }
506 Err(e) => {
507 error!("Task failed: {:?}", e);
508 }
509 });
510 info!(
511 "Executed {} checkpoints @ {}/{} total TXs succeeded in {} ms ({}) avg TX/s",
512 end - start + 1,
513 total_succeeded,
514 total_tx,
515 total_time_ms,
516 (total_tx as f64) / (total_time_ms as f64 / 1000.0)
517 );
518 Some((total_succeeded, total_tx))
519 }
520 ReplayToolCommand::ReplayEpoch {
521 epoch,
522 terminate_early,
523 max_tasks,
524 } => {
525 let lx =
526 LocalExec::new_from_fn_url(&rpc_url.clone().expect("Url must be provided")).await?;
527
528 let (start, end) = lx.checkpoints_for_epoch(epoch).await?;
529
530 info!(
531 "Executing epoch {} (checkpoint range {}-{}) with at most {} tasks",
532 epoch, start, end, max_tasks
533 );
534 let status = execute_replay_command(
535 rpc_url,
536 safety_checks,
537 use_authority,
538 cfg_path,
539 chain,
540 ReplayToolCommand::ReplayCheckpoints {
541 start,
542 end,
543 terminate_early,
544 max_tasks,
545 },
546 )
547 .await;
548 match status {
549 Ok(Some((succeeded, total))) => {
550 info!(
551 "Epoch {} replay finished {} out of {} TXs",
552 epoch, succeeded, total
553 );
554
555 return Ok(Some((succeeded, total)));
556 }
557 Ok(None) => {
558 return Ok(None);
559 }
560 Err(e) => {
561 error!("Epoch {} replay failed: {:?}", epoch, e);
562 return Err(e);
563 }
564 }
565 }
566 })
567}
568
569pub(crate) fn chain_from_chain_id(chain: &str) -> Chain {
570 let mainnet_chain_id = format!("{}", get_mainnet_chain_identifier());
571 let testnet_chain_id = format!("{}", get_testnet_chain_identifier());
574
575 if mainnet_chain_id == chain {
576 Chain::Mainnet
577 } else if testnet_chain_id == chain {
578 Chain::Testnet
579 } else {
580 Chain::Unknown
581 }
582}
583
584fn parse_configs_versions(
585 configs_and_versions: Option<Vec<String>>,
586) -> Option<Vec<(ObjectID, SequenceNumber)>> {
587 let configs_and_versions = configs_and_versions?;
588
589 assert!(
590 configs_and_versions.len() % 2 == 0,
591 "Invalid number of arguments for configs and version -- you must supply a version for each config"
592 );
593 Some(
594 configs_and_versions
595 .chunks_exact(2)
596 .map(|chunk| {
597 let object_id =
598 ObjectID::from_str(&chunk[0]).expect("Invalid object id for config");
599 let object_version = SequenceNumber::from_u64(
600 chunk[1]
601 .parse::<u64>()
602 .expect("Invalid object version for config"),
603 );
604 (object_id, object_version)
605 })
606 .collect(),
607 )
608}