sui_tool/db_tool/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use self::db_dump::{StoreName, dump_table, duplicate_objects_summary, list_tables, table_summary};
5use self::index_search::{SearchRange, search_index};
6use crate::db_tool::db_dump::{compact, print_table_metadata, prune_checkpoints, prune_objects};
7use anyhow::{anyhow, bail};
8use clap::Parser;
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use sui_core::authority::authority_per_epoch_store::AuthorityEpochTables;
12use sui_core::authority::authority_store_pruner::PrunerWatermarks;
13use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
14use sui_core::checkpoints::CheckpointStore;
15use sui_types::base_types::{EpochId, ObjectID};
16use sui_types::digests::{CheckpointContentsDigest, TransactionDigest};
17use sui_types::effects::TransactionEffectsAPI;
18use sui_types::messages_checkpoint::{CheckpointDigest, CheckpointSequenceNumber};
19use typed_store::rocks::{MetricConf, safe_drop_db};
20pub mod db_dump;
21mod index_search;
22
23#[derive(Parser)]
24#[command(rename_all = "kebab-case")]
25pub enum DbToolCommand {
26    ListTables,
27    Dump(Options),
28    IndexSearchKeyRange(IndexSearchKeyRangeOptions),
29    IndexSearchCount(IndexSearchCountOptions),
30    TableSummary(Options),
31    DuplicatesSummary,
32    ListDBMetadata(Options),
33    PrintLastConsensusIndex,
34    PrintConsensusCommit(PrintConsensusCommitOptions),
35    PrintTransaction(PrintTransactionOptions),
36    PrintObject(PrintObjectOptions),
37    PrintCheckpoint(PrintCheckpointOptions),
38    PrintCheckpointContent(PrintCheckpointContentOptions),
39    ResetDB,
40    RewindCheckpointExecution(RewindCheckpointExecutionOptions),
41    Compact,
42    PruneObjects,
43    PruneCheckpoints,
44    SetCheckpointWatermark(SetCheckpointWatermarkOptions),
45}
46
// Options for `index-search-key-range`. `execute_db_tool_command` forwards these to
// `search_index` with `SearchRange::ExclusiveLastKey(end_key)`, so `end_key` is exclusive.
// Plain `//` comments are used here because clap turns `///` comments into CLI help text.
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
pub struct IndexSearchKeyRangeOptions {
    // Name of the index table to search.
    #[arg(long = "table-name", short = 't')]
    table_name: String,
    // Key to start the search from.
    #[arg(long = "start", short = 's')]
    start: String,
    // Key at which the search stops (excluded from the results).
    #[arg(long = "end", short = 'e')]
    end_key: String,
}
57
// Options for `index-search-count`. `execute_db_tool_command` forwards these to
// `search_index` with `SearchRange::Count(count)`.
// Plain `//` comments are used here because clap turns `///` comments into CLI help text.
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
pub struct IndexSearchCountOptions {
    // Name of the index table to search.
    #[arg(long = "table-name", short = 't')]
    table_name: String,
    // Key to start the search from.
    #[arg(long = "start", short = 's')]
    start: String,
    // Passed as `SearchRange::Count`; presumably the maximum number of entries to return.
    #[arg(long = "count", short = 'c')]
    count: u64,
}
68
// Options shared by the `dump`, `table-summary` and `list-db-metadata` subcommands
// (see `execute_db_tool_command`). `page_size`/`page_number` are only read by `dump`.
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
pub struct Options {
    /// The type of store to dump
    #[arg(long = "store", short = 's', value_enum)]
    store_name: StoreName,
    /// The name of the table to dump
    #[arg(long = "table-name", short = 't')]
    table_name: String,
    /// The size of page to dump. This is a u16
    #[arg(long = "page-size", short = 'p')]
    page_size: u16,
    /// The page number to dump
    #[arg(long = "page-num", short = 'n')]
    page_number: usize,

