sui_replay/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_recursion::async_recursion;
5use clap::Parser;
6use config::ReplayableNetworkConfigSet;
7use fuzz::ReplayFuzzer;
8use fuzz::ReplayFuzzerConfig;
9use fuzz_mutations::base_fuzzers;
10use std::cmp::max;
11use sui_types::base_types::ObjectID;
12use sui_types::base_types::SequenceNumber;
13use sui_types::digests::get_mainnet_chain_identifier;
14use sui_types::digests::get_testnet_chain_identifier;
15use sui_types::message_envelope::Message;
16use tracing::warn;
17use transaction_provider::{FuzzStartPoint, TransactionSource};
18
19use crate::config::get_rpc_url;
20use crate::replay::ExecutionSandboxState;
21use crate::replay::LocalExec;
22use crate::replay::ProtocolVersionSummary;
23use std::env;
24use std::io::BufRead;
25use std::path::PathBuf;
26use std::str::FromStr;
27use sui_config::node::ExpensiveSafetyCheckConfig;
28use sui_protocol_config::Chain;
29use sui_types::digests::TransactionDigest;
30use tracing::{error, info};
31
32pub mod batch_replay;
33pub mod config;
34mod data_fetcher;
35mod displays;
36pub mod fuzz;
37pub mod fuzz_mutations;
38mod replay;
39#[cfg(test)]
40mod tests;
41pub mod transaction_provider;
42pub mod types;
43
/// Default directory for persisted sandbox snapshots: `tests/sandbox_snapshots` under this
/// crate's manifest directory, resolved at compile time through `CARGO_MANIFEST_DIR`.
static DEFAULT_SANDBOX_BASE_PATH: &str =
    concat!(env!("CARGO_MANIFEST_DIR"), "/tests/sandbox_snapshots");
