1use anyhow::{Ok, anyhow};
5use clap::{Parser, ValueEnum};
6use comfy_table::{Cell, ContentArrangement, Row, Table};
7use prometheus::Registry;
8use std::collections::{BTreeMap, HashMap};
9use std::path::PathBuf;
10use std::str;
11use std::sync::Arc;
12use strum_macros::EnumString;
13use sui_config::node::AuthorityStorePruningConfig;
14use sui_core::authority::authority_per_epoch_store::AuthorityEpochTables;
15use sui_core::authority::authority_store_pruner::{
16 AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
17 PrunerWatermarks,
18};
19use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
20use sui_core::authority::authority_store_types::{StoreData, StoreObject};
21use sui_core::checkpoints::CheckpointStore;
22use sui_core::epoch::committee_store::CommitteeStoreTables;
23use sui_core::jsonrpc_index::IndexStoreTables;
24use sui_core::rpc_index::RpcIndexStore;
25use sui_types::base_types::{EpochId, ObjectID};
26use tracing::info;
27use typed_store::rocks::{MetricConf, default_db_options};
28use typed_store::rocksdb::MultiThreaded;
29use typed_store::traits::{Map, TableSummary};
30
31#[derive(EnumString, Clone, Parser, Debug, ValueEnum)]
32pub enum StoreName {
33 Validator,
34 Index,
35 Epoch,
36 }
38impl std::fmt::Display for StoreName {
39 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
40 write!(f, "{:?}", self)
41 }
42}
43
44pub fn list_tables(path: PathBuf) -> anyhow::Result<Vec<String>> {
45 typed_store::rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(
46 &default_db_options().options,
47 path,
48 )
49 .map_err(|e| e.into())
50 .map(|q| {
51 q.iter()
52 .filter_map(|s| {
53 if s != "default" {
55 Some(s.clone())
56 } else {
57 None
58 }
59 })
60 .collect()
61 })
62}
63
64pub fn table_summary(
65 store_name: StoreName,
66 epoch: Option<EpochId>,
67 db_path: PathBuf,
68 table_name: &str,
69) -> anyhow::Result<TableSummary> {
70 match store_name {
71 StoreName::Validator => {
72 let epoch_tables = AuthorityEpochTables::describe_tables();
73 if epoch_tables.contains_key(table_name) {
74 let epoch = epoch.ok_or_else(|| anyhow!("--epoch is required"))?;
75 AuthorityEpochTables::open_readonly(epoch, &db_path).table_summary(table_name)
76 } else {
77 AuthorityPerpetualTables::open_readonly(&db_path).table_summary(table_name)
78 }
79 }
80 StoreName::Index => {
81 IndexStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
82 .table_summary(table_name)
83 }
84 StoreName::Epoch => {
85 CommitteeStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
86 .table_summary(table_name)
87 }
88 }
89 .map_err(|err| anyhow!(err.to_string()))
90}
91
92pub fn print_table_metadata(
93 store_name: StoreName,
94 epoch: Option<EpochId>,
95 db_path: PathBuf,
96 table_name: &str,
97) -> anyhow::Result<()> {
98 let db = match store_name {
99 StoreName::Validator => {
100 let epoch_tables = AuthorityEpochTables::describe_tables();
101 if epoch_tables.contains_key(table_name) {
102 let epoch = epoch.ok_or_else(|| anyhow!("--epoch is required"))?;
103 AuthorityEpochTables::open_readonly(epoch, &db_path)
104 .next_shared_object_versions_v2
105 .db
106 } else {
107 AuthorityPerpetualTables::open_readonly(&db_path).objects.db
108 }
109 }
110 StoreName::Index => {
111 IndexStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
112 .event_by_move_module
113 .db
114 }
115 StoreName::Epoch => {
116 CommitteeStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
117 .committee_map
118 .db
119 }
120 };
121
122 let mut table = Table::new();
123 table
124 .set_content_arrangement(ContentArrangement::Dynamic)
125 .set_width(200)
126 .set_header(vec![
127 "name",
128 "level",
129 "num_entries",
130 "start_key",
131 "end_key",
132 "num_deletions",
133 "file_size",
134 ]);
135
136 for file in db.live_files()?.iter() {
137 if file.column_family_name != table_name {
138 continue;
139 }
140 let mut row = Row::new();
141 row.add_cell(Cell::new(&file.name));
142 row.add_cell(Cell::new(file.level));
143 row.add_cell(Cell::new(file.num_entries));
144 row.add_cell(Cell::new(hex::encode(
145 file.start_key.as_ref().unwrap_or(&"".as_bytes().to_vec()),
146 )));
147 row.add_cell(Cell::new(hex::encode(
148 file.end_key.as_ref().unwrap_or(&"".as_bytes().to_vec()),
149 )));
150 row.add_cell(Cell::new(file.num_deletions));
151 row.add_cell(Cell::new(file.size));
152 table.add_row(row);
153 }
154
155 eprintln!("{}", table);
156 Ok(())
157}
158
159pub fn duplicate_objects_summary(db_path: PathBuf) -> anyhow::Result<(usize, usize, usize, usize)> {
160 let perpetual_tables = AuthorityPerpetualTables::open_readonly(&db_path);
161 let iter = perpetual_tables.objects.safe_iter();
162 let mut total_count = 0;
163 let mut duplicate_count = 0;
164 let mut total_bytes = 0;
165 let mut duplicated_bytes = 0;
166
167 let mut object_id: ObjectID = ObjectID::random();
168 let mut data: HashMap<Vec<u8>, usize> = HashMap::new();
169
170 for item in iter {
171 let (key, value) = item?;
172 if let StoreObject::Value(store_object) = value.migrate().into_inner()
173 && let StoreData::Move(object) = store_object.data
174 {
175 if object_id != key.0 {
176 for (k, cnt) in data.iter() {
177 total_bytes += k.len() * cnt;
178 duplicated_bytes += k.len() * (cnt - 1);
179 total_count += cnt;
180 duplicate_count += cnt - 1;
181 }
182 object_id = key.0;
183 data.clear();
184 }
185 *data.entry(object.contents().to_vec()).or_default() += 1;
186 }
187 }
188 Ok((total_count, duplicate_count, total_bytes, duplicated_bytes))
189}
190
191pub fn compact(db_path: PathBuf) -> anyhow::Result<()> {
192 let perpetual = Arc::new(AuthorityPerpetualTables::open(&db_path, None, None));
193 AuthorityStorePruner::compact(&perpetual)?;
194 Ok(())
195}
196
197pub async fn prune_objects(db_path: PathBuf) -> anyhow::Result<()> {
198 let perpetual_db = Arc::new(AuthorityPerpetualTables::open(
199 &db_path.join("store"),
200 None,
201 None,
202 ));
203 let checkpoint_store = CheckpointStore::new(
204 &db_path.join("checkpoints"),
205 Arc::new(PrunerWatermarks::default()),
206 );
207 let rpc_index = RpcIndexStore::new_without_init(&db_path);
208 let highest_pruned_checkpoint = checkpoint_store
209 .get_highest_pruned_checkpoint_seq_number()?
210 .unwrap_or(0);
211 let latest_checkpoint = checkpoint_store.get_highest_executed_checkpoint()?;
212 info!(
213 "Latest executed checkpoint sequence num: {}",
214 latest_checkpoint.map(|x| x.sequence_number).unwrap_or(0)
215 );
216 info!("Highest pruned checkpoint: {}", highest_pruned_checkpoint);
217 let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
218 info!("Pruning setup for db at path: {:?}", db_path.display());
219 let pruning_config = AuthorityStorePruningConfig {
220 num_epochs_to_retain: 0,
221 ..Default::default()
222 };
223 info!("Starting object pruning");
224 AuthorityStorePruner::prune_objects_for_eligible_epochs(
225 &perpetual_db,
226 &checkpoint_store,
227 Some(&rpc_index),
228 None,
229 pruning_config,
230 metrics,
231 EPOCH_DURATION_MS_FOR_TESTING,
232 )
233 .await?;
234 Ok(())
235}
236
237pub async fn prune_checkpoints(db_path: PathBuf) -> anyhow::Result<()> {
238 let perpetual_db = Arc::new(AuthorityPerpetualTables::open(
239 &db_path.join("store"),
240 None,
241 None,
242 ));
243 let checkpoint_store = CheckpointStore::new(
244 &db_path.join("checkpoints"),
245 Arc::new(PrunerWatermarks::default()),
246 );
247 let rpc_index = RpcIndexStore::new_without_init(&db_path);
248 let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
249 info!("Pruning setup for db at path: {:?}", db_path.display());
250 let pruning_config = AuthorityStorePruningConfig {
251 num_epochs_to_retain_for_checkpoints: Some(1),
252 ..Default::default()
253 };
254 info!("Starting txns and effects pruning");
255 use sui_core::authority::authority_store_pruner::PrunerWatermarks;
256 let watermarks = std::sync::Arc::new(PrunerWatermarks::default());
257 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
258 &perpetual_db,
259 &checkpoint_store,
260 Some(&rpc_index),
261 None,
262 pruning_config,
263 metrics,
264 EPOCH_DURATION_MS_FOR_TESTING,
265 &watermarks,
266 )
267 .await?;
268 Ok(())
269}
270
271pub fn dump_table(
273 store_name: StoreName,
274 epoch: Option<EpochId>,
275 db_path: PathBuf,
276 table_name: &str,
277 page_size: u16,
278 page_number: usize,
279) -> anyhow::Result<BTreeMap<String, String>> {
280 match store_name {
281 StoreName::Validator => {
282 let epoch_tables = AuthorityEpochTables::describe_tables();
283 if epoch_tables.contains_key(table_name) {
284 let epoch = epoch.ok_or_else(|| anyhow!("--epoch is required"))?;
285 AuthorityEpochTables::open_readonly(epoch, &db_path).dump(
286 table_name,
287 page_size,
288 page_number,
289 )
290 } else {
291 let perpetual_tables = AuthorityPerpetualTables::describe_tables();
292 assert!(perpetual_tables.contains_key(table_name));
293 AuthorityPerpetualTables::open_readonly(&db_path).dump(
294 table_name,
295 page_size,
296 page_number,
297 )
298 }
299 }
300 StoreName::Index => {
301 IndexStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default()).dump(
302 table_name,
303 page_size,
304 page_number,
305 )
306 }
307 StoreName::Epoch => {
308 CommitteeStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
309 .dump(table_name, page_size, page_number)
310 }
311 }
312 .map_err(|err| anyhow!(err.to_string()))
313}
314
315#[cfg(test)]
316mod test {
317 use sui_core::authority::authority_per_epoch_store::AuthorityEpochTables;
318 use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
319
320 use crate::db_tool::db_dump::{StoreName, dump_table, list_tables};
321
322 #[tokio::test]
323 async fn db_dump_population() -> Result<(), anyhow::Error> {
324 let primary_path = tempfile::tempdir()?.keep();
325
326 let _: AuthorityEpochTables = AuthorityEpochTables::open(0, &primary_path, None);
328 let _: AuthorityPerpetualTables = AuthorityPerpetualTables::open(&primary_path, None, None);
329
330 let tables = {
332 let mut epoch_tables =
333 list_tables(AuthorityEpochTables::path(0, &primary_path)).unwrap();
334 let mut perpetual_tables =
335 list_tables(AuthorityPerpetualTables::path(&primary_path)).unwrap();
336 epoch_tables.append(&mut perpetual_tables);
337 epoch_tables
338 };
339
340 let mut missing_tables = vec![];
341 for t in tables {
342 println!("{}", t);
343 if dump_table(
344 StoreName::Validator,
345 Some(0),
346 primary_path.clone(),
347 &t,
348 0,
349 0,
350 )
351 .is_err()
352 {
353 missing_tables.push(t);
354 }
355 }
356 if missing_tables.is_empty() {
357 return Ok(());
358 }
359 panic!(
360 "{}",
361 format!(
362 "Missing {} table(s) from DB dump registration function: {:?} \n Update the dump function.",
363 missing_tables.len(),
364 missing_tables
365 )
366 );
367 }
368}