1use anyhow::{Ok, anyhow};
5use clap::{Parser, ValueEnum};
6use comfy_table::{Cell, ContentArrangement, Row, Table};
7use prometheus::Registry;
8use std::collections::BTreeMap;
9use std::path::PathBuf;
10use std::str;
11use std::sync::Arc;
12use strum_macros::EnumString;
13use sui_config::node::AuthorityStorePruningConfig;
14use sui_core::authority::authority_per_epoch_store::AuthorityEpochTables;
15use sui_core::authority::authority_store_pruner::{
16 AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
17 PrunerWatermarks,
18};
19use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
20use sui_core::checkpoints::CheckpointStore;
21use sui_core::epoch::committee_store::CommitteeStoreTables;
22use sui_core::jsonrpc_index::IndexStoreTables;
23use sui_core::rpc_index::RpcIndexStore;
24use sui_types::base_types::EpochId;
25use tracing::info;
26use typed_store::rocks::{MetricConf, default_db_options};
27use typed_store::rocksdb::MultiThreaded;
28use typed_store::traits::TableSummary;
29
// Selects which on-disk store the tool functions in this file operate on.
//
// NOTE: plain `//` comments are used here on purpose. This enum derives clap's
// `Parser` and `ValueEnum`, and clap's derive macros turn `///` doc comments
// into help text, so rustdoc-style comments would change the CLI output.
#[derive(EnumString, Clone, Parser, Debug, ValueEnum)]
pub enum StoreName {
    // Handled in this file through `AuthorityEpochTables` (per-epoch tables)
    // and `AuthorityPerpetualTables` (everything else).
    Validator,
    // Handled in this file through `IndexStoreTables`.
    Index,
    // Handled in this file through `CommitteeStoreTables`.
    Epoch,
}
37impl std::fmt::Display for StoreName {
38 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
39 write!(f, "{:?}", self)
40 }
41}
42
43pub fn list_tables(path: PathBuf) -> anyhow::Result<Vec<String>> {
44 typed_store::rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(
45 &default_db_options().options,
46 path,
47 )
48 .map_err(|e| e.into())
49 .map(|q| {
50 q.iter()
51 .filter_map(|s| {
52 if s != "default" {
54 Some(s.clone())
55 } else {
56 None
57 }
58 })
59 .collect()
60 })
61}
62
63pub fn table_summary(
64 store_name: StoreName,
65 epoch: Option<EpochId>,
66 db_path: PathBuf,
67 table_name: &str,
68) -> anyhow::Result<TableSummary> {
69 match store_name {
70 StoreName::Validator => {
71 let epoch_tables = AuthorityEpochTables::describe_tables();
72 if epoch_tables.contains_key(table_name) {
73 let epoch = epoch.ok_or_else(|| anyhow!("--epoch is required"))?;
74 AuthorityEpochTables::open_readonly(epoch, &db_path).table_summary(table_name)
75 } else {
76 AuthorityPerpetualTables::open_readonly(&db_path).table_summary(table_name)
77 }
78 }
79 StoreName::Index => {
80 IndexStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
81 .table_summary(table_name)
82 }
83 StoreName::Epoch => {
84 CommitteeStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
85 .table_summary(table_name)
86 }
87 }
88 .map_err(|err| anyhow!(err.to_string()))
89}
90
91pub fn print_table_metadata(
92 store_name: StoreName,
93 epoch: Option<EpochId>,
94 db_path: PathBuf,
95 table_name: &str,
96) -> anyhow::Result<()> {
97 #[cfg(not(tidehunter))]
98 {
99 let db = match store_name {
100 StoreName::Validator => {
101 let epoch_tables = AuthorityEpochTables::describe_tables();
102 if epoch_tables.contains_key(table_name) {
103 let epoch = epoch.ok_or_else(|| anyhow!("--epoch is required"))?;
104 AuthorityEpochTables::open_readonly(epoch, &db_path)
105 .next_shared_object_versions_v2
106 .db
107 } else {
108 AuthorityPerpetualTables::open_readonly(&db_path).objects.db
109 }
110 }
111 StoreName::Index => {
112 IndexStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
113 .event_by_move_module
114 .db
115 }
116 StoreName::Epoch => {
117 CommitteeStoreTables::get_read_only_handle(
118 db_path,
119 None,
120 None,
121 MetricConf::default(),
122 )
123 .committee_map
124 .db
125 }
126 };
127
128 let mut table = Table::new();
129 table
130 .set_content_arrangement(ContentArrangement::Dynamic)
131 .set_width(200)
132 .set_header(vec![
133 "name",
134 "level",
135 "num_entries",
136 "start_key",
137 "end_key",
138 "num_deletions",
139 "file_size",
140 ]);
141
142 for file in db.live_files()?.iter() {
143 if file.column_family_name != table_name {
144 continue;
145 }
146 let mut row = Row::new();
147 row.add_cell(Cell::new(&file.name));
148 row.add_cell(Cell::new(file.level));
149 row.add_cell(Cell::new(file.num_entries));
150 row.add_cell(Cell::new(hex::encode(
151 file.start_key.as_ref().unwrap_or(&"".as_bytes().to_vec()),
152 )));
153 row.add_cell(Cell::new(hex::encode(
154 file.end_key.as_ref().unwrap_or(&"".as_bytes().to_vec()),
155 )));
156 row.add_cell(Cell::new(file.num_deletions));
157 row.add_cell(Cell::new(file.size));
158 table.add_row(row);
159 }
160
161 eprintln!("{}", table);
162 }
163 Ok(())
164}
165
166pub fn compact(db_path: PathBuf) -> anyhow::Result<()> {
167 let perpetual = Arc::new(AuthorityPerpetualTables::open(&db_path, None, None));
168 AuthorityStorePruner::compact(&perpetual)?;
169 Ok(())
170}
171
172pub async fn prune_objects(db_path: PathBuf) -> anyhow::Result<()> {
173 let perpetual_db = Arc::new(AuthorityPerpetualTables::open(
174 &db_path.join("store"),
175 None,
176 None,
177 ));
178 let checkpoint_store = CheckpointStore::new(
179 &db_path.join("checkpoints"),
180 Arc::new(PrunerWatermarks::default()),
181 );
182 let rpc_index = RpcIndexStore::new_without_init(&db_path);
183 let highest_pruned_checkpoint = checkpoint_store
184 .get_highest_pruned_checkpoint_seq_number()?
185 .unwrap_or(0);
186 let latest_checkpoint = checkpoint_store.get_highest_executed_checkpoint()?;
187 info!(
188 "Latest executed checkpoint sequence num: {}",
189 latest_checkpoint.map(|x| x.sequence_number).unwrap_or(0)
190 );
191 info!("Highest pruned checkpoint: {}", highest_pruned_checkpoint);
192 let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
193 info!("Pruning setup for db at path: {:?}", db_path.display());
194 let pruning_config = AuthorityStorePruningConfig {
195 num_epochs_to_retain: 0,
196 ..Default::default()
197 };
198 info!("Starting object pruning");
199 AuthorityStorePruner::prune_objects_for_eligible_epochs(
200 &perpetual_db,
201 &checkpoint_store,
202 Some(&rpc_index),
203 pruning_config,
204 metrics,
205 EPOCH_DURATION_MS_FOR_TESTING,
206 )
207 .await?;
208 Ok(())
209}
210
211pub async fn prune_checkpoints(db_path: PathBuf) -> anyhow::Result<()> {
212 let perpetual_db = Arc::new(AuthorityPerpetualTables::open(
213 &db_path.join("store"),
214 None,
215 None,
216 ));
217 let checkpoint_store = CheckpointStore::new(
218 &db_path.join("checkpoints"),
219 Arc::new(PrunerWatermarks::default()),
220 );
221 let rpc_index = RpcIndexStore::new_without_init(&db_path);
222 let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
223 info!("Pruning setup for db at path: {:?}", db_path.display());
224 let pruning_config = AuthorityStorePruningConfig {
225 num_epochs_to_retain_for_checkpoints: Some(1),
226 ..Default::default()
227 };
228 info!("Starting txns and effects pruning");
229 use sui_core::authority::authority_store_pruner::PrunerWatermarks;
230 let watermarks = std::sync::Arc::new(PrunerWatermarks::default());
231 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
232 &perpetual_db,
233 &checkpoint_store,
234 Some(&rpc_index),
235 pruning_config,
236 metrics,
237 EPOCH_DURATION_MS_FOR_TESTING,
238 &watermarks,
239 )
240 .await?;
241 Ok(())
242}
243
244pub fn dump_table(
246 store_name: StoreName,
247 epoch: Option<EpochId>,
248 db_path: PathBuf,
249 table_name: &str,
250 page_size: u16,
251 page_number: usize,
252) -> anyhow::Result<BTreeMap<String, String>> {
253 match store_name {
254 StoreName::Validator => {
255 let epoch_tables = AuthorityEpochTables::describe_tables();
256 if epoch_tables.contains_key(table_name) {
257 let epoch = epoch.ok_or_else(|| anyhow!("--epoch is required"))?;
258 AuthorityEpochTables::open_readonly(epoch, &db_path).dump(
259 table_name,
260 page_size,
261 page_number,
262 )
263 } else {
264 let perpetual_tables = AuthorityPerpetualTables::describe_tables();
265 assert!(perpetual_tables.contains_key(table_name));
266 AuthorityPerpetualTables::open_readonly(&db_path).dump(
267 table_name,
268 page_size,
269 page_number,
270 )
271 }
272 }
273 StoreName::Index => {
274 IndexStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default()).dump(
275 table_name,
276 page_size,
277 page_number,
278 )
279 }
280 StoreName::Epoch => {
281 CommitteeStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
282 .dump(table_name, page_size, page_number)
283 }
284 }
285 .map_err(|err| anyhow!(err.to_string()))
286}
287
#[cfg(test)]
mod test {
    use sui_core::authority::authority_per_epoch_store::AuthorityEpochTables;
    use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;