    // TODO: We should load this automatically from the system object in AuthorityPerpetualTables.
    // This is very difficult to do right now because you can't share code between
    // AuthorityPerpetualTables and AuthorityEpochTablesReadonly.
    /// The epoch to use when loading AuthorityEpochTables.
    #[arg(long = "epoch", short = 'e')]
    epoch: Option<EpochId>,
}
92
// Options for `print-consensus-commit`. `print_consensus_commit` currently ignores them
// because the command is not implemented yet.
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
pub struct PrintConsensusCommitOptions {
    #[arg(long, help = "Sequence number of the consensus commit")]
    seqnum: u64,
}
99
// Options for `print-transaction`; the digest is looked up in the perpetual tables.
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
pub struct PrintTransactionOptions {
    #[arg(long, help = "The transaction digest to print")]
    digest: TransactionDigest,
}
106
// Options for `print-object`. With `--version`, `print_object` uses
// `get_object_by_key_fallible(id, version)`; without it, `get_object_fallible(id)`.
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
pub struct PrintObjectOptions {
    #[arg(long, help = "The object id to print")]
    id: ObjectID,
    #[arg(long, help = "The object version to print")]
    version: Option<u64>,
}
115
// Options for `print-checkpoint`; takes the checkpoint digest (not the contents digest).
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
pub struct PrintCheckpointOptions {
    #[arg(long, help = "The checkpoint digest to print")]
    digest: CheckpointDigest,
}
122
// Options for `print-checkpoint-content`. Also built internally by `print_checkpoint`
// from the fetched checkpoint's `content_digest`.
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
pub struct PrintCheckpointContentOptions {
    #[arg(
        long,
        help = "The checkpoint content digest (NOT the checkpoint digest)"
    )]
    digest: CheckpointContentsDigest,
}
132
// NOTE(review): not referenced by `DbToolCommand` or any function visible in this file;
// possibly left over from a removed `remove-transaction` subcommand — confirm before use.
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
pub struct RemoveTransactionOptions {
    #[arg(long, help = "The transaction digest to remove")]
    digest: TransactionDigest,

    // Presumably an explicit safety switch for the destructive operation.
    #[arg(long)]
    confirm: bool,

    /// The epoch to use when loading AuthorityEpochTables.
    /// Defaults to the current epoch.
    #[arg(long = "epoch", short = 'e')]
    epoch: Option<EpochId>,
}
147
// NOTE(review): not referenced by `DbToolCommand` or any function visible in this file;
// possibly left over from a removed `remove-object-lock` subcommand — confirm before use.
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
pub struct RemoveObjectLockOptions {
    #[arg(long, help = "The object ID to remove")]
    id: ObjectID,

    #[arg(long, help = "The object version to remove")]
    version: u64,

    // Presumably an explicit safety switch for the destructive operation.
    #[arg(long)]
    confirm: bool,
}
160
// Options for `rewind-checkpoint-execution`. `rewind_checkpoint_execution` requires the
// checkpoint to exist, to belong to `epoch`, and not to be later than the current highest
// executed checkpoint.
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
pub struct RewindCheckpointExecutionOptions {
    // Must match the epoch of the target checkpoint (acts as a sanity check).
    #[arg(long = "epoch")]
    epoch: EpochId,

    // Sequence number to set as the highest executed checkpoint.
    #[arg(long = "checkpoint-sequence-number")]
    checkpoint_sequence_number: u64,
}
170
// Options for `set-checkpoint-watermark`. clap derives the flags `--highest-verified` and
// `--highest-synced` from the field names; each given value must name a checkpoint that
// exists in the checkpoint store.
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
pub struct SetCheckpointWatermarkOptions {
    #[arg(long)]
    highest_verified: Option<CheckpointSequenceNumber>,