46
47#[derive(Parser, Clone)]
48#[command(rename_all = "kebab-case")]
49pub enum ReplayToolCommand {
50    /// Generate a new network config file
51    #[command(name = "gen")]
52    GenerateDefaultConfig,
53
54    /// Persist sandbox state
55    #[command(name = "ps")]
56    PersistSandbox {
57        #[arg(long, short)]
58        tx_digest: String,
59        #[arg(long, short, default_value = DEFAULT_SANDBOX_BASE_PATH)]
60        base_path: PathBuf,
61    },
62
63    /// Replay from sandbox state file
64    /// This is a completely local execution
65    #[command(name = "rs")]
66    ReplaySandbox {
67        #[arg(long, short)]
68        path: PathBuf,
69    },
70
71    /// Profile transaction
72    #[command(name = "rp")]
73    ProfileTransaction {
74        #[arg(long, short)]
75        tx_digest: String,
76        /// Optional version of the executor to use, if not specified defaults to the one originally used for the transaction.
77        #[arg(long, short, allow_hyphen_values = true)]
78        executor_version: Option<i64>,
79        /// Optional protocol version to use, if not specified defaults to the one originally used for the transaction.
80        #[arg(long, short, allow_hyphen_values = true)]
81        protocol_version: Option<i64>,
82        /// Optional output filepath for the profile generated by this run, if not specified defaults to `gas_profile_{tx_digest}_{unix_timestamp}.json in the working directory.
83        #[arg(long, short, allow_hyphen_values = true)]
84        profile_output: Option<PathBuf>,
85        /// Required config objects and versions of the config objects to use if replaying a
86        /// transaction that utilizes the config object for regulated coin types and that has been
87        /// denied.
88        #[arg(long, num_args = 2..)]
89        config_objects: Option<Vec<String>>,
90    },
91
92    /// Replay transaction
93    #[command(name = "tx")]
94    ReplayTransaction {
95        #[arg(long, short)]
96        tx_digest: String,
97        #[arg(long, short)]
98        show_effects: bool,
99        /// Optional version of the executor to use, if not specified defaults to the one originally used for the transaction.
100        #[arg(long, short, allow_hyphen_values = true)]
101        executor_version: Option<i64>,
102        /// Optional protocol version to use, if not specified defaults to the one originally used for the transaction.
103        #[arg(long, short, allow_hyphen_values = true)]
104        protocol_version: Option<i64>,
105        /// Required config objects and versions of the config objects to use if replaying a
106        /// transaction that utilizes the config object for regulated coin types and that has been
107        /// denied.
108        #[arg(long, num_args = 2..)]
109        config_objects: Option<Vec<String>>,
110    },
111
112    /// Replay transactions listed in a file
113    #[command(name = "rb")]
114    ReplayBatch {
115        #[arg(long, short)]
116        path: PathBuf,
117        #[arg(long, short)]
118        terminate_early: bool,
119        #[arg(
120            long,
121            short,
122            default_value = "16",
123            help = "Number of tasks to run in parallel"
124        )]
125        num_tasks: u64,
126        #[arg(
127            long,
128            help = "If provided, dump the state of the execution to a file in the given directory. \
129            This will allow faster replay next time."
130        )]
131        persist_path: Option<PathBuf>,
132    },
133
134    /// Replay a transaction from a node state dump
135    #[command(name = "rd")]
136    ReplayDump {
137        #[arg(long, short)]
138        path: String,
139        #[arg(long, short)]
140        show_effects: bool,
141    },
142
143    /// Replay multiple transactions from JSON files that contain the sandbox persisted state.
144    #[command(name = "brd")]
145    BatchReplayFromSandbox {
146        #[arg(
147            help = "The path to the directory that contains many JSON files, each representing a persisted sandbox.\
148            These files are typically generated by running the ReplayBatch command with --persist-path specified."
149        )]
150        path: String,
151        #[arg(
152            long,
153            short,
154            default_value = "64",
155            help = "Number of tasks to run in parallel"
156        )]
157        num_tasks: usize,
158    },
159
160    /// Replay all transactions in a range of checkpoints
161    #[command(name = "ch")]
162    ReplayCheckpoints {
163        #[arg(long, short)]
164        start: u64,
165        #[arg(long, short)]
166        end: u64,
167        #[arg(long, short)]
168        terminate_early: bool,
169        #[arg(long, short, default_value = "16")]
170        max_tasks: u64,
171    },
172
173    /// Replay all transactions in an epoch
174    #[command(name = "ep")]
175    ReplayEpoch {
176        #[arg(long, short)]
177        epoch: u64,
178        #[arg(long, short)]
179        terminate_early: bool,
180        #[arg(long, short, default_value = "16")]
181        max_tasks: u64,
182    },
183
184    /// Run the replay based fuzzer
185    #[command(name = "fz")]
186    Fuzz {
187        #[arg(long, short)]
188        start: Option<FuzzStartPoint>,
189        #[arg(long, short)]
190        num_mutations_per_base: u64,
191        #[arg(long, short = 'b', default_value = "18446744073709551614")]
192        num_base_transactions: u64,
193    },
194
195    #[command(name = "report")]
196    Report,
197}
198
199#[async_recursion]
200pub async fn execute_replay_command(
201    rpc_url: Option<String>,
202    safety_checks: bool,
203    use_authority: bool,
204    cfg_path: Option<PathBuf>,
205    chain: Option<String>,
206    cmd: ReplayToolCommand,
207) -> anyhow::Result<Option<(u64, u64)>> {
208    let safety = if safety_checks {
209        ExpensiveSafetyCheckConfig::new_enable_all()
210    } else {
211        ExpensiveSafetyCheckConfig::default()
212    };
213    Ok(match cmd {
214        ReplayToolCommand::ReplaySandbox { path } => {
215            let contents = std::fs::read_to_string(path)?;
216            let sandbox_state: ExecutionSandboxState = serde_json::from_str(&contents)?;
217            info!("Executing tx: {}", sandbox_state.transaction_info.tx_digest);
218            let sandbox_state =
219                LocalExec::certificate_execute_with_sandbox_state(&sandbox_state).await?;
220            sandbox_state.check_effects()?;
221            info!("Execution finished successfully. Local and on-chain effects match.");
222            None
223        }
224        ReplayToolCommand::PersistSandbox {
225            tx_digest,
226            base_path,
227        } => {
228            let tx_digest = TransactionDigest::from_str(&tx_digest)?;
229            info!("Executing tx: {}", tx_digest);
230            let sandbox_state = LocalExec::replay_with_network_config(
231                get_rpc_url(rpc_url, cfg_path, chain)?,
232                tx_digest,
233                safety,
234                use_authority,
235                None,
236                None,
237                None,
238            )
239            .await?;
240
241            let out = serde_json::to_string(&sandbox_state).unwrap();
242            let path = base_path.join(format!("{}.json", tx_digest));
243            std::fs::write(path, out)?;
244            None
245        }
246        ReplayToolCommand::GenerateDefaultConfig => {
247            let set = ReplayableNetworkConfigSet::default();
248            let path = set.save_config(None).unwrap();
249            println!("Default config saved to: {}", path.to_str().unwrap());
250            warn!("Note: default config nodes might prune epochs/objects");
251            None
252        }
253        ReplayToolCommand::Fuzz {
254            start,
255            num_mutations_per_base,
256            num_base_transactions,
257        } => {
258            let config = ReplayFuzzerConfig {
259                num_mutations_per_base,
260                mutator: Box::new(base_fuzzers(num_mutations_per_base)),
261                tx_source: TransactionSource::TailLatest { start },
262                fail_over_on_err: false,
263                expensive_safety_check_config: Default::default(),
264            };
265            let fuzzer = ReplayFuzzer::new(get_rpc_url(rpc_url, cfg_path, chain)?, config)
266                .await
267                .unwrap();
268            fuzzer.run(num_base_transactions).await.unwrap();
269            None
270        }
271        ReplayToolCommand::ReplayDump { path, show_effects } => {
272            let mut lx = LocalExec::new_for_state_dump(&path, rpc_url).await?;
273            let (sandbox_state, node_dump_state) = lx.execute_state_dump(safety).await?;
274            if show_effects {
275                println!("{:#?}", sandbox_state.local_exec_effects);
276            }
277
278            sandbox_state.check_effects()?;
279
280            let effects = node_dump_state.computed_effects.digest();
281            if effects != node_dump_state.expected_effects_digest {
282                error!(
283                    "Effects digest mismatch for {}: expected: {:?}, got: {:?}",
284                    node_dump_state.tx_digest, node_dump_state.expected_effects_digest, effects,
285                );
286                anyhow::bail!("Effects mismatch");
287            }
288
289            info!("Execution finished successfully. Local and on-chain effects match.");
290            Some((1u64, 1u64))
291        }
292        ReplayToolCommand::ReplayBatch {
293            path,
294            terminate_early,
295            num_tasks,
296            persist_path,
297        } => {
298            let file = std::fs::File::open(path).unwrap();
299            let buf_reader = std::io::BufReader::new(file);
300            let digests = buf_reader.lines().map(|line| {
301                let line = line.unwrap();
302                TransactionDigest::from_str(&line).unwrap_or_else(|err| {
303                    panic!("Error parsing tx digest {:?}: {:?}", line, err);
304                })
305            });
306            batch_replay::batch_replay(
307                digests,
308                num_tasks,
309                get_rpc_url(rpc_url, cfg_path, chain)?,
310                safety,
311                use_authority,
312                terminate_early,
313                persist_path,
314            )
315            .await;
316
317            // TODO: clean this up
318            Some((0u64, 0u64))
319        }
320        ReplayToolCommand::BatchReplayFromSandbox { path, num_tasks } => {
321            let files: Vec<_> = std::fs::read_dir(path)?
322                .filter_map(|entry| {
323                    let path = entry.ok()?.path();
324                    if path.is_file() {
325                        path.to_str().map(|p| p.to_owned())
326                    } else {
327                        None
328                    }
329                })
330                .collect();
331            info!("Replaying {} files", files.len());
332            let chunks = files.chunks(max(files.len() / num_tasks, 1));
333            let tasks = chunks.into_iter().map(|chunk| async move {
334                for file in chunk {
335                    info!("Replaying from state dump file {}", file);
336                    let contents = std::fs::read_to_string(file).unwrap();
337                    let sandbox_state: ExecutionSandboxState =
338                        serde_json::from_str(&contents).unwrap();
339                    let sandbox_state =
340                        LocalExec::certificate_execute_with_sandbox_state(&sandbox_state)
341                            .await
342                            .unwrap();
343                    sandbox_state.check_effects().unwrap();
344                }
345            });
346            futures::future::join_all(tasks).await;
347
348            // TODO: WTF is this
349            Some((0u64, 0u64))
350        }
351        ReplayToolCommand::ProfileTransaction {
352            tx_digest,
353            executor_version,
354            protocol_version,
355            profile_output: _,
356            config_objects,
357        } => {
358            let tx_digest = TransactionDigest::from_str(&tx_digest)?;
359            info!("Executing tx: {}", tx_digest);
360            let _sandbox_state = LocalExec::replay_with_network_config(
361                get_rpc_url(rpc_url, cfg_path, chain)?,
362                tx_digest,
363                safety,
364                use_authority,
365                executor_version,
366                protocol_version,
367                parse_configs_versions(config_objects),
368            )
369            .await?;
370
371            println!("Execution finished successfully.");
372            Some((1u64, 1u64))
373        }
374
375        ReplayToolCommand::ReplayTransaction {
376            tx_digest,
377            show_effects,
378            executor_version,
379            protocol_version,
380            config_objects,
381        } => {
382            let tx_digest = TransactionDigest::from_str(&tx_digest)?;
383            info!("Executing tx: {}", tx_digest);
384            let sandbox_state = LocalExec::replay_with_network_config(
385                get_rpc_url(rpc_url, cfg_path, chain)?,
386                tx_digest,
387                safety,
388                use_authority,
389                executor_version,
390                protocol_version,
391                parse_configs_versions(config_objects),
392            )
393            .await?;
394
395            if show_effects {
396                println!("{}", sandbox_state.local_exec_effects);
397            }
398
399            sandbox_state.check_effects()?;
400
401            println!("Execution finished successfully. Local and on-chain effects match.");
402            Some((1u64, 1u64))
403        }
404
405        ReplayToolCommand::Report => {
406            let mut lx =
407                LocalExec::new_from_fn_url(&rpc_url.expect("Url must be provided")).await?;
408            let epoch_table = lx.protocol_ver_to_epoch_map().await?;
409
410            // We need this for other activities in this session
411            lx.current_protocol_version = *epoch_table.keys().peekable().last().unwrap();
412
413            println!(
414                "  Protocol Version  |                Epoch Change TX               |      Epoch Range     |   Checkpoint Range   "
415            );
416            println!(
417                "---------------------------------------------------------------------------------------------------------------"
418            );
419
420            for (
421                protocol_version,
422                ProtocolVersionSummary {
423                    epoch_change_tx: tx_digest,
424                    epoch_start: start_epoch,
425                    epoch_end: end_epoch,
426                    checkpoint_start,
427                    checkpoint_end,
428                    ..
429                },
430            ) in epoch_table
431            {
432                println!(
433                    " {:^16}   | {:^43} | {:^10}-{:^10}| {:^10}-{:^10} ",
434                    protocol_version,
435                    tx_digest,
436                    start_epoch,
437                    end_epoch,
438                    checkpoint_start.unwrap_or(u64::MAX),
439                    checkpoint_end.unwrap_or(u64::MAX)
440                );
441            }
442
443            lx.populate_protocol_version_tables().await?;
444            for x in lx.protocol_version_system_package_table {
445                println!("Protocol version: {}", x.0);
446                for (package_id, seq_num) in x.1 {
447                    println!("Package: {} Seq: {}", package_id, seq_num);
448                }
449            }
450            None
451        }
452
453        ReplayToolCommand::ReplayCheckpoints {
454            start,
455            end,
456            terminate_early,
457            max_tasks,
458        } => {
459            assert!(start <= end, "Start checkpoint must be <= end checkpoint");
460            assert!(max_tasks > 0, "Max tasks must be > 0");
461            let checkpoints_per_task = ((end - start + max_tasks) / max_tasks) as usize;
462            let mut handles = vec![];
463            info!(
464                "Executing checkpoints {} to {} with at most {} tasks and at most {} checkpoints per task",
465                start, end, max_tasks, checkpoints_per_task
466            );
467
468            let range: Vec<_> = (start..=end).collect();
469            for (task_count, checkpoints) in range.chunks(checkpoints_per_task).enumerate() {
470                let checkpoints = checkpoints.to_vec();
471                let rpc_url = rpc_url.clone();
472                let safety = safety.clone();
473                handles.push(tokio::spawn(async move {
474                    info!("Spawning task {task_count} for checkpoints {checkpoints:?}");
475                    let time = std::time::Instant::now();
476                    let (succeeded, total) = LocalExec::new_from_fn_url(&rpc_url.expect("Url must be provided"))
477                        .await
478                        .unwrap()
479                        .init_for_execution()
480                        .await
481                        .unwrap()
482                        .execute_all_in_checkpoints(&checkpoints, &safety, terminate_early, use_authority)
483                        .await
484                        .unwrap();
485                    let time = time.elapsed();
486                    info!(
487                        "Task {task_count}: executed checkpoints {:?} @ {} total transactions, {} succeeded",
488                        checkpoints, total, succeeded
489                    );
490                    (succeeded, total, time)
491                }));
492            }
493
494            let mut total_tx = 0;
495            let mut total_time_ms = 0;
496            let mut total_succeeded = 0;
497            futures::future::join_all(handles)
498                .await
499                .into_iter()
500                .for_each(|x| match x {
501                    Ok((succeeded, total, time)) => {
502                        total_tx += total;
503                        total_time_ms += time.as_millis() as u64;
504                        total_succeeded += succeeded;
505                    }
506                    Err(e) => {
507                        error!("Task failed: {:?}", e);
508                    }
509                });
510            info!(
511                "Executed {} checkpoints @ {}/{} total TXs succeeded in {} ms ({}) avg TX/s",
512                end - start + 1,
513                total_succeeded,
514                total_tx,
515                total_time_ms,
516                (total_tx as f64) / (total_time_ms as f64 / 1000.0)
517            );
518            Some((total_succeeded, total_tx))
519        }
520        ReplayToolCommand::ReplayEpoch {
521            epoch,
522            terminate_early,
523            max_tasks,
524        } => {
525            let lx =
526                LocalExec::new_from_fn_url(&rpc_url.clone().expect("Url must be provided")).await?;
527
528            let (start, end) = lx.checkpoints_for_epoch(epoch).await?;
529
530            info!(
531                "Executing epoch {} (checkpoint range {}-{}) with at most {} tasks",
532                epoch, start, end, max_tasks
533            );
534            let status = execute_replay_command(
535                rpc_url,
536                safety_checks,
537                use_authority,
538                cfg_path,
539                chain,
540                ReplayToolCommand::ReplayCheckpoints {
541                    start,
542                    end,
543                    terminate_early,
544                    max_tasks,
545                },
546            )
547            .await;
548            match status {
549                Ok(Some((succeeded, total))) => {
550                    info!(
551                        "Epoch {} replay finished {} out of {} TXs",
552                        epoch, succeeded, total
553                    );
554
555                    return Ok(Some((succeeded, total)));
556                }
557                Ok(None) => {
558                    return Ok(None);
559                }
560                Err(e) => {
561                    error!("Epoch {} replay failed: {:?}", epoch, e);
562                    return Err(e);
563                }
564            }
565        }
566    })
567}
568
569pub(crate) fn chain_from_chain_id(chain: &str) -> Chain {
570    let mainnet_chain_id = format!("{}", get_mainnet_chain_identifier());
571    // TODO: Since testnet periodically resets, we need to ensure that the chain id
572    // is updated to the latest one.
573    let testnet_chain_id = format!("{}", get_testnet_chain_identifier());
574
575    if mainnet_chain_id == chain {
576        Chain::Mainnet
577    } else if testnet_chain_id == chain {
578        Chain::Testnet
579    } else {
580        Chain::Unknown
581    }
582}
583
584fn parse_configs_versions(
585    configs_and_versions: Option<Vec<String>>,
586) -> Option<Vec<(ObjectID, SequenceNumber)>> {
587    let configs_and_versions = configs_and_versions?;
588
589    assert!(
590        configs_and_versions.len() % 2 == 0,
591        "Invalid number of arguments for configs and version -- you must supply a version for each config"
592    );
593    Some(
594        configs_and_versions
595            .chunks_exact(2)
596            .map(|chunk| {
597                let object_id =
598                    ObjectID::from_str(&chunk[0]).expect("Invalid object id for config");
599                let object_version = SequenceNumber::from_u64(
600                    chunk[1]
601                        .parse::<u64>()
602                        .expect("Invalid object version for config"),
603                );
604                (object_id, object_version)
605            })
606            .collect(),
607    )
608}