    use crate::db_tool::db_dump::{StoreName, dump_table, list_tables};

    /// Checks that every table listed for the epoch-0 and perpetual stores can
    /// be passed to `dump_table` without an error.
    #[tokio::test]
    async fn db_dump_population() -> Result<(), anyhow::Error> {
        let primary_path = tempfile::tempdir()?.keep();

        // Open both stores once; the handles are dropped right away, before
        // the stores are listed and dumped below.
        drop(AuthorityEpochTables::open(0, &primary_path, None));
        drop(AuthorityPerpetualTables::open(&primary_path, None, None));

        // Epoch tables first, then perpetual tables.
        let tables: Vec<String> = list_tables(AuthorityEpochTables::path(0, &primary_path))
            .unwrap()
            .into_iter()
            .chain(list_tables(AuthorityPerpetualTables::path(&primary_path)).unwrap())
            .collect();

        // Collect every table for which the dump call reports an error.
        let missing_tables: Vec<String> = tables
            .into_iter()
            .inspect(|table| println!("{}", table))
            .filter(|table| {
                dump_table(
                    StoreName::Validator,
                    Some(0),
                    primary_path.clone(),
                    table,
                    0,
                    0,
                )
                .is_err()
            })
            .collect();

        assert!(
            missing_tables.is_empty(),
            "Missing {} table(s) from DB dump registration function: {:?} \n Update the dump function.",
            missing_tables.len(),
            missing_tables
        );
        Ok(())
    }
}