1use crate::db_tool::{DbToolCommand, execute_db_tool_command, print_db_all_tables};
5use crate::{
6 ConciseObjectOutput, GroupedObjectOutput, SnapshotVerifyMode, VerboseObjectOutput,
7 check_completed_snapshot, download_formal_snapshot, get_latest_available_epoch, get_object,
8 get_transaction_block, make_clients, restore_from_db_checkpoint,
9};
10use anyhow::Result;
11use consensus_core::storage::{Store, rocksdb_store::RocksDBStore};
12use consensus_core::{BlockAPI, CommitAPI, CommitRange};
13use futures::TryStreamExt;
14use futures::future::join_all;
15use std::path::PathBuf;
16use std::{collections::BTreeMap, env, sync::Arc};
17use sui_config::genesis::Genesis;
18use sui_core::authority_client::AuthorityAPI;
19use sui_protocol_config::Chain;
20use sui_replay::{ReplayToolCommand, execute_replay_command};
21use sui_rpc_api::Client;
22use sui_types::gas_coin::GasCoin;
23use sui_types::messages_consensus::ConsensusTransaction;
24use sui_types::transaction::Transaction;
25use telemetry_subscribers::TracingHandle;
26
27use sui_types::{
28 base_types::*, crypto::AuthorityPublicKeyBytes, messages_grpc::TransactionInfoRequest,
29};
30
31use clap::*;
32use sui_config::Config;
33use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
34use sui_types::messages_checkpoint::{
35 CheckpointRequest, CheckpointResponse, CheckpointSequenceNumber,
36};
37
// Output format selector for the `fetch-object` subcommand (`--verbosity`).
//
// Plain `//` comments are used on purpose: with clap's derive macros, `///` doc
// comments become part of the generated help text, which would change CLI output.
#[derive(Parser, Clone, ValueEnum)]
pub enum Verbosity {
    // Printed through `GroupedObjectOutput` (built together with the committee map).
    Grouped,
    // Printed through `ConciseObjectOutput`, preceded by its header unless
    // `--concise-no-header` is set.
    Concise,
    // Printed through `VerboseObjectOutput`.
    Verbose,
}
44
// Top-level subcommands of the tool; each variant is dispatched in `ToolCommand::execute`.
//
// Plain `//` comments are used on purpose: with clap's derive macros, `///` doc
// comments become part of the generated help text, which would change CLI output.
#[derive(Parser)]
pub enum ToolCommand {
    // Opens a `RocksDBStore` at `db_path`, scans the requested commit range and prints
    // every commit's blocks together with the keys of their consensus transactions.
    #[command(name = "scan-consensus-commits")]
    ScanConsensusCommits {
        #[arg(long = "db-path")]
        db_path: String,
        // `execute` falls back to 0 when omitted.
        #[arg(long = "start-commit")]
        start_commit: Option<u32>,
        // `execute` falls back to `u32::MAX` when omitted.
        #[arg(long = "end-commit")]
        end_commit: Option<u32>,
    },

    // Checks the lock status of one object (`id`) or, when `id` is omitted, of every
    // `GasCoin` object owned by `address`.
    #[command(name = "locked-object")]
    LockedObject {
        #[arg(long, help = "The object ID to fetch")]
        id: Option<ObjectID>,
        // Only consulted when `id` is `None`; `execute` requires one of the two.
        #[arg(long = "address")]
        address: Option<SuiAddress>,
        #[arg(long = "fullnode-rpc-url")]
        fullnode_rpc_url: String,
        // When set, `check_locked_object` re-submits the top transaction through the
        // fullnode client instead of only reporting it.
        #[arg(long = "rescue")]
        rescue: bool,
    },

