sui_tool/db_tool/
mod.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use self::db_dump::{StoreName, dump_table, list_tables, table_summary};
5use self::index_search::{SearchRange, search_index};
6use crate::db_tool::db_dump::{compact, print_table_metadata, prune_checkpoints, prune_objects};
7use anyhow::{anyhow, bail};
8use clap::Parser;
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use sui_core::authority::authority_per_epoch_store::AuthorityEpochTables;
12use sui_core::authority::authority_store_pruner::PrunerWatermarks;
13use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
14use sui_core::checkpoints::CheckpointStore;
15use sui_types::base_types::{EpochId, ObjectID};
16use sui_types::digests::{CheckpointContentsDigest, TransactionDigest};
17use sui_types::effects::TransactionEffectsAPI;
18use sui_types::messages_checkpoint::{CheckpointDigest, CheckpointSequenceNumber};
19use typed_store::rocks::{MetricConf, safe_drop_db};
20pub mod db_dump;
21mod index_search;
22
// Subcommands of the db tool. `execute_db_tool_command` matches on this enum and forwards each
// variant to the handler named next to it below.
// NOTE: plain `//` comments are used on purpose; `///` doc comments on clap items feed the
// generated help text.
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
pub enum DbToolCommand {
    // Handled by `print_db_all_tables`.
    ListTables,
    // Handled by `print_all_entries`.
    Dump(Options),
    // Runs `search_index` with `SearchRange::ExclusiveLastKey` and prints the results.
    IndexSearchKeyRange(IndexSearchKeyRangeOptions),
    // Runs `search_index` with `SearchRange::Count` and prints the results.
    IndexSearchCount(IndexSearchCountOptions),
    // Handled by `print_db_table_summary`.
    TableSummary(Options),
    // Handled by `print_table_metadata` from `db_dump`.
    ListDBMetadata(Options),
    // Handled by `print_last_consensus_index`.
    PrintLastConsensusIndex,
    // Handled by `print_consensus_commit` (currently a stub).
    PrintConsensusCommit(PrintConsensusCommitOptions),
    // Handled by `print_transaction`.
    PrintTransaction(PrintTransactionOptions),
    // Handled by `print_object`.
    PrintObject(PrintObjectOptions),
    // Handled by `print_checkpoint`.
    PrintCheckpoint(PrintCheckpointOptions),
    // Handled by `print_checkpoint_content`.
    PrintCheckpointContent(PrintCheckpointContentOptions),
    // Handled by `reset_db_to_genesis`.
    ResetDB,
    // Handled by `rewind_checkpoint_execution`.
    RewindCheckpointExecution(RewindCheckpointExecutionOptions),
    // Handled by `compact` from `db_dump`.
    Compact,
    // Handled by `prune_objects` from `db_dump`.
    PruneObjects,
    // Handled by `prune_checkpoints` from `db_dump`.
    PruneCheckpoints,
    // Handled by `set_checkpoint_watermark`.
    SetCheckpointWatermark(SetCheckpointWatermarkOptions),
}
45
46#[derive(Parser)]
47#[command(rename_all = "kebab-case")]
48pub struct IndexSearchKeyRangeOptions {
49    #[arg(long = "table-name", short = 't')]
50    table_name: String,
51    #[arg(long = "start", short = 's')]
52    start: String,
53    #[arg(long = "end", short = 'e')]
54    end_key: String,
55}
56
57#[derive(Parser)]
58#[command(rename_all = "kebab-case")]
59pub struct IndexSearchCountOptions {
60    #[arg(long = "table-name", short = 't')]
61    table_name: String,
62    #[arg(long = "start", short = 's')]
63    start: String,
64    #[arg(long = "count", short = 'c')]
65    count: u64,
66}
67
68#[derive(Parser)]
69#[command(rename_all = "kebab-case")]
70pub struct Options {
71    /// The type of store to dump
72    #[arg(long = "store", short = 's', value_enum)]
73    store_name: StoreName,
74    /// The name of the table to dump
75    #[arg(long = "table-name", short = 't')]
76    table_name: String,
77    /// The size of page to dump. This is a u16
78    #[arg(long = "page-size", short = 'p')]
79    page_size: u16,
80    /// The page number to dump
81    #[arg(long = "page-num", short = 'n')]
82    page_number: usize,
83
84    // TODO: We should load this automatically from the system object in AuthorityPerpetualTables.
85    // This is very difficult to do right now because you can't share code between
86    // AuthorityPerpetualTables and AuthorityEpochTablesReadonly.
87    /// The epoch to use when loading AuthorityEpochTables.
88    #[arg(long = "epoch", short = 'e')]
89    epoch: Option<EpochId>,
90}
91
92#[derive(Parser)]
93#[command(rename_all = "kebab-case")]
94pub struct PrintConsensusCommitOptions {
95    #[arg(long, help = "Sequence number of the consensus commit")]
96    seqnum: u64,
97}
98
99#[derive(Parser)]
100#[command(rename_all = "kebab-case")]
101pub struct PrintTransactionOptions {
102    #[arg(long, help = "The transaction digest to print")]
103    digest: TransactionDigest,
104}
105
106#[derive(Parser)]
107#[command(rename_all = "kebab-case")]
108pub struct PrintObjectOptions {
109    #[arg(long, help = "The object id to print")]
110    id: ObjectID,
111    #[arg(long, help = "The object version to print")]
112    version: Option<u64>,
113}
114
115#[derive(Parser)]
116#[command(rename_all = "kebab-case")]
117pub struct PrintCheckpointOptions {
118    #[arg(long, help = "The checkpoint digest to print")]
119    digest: CheckpointDigest,
120}
121
122#[derive(Parser)]
123#[command(rename_all = "kebab-case")]
124pub struct PrintCheckpointContentOptions {
125    #[arg(
126        long,
127        help = "The checkpoint content digest (NOT the checkpoint digest)"
128    )]
129    digest: CheckpointContentsDigest,
130}
131
132#[derive(Parser)]
133#[command(rename_all = "kebab-case")]
134pub struct RemoveTransactionOptions {
135    #[arg(long, help = "The transaction digest to remove")]
136    digest: TransactionDigest,
137
138    #[arg(long)]
139    confirm: bool,
140
141    /// The epoch to use when loading AuthorityEpochTables.
142    /// Defaults to the current epoch.
143    #[arg(long = "epoch", short = 'e')]
144    epoch: Option<EpochId>,
145}
146
147#[derive(Parser)]
148#[command(rename_all = "kebab-case")]
149pub struct RemoveObjectLockOptions {
150    #[arg(long, help = "The object ID to remove")]
151    id: ObjectID,
152
153    #[arg(long, help = "The object version to remove")]
154    version: u64,
155
156    #[arg(long)]
157    confirm: bool,
158}
159
160#[derive(Parser)]
161#[command(rename_all = "kebab-case")]
162pub struct RewindCheckpointExecutionOptions {
163    #[arg(long = "epoch")]
164    epoch: EpochId,
165
166    #[arg(long = "checkpoint-sequence-number")]
167    checkpoint_sequence_number: u64,
168}
169
170#[derive(Parser)]
171#[command(rename_all = "kebab-case")]
172pub struct SetCheckpointWatermarkOptions {
173    #[arg(long)]
174    highest_verified: Option<CheckpointSequenceNumber>,
175
176    #[arg(long)]
177    highest_synced: Option<CheckpointSequenceNumber>,
178}
179
180pub async fn execute_db_tool_command(db_path: PathBuf, cmd: DbToolCommand) -> anyhow::Result<()> {
181    match cmd {
182        DbToolCommand::ListTables => print_db_all_tables(db_path),
183        DbToolCommand::Dump(d) => print_all_entries(
184            d.store_name,
185            d.epoch,
186            db_path,
187            &d.table_name,
188            d.page_size,
189            d.page_number,
190        ),
191        DbToolCommand::TableSummary(d) => {
192            print_db_table_summary(d.store_name, d.epoch, db_path, &d.table_name)
193        }
194        DbToolCommand::ListDBMetadata(d) => {
195            print_table_metadata(d.store_name, d.epoch, db_path, &d.table_name)
196        }
197        DbToolCommand::PrintLastConsensusIndex => print_last_consensus_index(&db_path),
198        DbToolCommand::PrintConsensusCommit(d) => print_consensus_commit(&db_path, d),
199        DbToolCommand::PrintTransaction(d) => print_transaction(&db_path, d),
200        DbToolCommand::PrintObject(o) => print_object(&db_path, o),
201        DbToolCommand::PrintCheckpoint(d) => print_checkpoint(&db_path, d),
202        DbToolCommand::PrintCheckpointContent(d) => print_checkpoint_content(&db_path, d),
203        DbToolCommand::ResetDB => reset_db_to_genesis(&db_path).await,
204        DbToolCommand::RewindCheckpointExecution(d) => {
205            rewind_checkpoint_execution(&db_path, d.epoch, d.checkpoint_sequence_number)
206        }
207        DbToolCommand::Compact => compact(db_path),
208        DbToolCommand::PruneObjects => prune_objects(db_path).await,
209        DbToolCommand::PruneCheckpoints => prune_checkpoints(db_path).await,
210        DbToolCommand::IndexSearchKeyRange(rg) => {
211            let res = search_index(
212                db_path,
213                rg.table_name,
214                rg.start,
215                SearchRange::ExclusiveLastKey(rg.end_key),
216            )?;
217            for (k, v) in res {
218                println!("{}: {}", k, v);
219            }
220            Ok(())
221        }
222        DbToolCommand::IndexSearchCount(sc) => {
223            let res = search_index(
224                db_path,
225                sc.table_name,
226                sc.start,
227                SearchRange::Count(sc.count),
228            )?;
229            for (k, v) in res {
230                println!("{}: {}", k, v);
231            }
232            Ok(())
233        }
234        DbToolCommand::SetCheckpointWatermark(d) => set_checkpoint_watermark(&db_path, d),
235    }
236}
237
238pub fn print_db_all_tables(db_path: PathBuf) -> anyhow::Result<()> {
239    list_tables(db_path)?.iter().for_each(|t| println!("{}", t));
240    Ok(())
241}
242
243pub fn print_last_consensus_index(path: &Path) -> anyhow::Result<()> {
244    #[cfg(not(tidehunter))]
245    let epoch_tables = AuthorityEpochTables::open_tables_read_write(
246        path.to_path_buf(),
247        MetricConf::default(),
248        None,
249        None,
250    );
251    #[cfg(tidehunter)]
252    let epoch_tables = AuthorityEpochTables::open_with_path(&path.to_path_buf(), None);
253    let last_index = epoch_tables.get_last_consensus_index()?;
254    println!("Last consensus index is {:?}", last_index);
255    Ok(())
256}
257
258// TODO: implement for consensus.
259pub fn print_consensus_commit(
260    _path: &Path,
261    _opt: PrintConsensusCommitOptions,
262) -> anyhow::Result<()> {
263    println!("Printing consensus commit is unimplemented");
264    Ok(())
265}
266
267pub fn print_transaction(path: &Path, opt: PrintTransactionOptions) -> anyhow::Result<()> {
268    let perpetual_db = AuthorityPerpetualTables::open(&path.join("store"), None, None);
269    if let Some((epoch, checkpoint_seq_num)) =
270        perpetual_db.get_checkpoint_sequence_number(&opt.digest)?
271    {
272        println!(
273            "Transaction {:?} executed in epoch {} checkpoint {}",
274            opt.digest, epoch, checkpoint_seq_num
275        );
276    };
277    if let Some(effects) = perpetual_db.get_effects(&opt.digest)? {
278        println!(
279            "Transaction {:?} dependencies: {:#?}",
280            opt.digest,
281            effects.dependencies(),
282        );
283    };
284    Ok(())
285}
286
287pub fn print_object(path: &Path, opt: PrintObjectOptions) -> anyhow::Result<()> {
288    let perpetual_db = AuthorityPerpetualTables::open(&path.join("store"), None, None);
289
290    let obj = if let Some(version) = opt.version {
291        perpetual_db.get_object_by_key_fallible(&opt.id, version.into())?
292    } else {
293        perpetual_db.get_object_fallible(&opt.id)?
294    };
295
296    if let Some(obj) = obj {
297        println!("Object {:?}:\n{:#?}", opt.id, obj);
298    } else {
299        println!("Object {:?} not found", opt.id);
300    }
301
302    Ok(())
303}
304
305pub fn print_checkpoint(path: &Path, opt: PrintCheckpointOptions) -> anyhow::Result<()> {
306    let checkpoint_store = CheckpointStore::new(
307        &path.join("checkpoints"),
308        Arc::new(PrunerWatermarks::default()),
309    );
310    let checkpoint = checkpoint_store
311        .get_checkpoint_by_digest(&opt.digest)?
312        .ok_or(anyhow!(
313            "Checkpoint digest {:?} not found in checkpoint store",
314            opt.digest
315        ))?;
316    println!("Checkpoint: {:?}", checkpoint);
317    drop(checkpoint_store);
318    print_checkpoint_content(
319        path,
320        PrintCheckpointContentOptions {
321            digest: checkpoint.content_digest,
322        },
323    )
324}
325
326pub fn print_checkpoint_content(
327    path: &Path,
328    opt: PrintCheckpointContentOptions,
329) -> anyhow::Result<()> {
330    let checkpoint_store = CheckpointStore::new(
331        &path.join("checkpoints"),
332        Arc::new(PrunerWatermarks::default()),
333    );
334    let contents = checkpoint_store
335        .get_checkpoint_contents(&opt.digest)?
336        .ok_or(anyhow!(
337            "Checkpoint content digest {:?} not found in checkpoint store",
338            opt.digest
339        ))?;
340    println!("Checkpoint content: {:?}", contents);
341    Ok(())
342}
343
344pub async fn reset_db_to_genesis(path: &Path) -> anyhow::Result<()> {
345    // Follow the below steps to test:
346    //
347    // Get a db snapshot. Either generate one by running stress locally and enabling db checkpoints or download one from S3 bucket (pretty big in size though).
348    // Download the snapshot for the epoch you want to restore to the local disk. You will find one snapshot per epoch in the S3 bucket. We need to place the snapshot in the dir where config is pointing to. If db-config in fullnode.yaml is /opt/sui/db/authorities_db and we want to restore from epoch 10, we want to copy the snapshot to /opt/sui/db/authorities_dblike this:
349    // aws s3 cp s3://myBucket/dir /opt/sui/db/authorities_db/ --recursive —exclude “*” —include “epoch_10*”
350    // Mark downloaded snapshot as live: mv  /opt/sui/db/authorities_db/epoch_10  /opt/sui/db/authorities_db/live
351    // Reset the downloaded db to execute from genesis with: cargo run --package sui-tool -- db-tool --db-path /opt/sui/db/authorities_db/live reset-db
352    // Start the sui full node: cargo run --release --bin sui-node -- --config-path ~/db_checkpoints/fullnode.yaml
353    // A sample fullnode.yaml config would be:
354    // ---
355    // db-path:  /opt/sui/db/authorities_db
356    // network-address: /ip4/0.0.0.0/tcp/8080/http
357    // json-rpc-address: "0.0.0.0:9000"
358    // websocket-address: "0.0.0.0:9001"
359    // metrics-address: "0.0.0.0:9184"
360    // admin-interface-port: 1337
361    // enable-event-processing: true
362    // grpc-load-shed: ~
363    // grpc-concurrency-limit: ~
364    // p2p-config:
365    //   listen-address: "0.0.0.0:8084"
366    // genesis:
367    //   genesis-file-location:  <path to genesis blob for the network>
368    // authority-store-pruning-config:
369    //   num-latest-epoch-dbs-to-retain: 3
370    //   epoch-db-pruning-period-secs: 3600
371    //   num-epochs-to-retain: 18446744073709551615
372    //   max-checkpoints-in-batch: 10
373    //   max-transactions-in-batch: 1000
374    safe_drop_db(
375        path.join("store").join("perpetual"),
376        std::time::Duration::from_secs(60),
377    )
378    .await?;
379
380    let checkpoint_db = CheckpointStore::new(
381        &path.join("checkpoints"),
382        Arc::new(PrunerWatermarks::default()),
383    );
384    checkpoint_db.reset_db_for_execution_since_genesis()?;
385
386    Ok(())
387}
388
389/// Force sets the highest executed checkpoint.
390/// NOTE: Does not force re-execution of transactions.
391/// Run with: cargo run --package sui-tool -- db-tool --db-path /opt/sui/db/authorities_db/live rewind-checkpoint-execution --epoch 3 --checkpoint-sequence-number 300000
392pub fn rewind_checkpoint_execution(
393    path: &Path,
394    epoch: EpochId,
395    checkpoint_sequence_number: u64,
396) -> anyhow::Result<()> {
397    let checkpoint_db = CheckpointStore::new(
398        &path.join("checkpoints"),
399        Arc::new(PrunerWatermarks::default()),
400    );
401    let Some(checkpoint) =
402        checkpoint_db.get_checkpoint_by_sequence_number(checkpoint_sequence_number)?
403    else {
404        bail!("Checkpoint {checkpoint_sequence_number} not found!");
405    };
406    if epoch != checkpoint.epoch() {
407        bail!(
408            "Checkpoint {checkpoint_sequence_number} is in epoch {} not {epoch}!",
409            checkpoint.epoch()
410        );
411    }
412
413    let highest_executed_sequence_number = checkpoint_db
414        .get_highest_executed_checkpoint_seq_number()?
415        .unwrap_or_default();
416    if checkpoint_sequence_number > highest_executed_sequence_number {
417        bail!(
418            "Must rewind checkpoint execution to be not later than highest executed ({} > {})!",
419            checkpoint_sequence_number,
420            highest_executed_sequence_number
421        );
422    }
423    checkpoint_db.set_highest_executed_checkpoint_subtle(&checkpoint)?;
424    Ok(())
425}
426
427pub fn print_db_table_summary(
428    store: StoreName,
429    epoch: Option<EpochId>,
430    path: PathBuf,
431    table_name: &str,
432) -> anyhow::Result<()> {
433    let summary = table_summary(store, epoch, path, table_name)?;
434    let quantiles = [25, 50, 75, 90, 99];
435    println!(
436        "Total num keys = {}, total key bytes = {}, total value bytes = {}",
437        summary.num_keys, summary.key_bytes_total, summary.value_bytes_total
438    );
439    println!("Key size distribution:\n");
440    quantiles.iter().for_each(|q| {
441        println!(
442            "p{:?} -> {:?} bytes\n",
443            q,
444            summary.key_hist.value_at_quantile(*q as f64 / 100.0)
445        );
446    });
447    println!("Value size distribution:\n");
448    quantiles.iter().for_each(|q| {
449        println!(
450            "p{:?} -> {:?} bytes\n",
451            q,
452            summary.value_hist.value_at_quantile(*q as f64 / 100.0)
453        );
454    });
455    Ok(())
456}
457
458pub fn print_all_entries(
459    store: StoreName,
460    epoch: Option<EpochId>,
461    path: PathBuf,
462    table_name: &str,
463    page_size: u16,
464    page_number: usize,
465) -> anyhow::Result<()> {
466    for (k, v) in dump_table(store, epoch, path, table_name, page_size, page_number)? {
467        println!("{:>100?}: {:?}", k, v);
468    }
469    Ok(())
470}
471
472/// Force sets state sync checkpoint watermarks.
473/// Run with (for example):
474/// cargo run --package sui-tool -- db-tool --db-path /opt/sui/db/authorities_db/live set_checkpoint_watermark --highest-synced 300000
475pub fn set_checkpoint_watermark(
476    path: &Path,
477    options: SetCheckpointWatermarkOptions,
478) -> anyhow::Result<()> {
479    let checkpoint_db = CheckpointStore::new(
480        &path.join("checkpoints"),
481        Arc::new(PrunerWatermarks::default()),
482    );
483
484    if let Some(highest_verified) = options.highest_verified {
485        let Some(checkpoint) = checkpoint_db.get_checkpoint_by_sequence_number(highest_verified)?
486        else {
487            bail!("Checkpoint {highest_verified} not found");
488        };
489        checkpoint_db.update_highest_verified_checkpoint(&checkpoint)?;
490    }
491    if let Some(highest_synced) = options.highest_synced {
492        let Some(checkpoint) = checkpoint_db.get_checkpoint_by_sequence_number(highest_synced)?
493        else {
494            bail!("Checkpoint {highest_synced} not found");
495        };
496        checkpoint_db.update_highest_synced_checkpoint(&checkpoint)?;
497    }
498    Ok(())
499}