    #[arg(long)]
    highest_synced: Option<CheckpointSequenceNumber>,
}
180
181pub async fn execute_db_tool_command(db_path: PathBuf, cmd: DbToolCommand) -> anyhow::Result<()> {
182    match cmd {
183        DbToolCommand::ListTables => print_db_all_tables(db_path),
184        DbToolCommand::Dump(d) => print_all_entries(
185            d.store_name,
186            d.epoch,
187            db_path,
188            &d.table_name,
189            d.page_size,
190            d.page_number,
191        ),
192        DbToolCommand::TableSummary(d) => {
193            print_db_table_summary(d.store_name, d.epoch, db_path, &d.table_name)
194        }
195        DbToolCommand::DuplicatesSummary => print_db_duplicates_summary(db_path),
196        DbToolCommand::ListDBMetadata(d) => {
197            print_table_metadata(d.store_name, d.epoch, db_path, &d.table_name)
198        }
199        DbToolCommand::PrintLastConsensusIndex => print_last_consensus_index(&db_path),
200        DbToolCommand::PrintConsensusCommit(d) => print_consensus_commit(&db_path, d),
201        DbToolCommand::PrintTransaction(d) => print_transaction(&db_path, d),
202        DbToolCommand::PrintObject(o) => print_object(&db_path, o),
203        DbToolCommand::PrintCheckpoint(d) => print_checkpoint(&db_path, d),
204        DbToolCommand::PrintCheckpointContent(d) => print_checkpoint_content(&db_path, d),
205        DbToolCommand::ResetDB => reset_db_to_genesis(&db_path).await,
206        DbToolCommand::RewindCheckpointExecution(d) => {
207            rewind_checkpoint_execution(&db_path, d.epoch, d.checkpoint_sequence_number)
208        }
209        DbToolCommand::Compact => compact(db_path),
210        DbToolCommand::PruneObjects => prune_objects(db_path).await,
211        DbToolCommand::PruneCheckpoints => prune_checkpoints(db_path).await,
212        DbToolCommand::IndexSearchKeyRange(rg) => {
213            let res = search_index(
214                db_path,
215                rg.table_name,
216                rg.start,
217                SearchRange::ExclusiveLastKey(rg.end_key),
218            )?;
219            for (k, v) in res {
220                println!("{}: {}", k, v);
221            }
222            Ok(())
223        }
224        DbToolCommand::IndexSearchCount(sc) => {
225            let res = search_index(
226                db_path,
227                sc.table_name,
228                sc.start,
229                SearchRange::Count(sc.count),
230            )?;
231            for (k, v) in res {
232                println!("{}: {}", k, v);
233            }
234            Ok(())
235        }
236        DbToolCommand::SetCheckpointWatermark(d) => set_checkpoint_watermark(&db_path, d),
237    }
238}
239
240pub fn print_db_all_tables(db_path: PathBuf) -> anyhow::Result<()> {
241    list_tables(db_path)?.iter().for_each(|t| println!("{}", t));
242    Ok(())
243}
244
245pub fn print_db_duplicates_summary(db_path: PathBuf) -> anyhow::Result<()> {
246    let (total_count, duplicate_count, total_bytes, duplicated_bytes) =
247        duplicate_objects_summary(db_path)?;
248    println!(
249        "Total objects = {}, duplicated objects = {}, total bytes = {}, duplicated bytes = {}",
250        total_count, duplicate_count, total_bytes, duplicated_bytes
251    );
252    Ok(())
253}
254
255pub fn print_last_consensus_index(path: &Path) -> anyhow::Result<()> {
256    let epoch_tables = AuthorityEpochTables::open_tables_read_write(
257        path.to_path_buf(),
258        MetricConf::default(),
259        None,
260        None,
261    );
262    let last_index = epoch_tables.get_last_consensus_index()?;
263    println!("Last consensus index is {:?}", last_index);
264    Ok(())
265}
266
267// TODO: implement for consensus.
268pub fn print_consensus_commit(
269    _path: &Path,
270    _opt: PrintConsensusCommitOptions,
271) -> anyhow::Result<()> {
272    println!("Printing consensus commit is unimplemented");
273    Ok(())
274}
275
276pub fn print_transaction(path: &Path, opt: PrintTransactionOptions) -> anyhow::Result<()> {
277    let perpetual_db = AuthorityPerpetualTables::open(&path.join("store"), None, None);
278    if let Some((epoch, checkpoint_seq_num)) =
279        perpetual_db.get_checkpoint_sequence_number(&opt.digest)?
280    {
281        println!(
282            "Transaction {:?} executed in epoch {} checkpoint {}",
283            opt.digest, epoch, checkpoint_seq_num
284        );
285    };
286    if let Some(effects) = perpetual_db.get_effects(&opt.digest)? {
287        println!(
288            "Transaction {:?} dependencies: {:#?}",
289            opt.digest,
290            effects.dependencies(),
291        );
292    };
293    Ok(())
294}
295
296pub fn print_object(path: &Path, opt: PrintObjectOptions) -> anyhow::Result<()> {
297    let perpetual_db = AuthorityPerpetualTables::open(&path.join("store"), None, None);
298
299    let obj = if let Some(version) = opt.version {
300        perpetual_db.get_object_by_key_fallible(&opt.id, version.into())?
301    } else {
302        perpetual_db.get_object_fallible(&opt.id)?
303    };
304
305    if let Some(obj) = obj {
306        println!("Object {:?}:\n{:#?}", opt.id, obj);
307    } else {
308        println!("Object {:?} not found", opt.id);
309    }
310
311    Ok(())
312}
313
314pub fn print_checkpoint(path: &Path, opt: PrintCheckpointOptions) -> anyhow::Result<()> {
315    let checkpoint_store = CheckpointStore::new(
316        &path.join("checkpoints"),
317        Arc::new(PrunerWatermarks::default()),
318    );
319    let checkpoint = checkpoint_store
320        .get_checkpoint_by_digest(&opt.digest)?
321        .ok_or(anyhow!(
322            "Checkpoint digest {:?} not found in checkpoint store",
323            opt.digest
324        ))?;
325    println!("Checkpoint: {:?}", checkpoint);
326    drop(checkpoint_store);
327    print_checkpoint_content(
328        path,
329        PrintCheckpointContentOptions {
330            digest: checkpoint.content_digest,
331        },
332    )
333}
334
335pub fn print_checkpoint_content(
336    path: &Path,
337    opt: PrintCheckpointContentOptions,
338) -> anyhow::Result<()> {
339    let checkpoint_store = CheckpointStore::new(
340        &path.join("checkpoints"),
341        Arc::new(PrunerWatermarks::default()),
342    );
343    let contents = checkpoint_store
344        .get_checkpoint_contents(&opt.digest)?
345        .ok_or(anyhow!(
346            "Checkpoint content digest {:?} not found in checkpoint store",
347            opt.digest
348        ))?;
349    println!("Checkpoint content: {:?}", contents);
350    Ok(())
351}
352
353pub async fn reset_db_to_genesis(path: &Path) -> anyhow::Result<()> {
354    // Follow the below steps to test:
355    //
356    // Get a db snapshot. Either generate one by running stress locally and enabling db checkpoints or download one from S3 bucket (pretty big in size though).
357    // Download the snapshot for the epoch you want to restore to the local disk. You will find one snapshot per epoch in the S3 bucket. We need to place the snapshot in the dir where config is pointing to. If db-config in fullnode.yaml is /opt/sui/db/authorities_db and we want to restore from epoch 10, we want to copy the snapshot to /opt/sui/db/authorities_dblike this:
358    // aws s3 cp s3://myBucket/dir /opt/sui/db/authorities_db/ --recursive —exclude “*” —include “epoch_10*”
359    // Mark downloaded snapshot as live: mv  /opt/sui/db/authorities_db/epoch_10  /opt/sui/db/authorities_db/live
360    // Reset the downloaded db to execute from genesis with: cargo run --package sui-tool -- db-tool --db-path /opt/sui/db/authorities_db/live reset-db
361    // Start the sui full node: cargo run --release --bin sui-node -- --config-path ~/db_checkpoints/fullnode.yaml
362    // A sample fullnode.yaml config would be:
363    // ---
364    // db-path:  /opt/sui/db/authorities_db
365    // network-address: /ip4/0.0.0.0/tcp/8080/http
366    // json-rpc-address: "0.0.0.0:9000"
367    // websocket-address: "0.0.0.0:9001"
368    // metrics-address: "0.0.0.0:9184"
369    // admin-interface-port: 1337
370    // enable-event-processing: true
371    // grpc-load-shed: ~
372    // grpc-concurrency-limit: ~
373    // p2p-config:
374    //   listen-address: "0.0.0.0:8084"
375    // genesis:
376    //   genesis-file-location:  <path to genesis blob for the network>
377    // authority-store-pruning-config:
378    //   num-latest-epoch-dbs-to-retain: 3
379    //   epoch-db-pruning-period-secs: 3600
380    //   num-epochs-to-retain: 18446744073709551615
381    //   max-checkpoints-in-batch: 10
382    //   max-transactions-in-batch: 1000
383    safe_drop_db(
384        path.join("store").join("perpetual"),
385        std::time::Duration::from_secs(60),
386    )
387    .await?;
388
389    let checkpoint_db = CheckpointStore::new(
390        &path.join("checkpoints"),
391        Arc::new(PrunerWatermarks::default()),
392    );
393    checkpoint_db.reset_db_for_execution_since_genesis()?;
394
395    Ok(())
396}
397
398/// Force sets the highest executed checkpoint.
399/// NOTE: Does not force re-execution of transactions.
400/// Run with: cargo run --package sui-tool -- db-tool --db-path /opt/sui/db/authorities_db/live rewind-checkpoint-execution --epoch 3 --checkpoint-sequence-number 300000
401pub fn rewind_checkpoint_execution(
402    path: &Path,
403    epoch: EpochId,
404    checkpoint_sequence_number: u64,
405) -> anyhow::Result<()> {
406    let checkpoint_db = CheckpointStore::new(
407        &path.join("checkpoints"),
408        Arc::new(PrunerWatermarks::default()),
409    );
410    let Some(checkpoint) =
411        checkpoint_db.get_checkpoint_by_sequence_number(checkpoint_sequence_number)?
412    else {
413        bail!("Checkpoint {checkpoint_sequence_number} not found!");
414    };
415    if epoch != checkpoint.epoch() {
416        bail!(
417            "Checkpoint {checkpoint_sequence_number} is in epoch {} not {epoch}!",
418            checkpoint.epoch()
419        );
420    }
421
422    let highest_executed_sequence_number = checkpoint_db
423        .get_highest_executed_checkpoint_seq_number()?
424        .unwrap_or_default();
425    if checkpoint_sequence_number > highest_executed_sequence_number {
426        bail!(
427            "Must rewind checkpoint execution to be not later than highest executed ({} > {})!",
428            checkpoint_sequence_number,
429            highest_executed_sequence_number
430        );
431    }
432    checkpoint_db.set_highest_executed_checkpoint_subtle(&checkpoint)?;
433    Ok(())
434}
435
436pub fn print_db_table_summary(
437    store: StoreName,
438    epoch: Option<EpochId>,
439    path: PathBuf,
440    table_name: &str,
441) -> anyhow::Result<()> {
442    let summary = table_summary(store, epoch, path, table_name)?;
443    let quantiles = [25, 50, 75, 90, 99];
444    println!(
445        "Total num keys = {}, total key bytes = {}, total value bytes = {}",
446        summary.num_keys, summary.key_bytes_total, summary.value_bytes_total
447    );
448    println!("Key size distribution:\n");
449    quantiles.iter().for_each(|q| {
450        println!(
451            "p{:?} -> {:?} bytes\n",
452            q,
453            summary.key_hist.value_at_quantile(*q as f64 / 100.0)
454        );
455    });
456    println!("Value size distribution:\n");
457    quantiles.iter().for_each(|q| {
458        println!(
459            "p{:?} -> {:?} bytes\n",
460            q,
461            summary.value_hist.value_at_quantile(*q as f64 / 100.0)
462        );
463    });
464    Ok(())
465}
466
467pub fn print_all_entries(
468    store: StoreName,
469    epoch: Option<EpochId>,
470    path: PathBuf,
471    table_name: &str,
472    page_size: u16,
473    page_number: usize,
474) -> anyhow::Result<()> {
475    for (k, v) in dump_table(store, epoch, path, table_name, page_size, page_number)? {
476        println!("{:>100?}: {:?}", k, v);
477    }
478    Ok(())
479}
480
481/// Force sets state sync checkpoint watermarks.
482/// Run with (for example):
483/// cargo run --package sui-tool -- db-tool --db-path /opt/sui/db/authorities_db/live set_checkpoint_watermark --highest-synced 300000
484pub fn set_checkpoint_watermark(
485    path: &Path,
486    options: SetCheckpointWatermarkOptions,
487) -> anyhow::Result<()> {
488    let checkpoint_db = CheckpointStore::new(
489        &path.join("checkpoints"),
490        Arc::new(PrunerWatermarks::default()),
491    );
492
493    if let Some(highest_verified) = options.highest_verified {
494        let Some(checkpoint) = checkpoint_db.get_checkpoint_by_sequence_number(highest_verified)?
495        else {
496            bail!("Checkpoint {highest_verified} not found");
497        };
498        checkpoint_db.update_highest_verified_checkpoint(&checkpoint)?;
499    }
500    if let Some(highest_synced) = options.highest_synced {
501        let Some(checkpoint) = checkpoint_db.get_checkpoint_by_sequence_number(highest_synced)?
502        else {
503            bail!("Checkpoint {highest_synced} not found");
504        };
505        checkpoint_db.update_highest_synced_checkpoint(&checkpoint)?;
506    }
507    Ok(())
508}