    // Fetches an object through `get_object` and prints it in the selected `verbosity`.
    #[command(name = "fetch-object")]
    FetchObject {
        #[arg(long, help = "The object ID to fetch")]
        id: ObjectID,

        #[arg(long, help = "Fetch object at a specific sequence")]
        version: Option<u64>,

        #[arg(
            long,
            help = "Validator to fetch from - if not specified, all validators are queried"
        )]
        validator: Option<AuthorityName>,

        #[arg(long = "fullnode-rpc-url")]
        fullnode_rpc_url: String,

        // Case-insensitive; defaults to `grouped`.
        #[arg(
            value_enum,
            long = "verbosity",
            default_value = "grouped",
            ignore_case = true
        )]
        verbosity: Verbosity,

        #[arg(
            long = "concise-no-header",
            help = "don't show header in concise output"
        )]
        concise_no_header: bool,
    },

    // Prints the result of `get_transaction_block` for `digest`.
    #[command(name = "fetch-transaction")]
    FetchTransaction {
        #[arg(long = "fullnode-rpc-url")]
        fullnode_rpc_url: String,

        #[arg(long, help = "The transaction ID to fetch")]
        digest: TransactionDigest,

        // Exposed as `--show-tx`; forwarded to `get_transaction_block`.
        #[arg(long = "show-tx")]
        show_input_tx: bool,
    },

    // Runs `cmd` against the database at `db_path`; without a subcommand, `execute`
    // calls `print_db_all_tables` instead.
    #[command(name = "db-tool")]
    DbTool {
        #[arg(long = "db-path")]
        db_path: String,
        #[command(subcommand)]
        cmd: Option<DbToolCommand>,
    },
    // Forwards `rpc_url`, `output_dir` and `before_checkpoint` to `sui_package_dump::dump`.
    #[command(name = "dump-packages")]
    DumpPackages {
        #[clap(long, short)]
        rpc_url: String,

        #[clap(long, short)]
        output_dir: PathBuf,

        #[clap(long)]
        before_checkpoint: Option<u64>,

        // Unless set, `execute` switches logging to "off" before dumping.
        #[clap(short, long = "verbose")]
        verbose: bool,
    },

    // Loads the genesis file and prints its validator set (full debug dump or one
    // line per validator, depending on `concise`).
    #[command(name = "dump-validators")]
    DumpValidators {
        #[arg(long = "genesis")]
        genesis: PathBuf,

        #[arg(
            long = "concise",
            help = "show concise output - name, protocol key and network address"
        )]
        concise: bool,
    },

    // Loads the genesis file and debug-prints it.
    #[command(name = "dump-genesis")]
    DumpGenesis {
        #[arg(long = "genesis")]
        genesis: PathBuf,
    },

    // Requests a checkpoint (with contents) from every client returned by
    // `make_clients` and prints each response.
    #[command(name = "fetch-checkpoint")]
    FetchCheckpoint {
        #[arg(long = "fullnode-rpc-url")]
        fullnode_rpc_url: String,

        #[arg(long, help = "Fetch checkpoint at a specific sequence number")]
        sequence_number: Option<CheckpointSequenceNumber>,
    },

    // Delegates to `anemo_cli::run` with the config from `crate::make_anemo_config`.
    #[command(name = "anemo")]
    Anemo {
        // NOTE(review): the help heading "foo" looks like a placeholder — confirm.
        #[command(next_help_heading = "foo", flatten)]
        args: anemo_cli::Args,
    },

    // Loads a `NodeConfig` from `config_path` and calls `restore_from_db_checkpoint`
    // with `db_checkpoint_path`.
    #[command(name = "restore-db")]
    RestoreFromDBCheckpoint {
        #[arg(long = "config-path")]
        config_path: PathBuf,
        #[arg(long = "db-checkpoint-path")]
        db_checkpoint_path: PathBuf,
    },

    #[clap(
        name = "download-formal-snapshot",
        about = "Downloads formal database snapshot via cloud object store, outputs to local disk"
    )]
    DownloadFormalSnapshot {
        // Mutually exclusive with `--latest`.
        #[clap(long = "epoch", conflicts_with = "latest")]
        epoch: Option<u64>,
        #[clap(long = "genesis")]
        genesis: PathBuf,
        // Forwarded to `download_formal_snapshot` as its first argument.
        #[clap(long = "path")]
        path: PathBuf,
        // `execute` defaults this to 50 and caps it at 200.
        #[clap(long = "num-parallel-downloads")]
        num_parallel_downloads: Option<usize>,
        #[clap(long = "num-parallel-chunks", default_value = "8")]
        num_parallel_chunks: usize,
        #[clap(long = "verify", default_value = "normal")]
        verify: Option<SnapshotVerifyMode>,
        #[clap(long = "network", default_value = "mainnet")]
        network: Chain,
        // When omitted, `execute` derives a bucket from `network`, `no_sign_request`
        // and the `*_FORMAL_*_BUCKET` environment variables.
        #[clap(long = "snapshot-bucket", conflicts_with = "no_sign_request")]
        snapshot_bucket: Option<String>,
        #[clap(
            long = "snapshot-bucket-type",
            conflicts_with = "no_sign_request",
            help = "Required if --no-sign-request is not set"
        )]
        snapshot_bucket_type: Option<ObjectStoreType>,
        // Used as the store directory when the bucket type is `File`; `execute`
        // rejects a `File` store without it.
        #[clap(long = "snapshot-path")]
        snapshot_path: Option<PathBuf>,
        #[clap(
            long = "no-sign-request",
            conflicts_with_all = &["snapshot_bucket", "snapshot_bucket_type"],
            help = "if set, no authentication is needed for snapshot restore"
        )]
        no_sign_request: bool,
        #[clap(
            long = "latest",
            conflicts_with = "epoch",
            help = "defaults to latest available snapshot in chosen bucket"
        )]
        latest: bool,
        // Unless set, `execute` switches logging to "off" before downloading.
        #[clap(long = "verbose")]
        verbose: bool,

        // Forwarded unchanged to `download_formal_snapshot`.
        #[clap(long = "max-retries", default_value = "3")]
        max_retries: usize,
        // Forwarded unchanged to `download_formal_snapshot`.
        #[clap(long = "metrics-port", default_value = "9184")]
        metrics_port: u16,
    },

    // Forwards every argument to `execute_replay_command`.
    #[clap(name = "replay")]
    Replay {
        #[arg(long = "rpc")]
        rpc_url: Option<String>,
        #[arg(long = "safety-checks")]
        safety_checks: bool,
        #[arg(long = "authority")]
        use_authority: bool,
        #[arg(
            long = "cfg-path",
            short,
            help = "Path to the network config file. This should be specified when rpc_url is not present. \
                    If not specified we will use the default network config file at ~/.sui-replay/network-config.yaml"
        )]
        cfg_path: Option<PathBuf>,
        #[arg(
            long,
            help = "The name of the chain to replay from, could be one of: mainnet, testnet, devnet.\
                    When rpc_url is not specified, this is used to load the corresponding config from the network config file.\
                    If not specified, mainnet will be used by default"
        )]
        chain: Option<String>,
        #[command(subcommand)]
        cmd: ReplayToolCommand,
    },

    // Runs `crate::db_shell::run` on a blocking task.
    #[command(name = "db-shell")]
    DbShell(crate::db_shell::DbShellArgs),

    // Only compiled with the `tideconsole` feature on non-Windows targets; runs
    // `crate::tideconsole_cmd::run(db, exec, script)` on a blocking task.
    #[cfg(all(feature = "tideconsole", not(windows)))]
    #[command(name = "tideconsole")]
    TideConsole {
        #[arg(short, long)]
        db: Option<PathBuf>,
        #[arg(short, long)]
        exec: Option<String>,
        #[arg(short, long)]
        script: Option<PathBuf>,
    },
}
329
330async fn check_locked_object(
331 sui_client: &Client,
332 committee: Arc<BTreeMap<AuthorityPublicKeyBytes, u64>>,
333 id: ObjectID,
334 rescue: bool,
335) -> anyhow::Result<()> {
336 let clients = Arc::new(make_clients(sui_client).await?);
337 let output = get_object(id, None, None, clients.clone()).await?;
338 let output = GroupedObjectOutput::new(output, committee);
339 if output.fully_locked {
340 println!("Object {} is fully locked.", id);
341 return Ok(());
342 }
343 let top_record = output.voting_power.first().unwrap();
344 let top_record_stake = top_record.1;
345 let top_record = top_record.0.clone().unwrap();
346 if top_record.4.is_none() {
347 println!(
348 "Object {} does not seem to be locked by majority of validators (unlocked stake: {})",
349 id, top_record_stake
350 );
351 return Ok(());
352 }
353
354 let tx_digest = top_record.2;
355 if !rescue {
356 println!("Object {} is rescueable, top tx: {:?}", id, tx_digest);
357 return Ok(());
358 }
359 println!("Object {} is rescueable, trying tx {}", id, tx_digest);
360 let validator = output
361 .grouped_results
362 .get(&Some(top_record))
363 .unwrap()
364 .first()
365 .unwrap();
366 let client = &clients.get(validator).unwrap().1;
367 let tx = client
368 .handle_transaction_info_request(TransactionInfoRequest {
369 transaction_digest: tx_digest,
370 })
371 .await?
372 .transaction;
373 let tx = Transaction::new(tx);
374 let res = sui_client.clone().execute_transaction(&tx).await;
375 match res {
376 Ok(_) => {
377 println!("Transaction executed successfully ({:?})", tx_digest);
378 }
379 Err(e) => {
380 println!("Failed to execute transaction ({:?}): {:?}", tx_digest, e);
381 }
382 }
383 Ok(())
384}
385
386impl ToolCommand {
387 #[allow(clippy::format_in_format_args)]
388 pub async fn execute(self, tracing_handle: TracingHandle) -> Result<(), anyhow::Error> {
389 match self {
390 ToolCommand::ScanConsensusCommits {
391 db_path,
392 start_commit,
393 end_commit,
394 } => {
395 let rocks_db_store = RocksDBStore::new(&db_path);
396
397 let start_commit = start_commit.unwrap_or(0);
398 let end_commit = end_commit.unwrap_or(u32::MAX);
399
400 let commits = rocks_db_store
401 .scan_commits(CommitRange::new(start_commit..=end_commit))
402 .unwrap();
403 println!("found {} consensus commits", commits.len());
404
405 for commit in commits {
406 let inner = &*commit;
407 let block_refs = inner.blocks();
408 let blocks = rocks_db_store.read_blocks(block_refs).unwrap();
409
410 for block in blocks.iter().flatten() {
411 let data = block.transactions_data();
412 println!(
413 "\"index\": \"{}\", \"leader\": \"{}\", \"blocks\": \"{:#?}\", {} txs",
414 inner.index(),
415 inner.leader(),
416 inner.blocks(),
417 data.len()
418 );
419 for txns in &data {
420 let tx: ConsensusTransaction = bcs::from_bytes(txns).unwrap();
421 println!("\t{:?}", tx.key());
422 }
423 }
424 }
425 }
426 ToolCommand::LockedObject {
427 id,
428 fullnode_rpc_url,
429 rescue,
430 address,
431 } => {
432 let sui_client = Client::new(fullnode_rpc_url)?;
433 let committee = Arc::new(
434 sui_client
435 .get_committee(None)
436 .await?
437 .voting_rights
438 .into_iter()
439 .collect::<BTreeMap<_, _>>(),
440 );
441 let object_ids = match id {
442 Some(id) => vec![id],
443 None => {
444 let address = address.expect("Either id or address must be provided");
445 sui_client
446 .list_owned_objects(address, Some(GasCoin::type_()))
447 .map_ok(|o| o.id())
448 .try_collect()
449 .await?
450 }
451 };
452 for ids in object_ids.chunks(30) {
453 let mut tasks = vec![];
454 for id in ids {
455 tasks.push(check_locked_object(
456 &sui_client,
457 committee.clone(),
458 *id,
459 rescue,
460 ))
461 }
462 join_all(tasks)
463 .await
464 .into_iter()
465 .collect::<Result<Vec<_>, _>>()?;
466 }
467 }
468 ToolCommand::FetchObject {
469 id,
470 validator,
471 version,
472 fullnode_rpc_url,
473 verbosity,
474 concise_no_header,
475 } => {
476 let sui_client = Client::new(fullnode_rpc_url)?;
477 let clients = Arc::new(make_clients(&sui_client).await?);
478 let output = get_object(id, version, validator, clients).await?;
479
480 match verbosity {
481 Verbosity::Grouped => {
482 let committee = Arc::new(
483 sui_client
484 .get_committee(None)
485 .await?
486 .voting_rights
487 .into_iter()
488 .collect::<BTreeMap<_, _>>(),
489 );
490 println!("{}", GroupedObjectOutput::new(output, committee));
491 }
492 Verbosity::Verbose => {
493 println!("{}", VerboseObjectOutput(output));
494 }
495 Verbosity::Concise => {
496 if !concise_no_header {
497 println!("{}", ConciseObjectOutput::header());
498 }
499 println!("{}", ConciseObjectOutput(output));
500 }
501 }
502 }
503 ToolCommand::FetchTransaction {
504 digest,
505 show_input_tx,
506 fullnode_rpc_url,
507 } => {
508 print!(
509 "{}",
510 get_transaction_block(digest, show_input_tx, fullnode_rpc_url).await?
511 );
512 }
513 ToolCommand::DbTool { db_path, cmd } => {
514 let path = PathBuf::from(db_path);
515 match cmd {
516 Some(c) => execute_db_tool_command(path, c).await?,
517 None => print_db_all_tables(path)?,
518 }
519 }
520 ToolCommand::DumpPackages {
521 rpc_url,
522 output_dir,
523 before_checkpoint,
524 verbose,
525 } => {
526 if !verbose {
527 tracing_handle
528 .update_log("off")
529 .expect("Failed to update log level");
530 }
531
532 sui_package_dump::dump(rpc_url, output_dir, before_checkpoint).await?;
533 }
534 ToolCommand::DumpValidators { genesis, concise } => {
535 let genesis = Genesis::load(genesis).unwrap();
536 if !concise {
537 println!("{:#?}", genesis.validator_set_for_tooling());
538 } else {
539 for (i, val_info) in genesis.validator_set_for_tooling().iter().enumerate() {
540 let metadata = val_info.verified_metadata();
541 println!(
542 "#{:<2} {:<20} {:?} {:?} {}",
543 i,
544 metadata.name,
545 metadata.sui_pubkey_bytes().concise(),
546 metadata.net_address,
547 anemo::PeerId(metadata.network_pubkey.0.to_bytes()),
548 )
549 }
550 }
551 }
552 ToolCommand::DumpGenesis { genesis } => {
553 let genesis = Genesis::load(genesis)?;
554 println!("{:#?}", genesis);
555 }
556 ToolCommand::FetchCheckpoint {
557 sequence_number,
558 fullnode_rpc_url,
559 } => {
560 let sui_client = Client::new(fullnode_rpc_url)?;
561 let clients = make_clients(&sui_client).await?;
562
563 for (name, (_, client)) in clients {
564 let resp = client
565 .handle_checkpoint(CheckpointRequest {
566 sequence_number,
567 request_content: true,
568 })
569 .await
570 .unwrap();
571 let CheckpointResponse {
572 checkpoint,
573 contents,
574 } = resp;
575
576 let summary = checkpoint.clone().unwrap().data().clone();
577 let mut file = std::fs::File::create("/tmp/ckpt_summary")
579 .expect("Failed to create /tmp/summary");
580 let bytes =
581 bcs::to_bytes(&summary).expect("Failed to serialize summary to BCS");
582 use std::io::Write;
583 file.write_all(&bytes)
584 .expect("Failed to write summary to /tmp/ckpt_summary");
585
586 println!("Validator: {:?}\n", name.concise());
587 println!("Checkpoint: {:?}\n", checkpoint);
588 println!("Content: {:?}\n", contents);
589 }
590 }
591 ToolCommand::Anemo { args } => {
592 let config = crate::make_anemo_config();
593 anemo_cli::run(config, args).await
594 }
595 ToolCommand::RestoreFromDBCheckpoint {
596 config_path,
597 db_checkpoint_path,
598 } => {
599 let config = sui_config::NodeConfig::load(config_path)?;
600 restore_from_db_checkpoint(&config, &db_checkpoint_path).await?;
601 }
602 ToolCommand::DownloadFormalSnapshot {
603 epoch,
604 genesis,
605 path,
606 num_parallel_downloads,
607 num_parallel_chunks,
608 verify,
609 network,
610 snapshot_bucket,
611 snapshot_bucket_type,
612 snapshot_path,
613 no_sign_request,
614 latest,
615 verbose,
616 max_retries,
617 metrics_port,
618 } => {
619 if !verbose {
620 tracing_handle
621 .update_log("off")
622 .expect("Failed to update log level");
623 }
624 let num_parallel_downloads = num_parallel_downloads.unwrap_or(50).min(200);
625 let snapshot_bucket =
626 snapshot_bucket.or_else(|| match (network, no_sign_request) {
627 (Chain::Mainnet, false) => Some(
628 env::var("MAINNET_FORMAL_SIGNED_BUCKET")
629 .unwrap_or("mysten-mainnet-formal".to_string()),
630 ),
631 (Chain::Mainnet, true) => env::var("MAINNET_FORMAL_UNSIGNED_BUCKET").ok(),
632 (Chain::Testnet, true) => env::var("TESTNET_FORMAL_UNSIGNED_BUCKET").ok(),
633 (Chain::Testnet, _) => Some(
634 env::var("TESTNET_FORMAL_SIGNED_BUCKET")
635 .unwrap_or("mysten-testnet-formal".to_string()),
636 ),
637 (Chain::Unknown, _) => {
638 panic!("Cannot generate default snapshot bucket for unknown network");
639 }
640 });
641
642 let aws_endpoint = env::var("AWS_SNAPSHOT_ENDPOINT").ok().or_else(|| {
643 if no_sign_request {
644 if network == Chain::Mainnet {
645 Some("https://formal-snapshot.mainnet.sui.io".to_string())
646 } else if network == Chain::Testnet {
647 Some("https://formal-snapshot.testnet.sui.io".to_string())
648 } else {
649 None
650 }
651 } else {
652 None
653 }
654 });
655
656 let snapshot_bucket_type = if no_sign_request {
657 ObjectStoreType::S3
658 } else {
659 snapshot_bucket_type
660 .expect("You must set either --snapshot-bucket-type or --no-sign-request")
661 };
662 let snapshot_store_config = match snapshot_bucket_type {
663 ObjectStoreType::S3 => ObjectStoreConfig {
664 object_store: Some(ObjectStoreType::S3),
665 bucket: snapshot_bucket.filter(|s| !s.is_empty()),
666 aws_access_key_id: env::var("AWS_SNAPSHOT_ACCESS_KEY_ID").ok(),
667 aws_secret_access_key: env::var("AWS_SNAPSHOT_SECRET_ACCESS_KEY").ok(),
668 aws_region: env::var("AWS_SNAPSHOT_REGION").ok(),
669 aws_endpoint: aws_endpoint.filter(|s| !s.is_empty()),
670 aws_virtual_hosted_style_request: env::var(
671 "AWS_SNAPSHOT_VIRTUAL_HOSTED_REQUESTS",
672 )
673 .ok()
674 .and_then(|b| b.parse().ok())
675 .unwrap_or(no_sign_request),
676 object_store_connection_limit: 200,
677 no_sign_request,
678 ..Default::default()
679 },
680 ObjectStoreType::GCS => ObjectStoreConfig {
681 object_store: Some(ObjectStoreType::GCS),
682 bucket: snapshot_bucket,
683 google_service_account: env::var("GCS_SNAPSHOT_SERVICE_ACCOUNT_FILE_PATH")
684 .ok(),
685 object_store_connection_limit: 200,
686 no_sign_request,
687 ..Default::default()
688 },
689 ObjectStoreType::Azure => ObjectStoreConfig {
690 object_store: Some(ObjectStoreType::Azure),
691 bucket: snapshot_bucket,
692 azure_storage_account: env::var("AZURE_SNAPSHOT_STORAGE_ACCOUNT").ok(),
693 azure_storage_access_key: env::var("AZURE_SNAPSHOT_STORAGE_ACCESS_KEY")
694 .ok(),
695 object_store_connection_limit: 200,
696 no_sign_request,
697 ..Default::default()
698 },
699 ObjectStoreType::File => {
700 if snapshot_path.is_some() {
701 ObjectStoreConfig {
702 object_store: Some(ObjectStoreType::File),
703 directory: snapshot_path,
704 ..Default::default()
705 }
706 } else {
707 panic!(
708 "--snapshot-path must be specified for --snapshot-bucket-type=file"
709 );
710 }
711 }
712 };
713
714 let ingestion_url = match network {
715 Chain::Mainnet => "https://checkpoints.mainnet.sui.io",
716 Chain::Testnet => "https://checkpoints.testnet.sui.io",
717 _ => panic!("Cannot generate default ingestion url for unknown network"),
718 };
719
720 let latest_available_epoch =
721 latest.then_some(get_latest_available_epoch(&snapshot_store_config).await?);
722 let epoch_to_download = epoch.or(latest_available_epoch).expect(
723 "Either pass epoch with --epoch <epoch_num> or use latest with --latest",
724 );
725
726 if let Err(e) =
727 check_completed_snapshot(&snapshot_store_config, epoch_to_download).await
728 {
729 panic!(
730 "Aborting snapshot restore: {}, snapshot may not be uploaded yet",
731 e
732 );
733 }
734
735 let verify = verify.unwrap_or_default();
736 download_formal_snapshot(
737 &path,
738 epoch_to_download,
739 &genesis,
740 snapshot_store_config,
741 ingestion_url,
742 num_parallel_downloads,
743 num_parallel_chunks,
744 network,
745 verify,
746 max_retries,
747 metrics_port,
748 )
749 .await?;
750 }
751 ToolCommand::Replay {
752 rpc_url,
753 safety_checks,
754 cmd,
755 use_authority,
756 cfg_path,
757 chain,
758 } => {
759 execute_replay_command(rpc_url, safety_checks, use_authority, cfg_path, chain, cmd)
760 .await?;
761 }
762 ToolCommand::DbShell(args) => {
763 tokio::task::spawn_blocking(move || crate::db_shell::run(args)).await??;
764 }
765 #[cfg(all(feature = "tideconsole", not(windows)))]
766 ToolCommand::TideConsole { db, exec, script } => {
767 tokio::task::spawn_blocking(move || crate::tideconsole_cmd::run(db, exec, script))
768 .await??;
769 }
770 };
771 Ok(())
772 }
773}