1use super::authority_store_tables::AuthorityPerpetualTables;
5use crate::checkpoints::{CheckpointStore, CheckpointWatermark};
6use crate::jsonrpc_index::IndexStore;
7use crate::rpc_index::RpcIndexStore;
8use anyhow::anyhow;
9use mysten_metrics::{monitored_scope, spawn_monitored_task};
10use once_cell::sync::Lazy;
11use prometheus::{
12 IntCounter, IntGauge, Registry, register_int_counter_with_registry,
13 register_int_gauge_with_registry,
14};
15#[cfg(tidehunter)]
16use serde::de::DeserializeOwned;
17use std::cmp::{max, min};
18use std::collections::{BTreeSet, HashMap};
19use std::sync::Mutex;
20use std::sync::atomic::AtomicU64;
21use std::time::{SystemTime, UNIX_EPOCH};
22use std::{sync::Arc, time::Duration};
23use sui_config::node::AuthorityStorePruningConfig;
24use sui_rpc_store::Store as RpcStore;
25use sui_types::committee::EpochId;
26use sui_types::effects::TransactionEffects;
27use sui_types::effects::TransactionEffectsAPI;
28use sui_types::message_envelope::Message;
29use sui_types::messages_checkpoint::{
30 CheckpointContents, CheckpointDigest, CheckpointSequenceNumber,
31};
32use sui_types::{
33 base_types::{ObjectID, SequenceNumber, TransactionDigest, VersionNumber},
34 storage::ObjectKey,
35};
36use tokio::sync::oneshot::{self, Sender};
37use tokio::time::Instant;
38use tracing::{debug, error, info, warn};
39use typed_store::rocksdb::LiveFile;
40use typed_store::{Map, TypedStoreError};
41
42static PERIODIC_PRUNING_TABLES: Lazy<BTreeSet<String>> = Lazy::new(|| {
43 [
44 "objects",
45 "effects",
46 "transactions",
47 "events",
48 "executed_effects",
49 "executed_transactions_to_checkpoint",
50 ]
51 .into_iter()
52 .map(|cf| cf.to_string())
53 .collect()
54});
/// An epoch duration of 24 hours, expressed in milliseconds (named for use in tests).
pub const EPOCH_DURATION_MS_FOR_TESTING: u64 = 86_400_000;
/// Owner of the background pruning task started by `setup_pruning`.
pub struct AuthorityStorePruner {
    /// Sender half of the oneshot channel returned by `setup_pruning`. The
    /// pruning loop breaks as soon as the paired receiver resolves; nothing in
    /// this file sends on it explicitly.
    _objects_pruner_cancel_handle: oneshot::Sender<()>,
}
59
/// Shared pruning watermarks, updated by `update_pruning_watermarks` and read
/// through the `Arc<AtomicU64>` handles (e.g. by `apply_relocation_filter`).
///
/// Both values start at 0 via `Default`.
#[derive(Debug, Default)]
pub struct PrunerWatermarks {
    /// Epoch watermark; `update_pruning_watermarks` stores the new epoch bound here.
    pub epoch_id: Arc<AtomicU64>,
    /// Checkpoint sequence-number watermark stored alongside the epoch watermark.
    pub checkpoint_id: Arc<AtomicU64>,
}
65
/// Ten seconds, in milliseconds. Despite the `MIN_` prefix,
/// `pruning_tick_duration_ms` applies this as an upper bound on the tick
/// (it takes the smaller of half an epoch and this value).
static MIN_PRUNING_TICK_DURATION_MS: u64 = 10_000;
67
/// Prometheus metrics reported by the pruner. Each field is registered under
/// a metric name equal to the field name (see `AuthorityStorePruningMetrics::new`).
pub struct AuthorityStorePruningMetrics {
    /// Last checkpoint handled by the objects pruner.
    pub last_pruned_checkpoint: IntGauge,
    /// Running count of object versions scheduled for pruning.
    pub num_pruned_objects: IntCounter,
    /// Running count of tombstones scheduled for pruning.
    pub num_pruned_tombstones: IntCounter,
    /// Last checkpoint handled by the checkpoints/effects pruner.
    pub last_pruned_effects_checkpoint: IntGauge,
    /// Value returned by the most recent index-store prune call.
    pub last_pruned_indexes_transaction: IntGauge,
    /// Configured `num_epochs_to_retain`, set once in `setup_pruning`.
    pub num_epochs_to_retain_for_objects: IntGauge,
    /// Configured `num_epochs_to_retain_for_checkpoints` (0 when unset), set once in `setup_pruning`.
    pub num_epochs_to_retain_for_checkpoints: IntGauge,
}
77
78impl AuthorityStorePruningMetrics {
79 pub fn new(registry: &Registry) -> Arc<Self> {
80 let this = Self {
81 last_pruned_checkpoint: register_int_gauge_with_registry!(
82 "last_pruned_checkpoint",
83 "Last pruned checkpoint",
84 registry
85 )
86 .unwrap(),
87 num_pruned_objects: register_int_counter_with_registry!(
88 "num_pruned_objects",
89 "Number of pruned objects",
90 registry
91 )
92 .unwrap(),
93 num_pruned_tombstones: register_int_counter_with_registry!(
94 "num_pruned_tombstones",
95 "Number of pruned tombstones",
96 registry
97 )
98 .unwrap(),
99 last_pruned_effects_checkpoint: register_int_gauge_with_registry!(
100 "last_pruned_effects_checkpoint",
101 "Last pruned effects checkpoint",
102 registry
103 )
104 .unwrap(),
105 last_pruned_indexes_transaction: register_int_gauge_with_registry!(
106 "last_pruned_indexes_transaction",
107 "Last pruned indexes transaction",
108 registry
109 )
110 .unwrap(),
111 num_epochs_to_retain_for_objects: register_int_gauge_with_registry!(
112 "num_epochs_to_retain_for_objects",
113 "Number of epochs to retain for objects",
114 registry
115 )
116 .unwrap(),
117 num_epochs_to_retain_for_checkpoints: register_int_gauge_with_registry!(
118 "num_epochs_to_retain_for_checkpoints",
119 "Number of epochs to retain for checkpoints",
120 registry
121 )
122 .unwrap(),
123 };
124 Arc::new(this)
125 }
126
127 pub fn new_for_test() -> Arc<Self> {
128 Self::new(&Registry::new())
129 }
130}
131
/// Selects which pruning routine `prune_for_eligible_epochs` runs on each batch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruningMode {
    /// Prune old object versions (and the RPC indexes) via `prune_objects_and_indexes`.
    Objects,
    /// Prune checkpoints, transactions, effects and events via `prune_checkpoints`.
    Checkpoints,
}
137
138impl AuthorityStorePruner {
139 #[cfg(not(tidehunter))]
141 async fn prune_objects_and_indexes(
142 transaction_effects: Vec<TransactionEffects>,
143 perpetual_db: &Arc<AuthorityPerpetualTables>,
144 checkpoint_number: CheckpointSequenceNumber,
145 metrics: Arc<AuthorityStorePruningMetrics>,
146 pruned_tx_seq_exclusive: u64,
147 rpc_index: Option<&RpcIndexStore>,
148 rpc_store: Option<&RpcStore>,
149 enable_pruning_tombstones: bool,
150 ) -> anyhow::Result<()> {
151 let _scope = monitored_scope("ObjectsLivePruner");
152 let mut wb = perpetual_db.objects.batch();
153
154 let mut live_object_keys_to_prune = vec![];
156 let mut object_tombstones_to_prune = vec![];
157 for effects in &transaction_effects {
158 for (object_id, seq_number) in effects.modified_at_versions() {
159 live_object_keys_to_prune.push(ObjectKey(object_id, seq_number));
160 }
161
162 if enable_pruning_tombstones {
163 for deleted_object_key in effects.all_tombstones() {
164 object_tombstones_to_prune
165 .push(ObjectKey(deleted_object_key.0, deleted_object_key.1));
166 }
167 }
168 }
169
170 metrics
171 .num_pruned_objects
172 .inc_by(live_object_keys_to_prune.len() as u64);
173 metrics
174 .num_pruned_tombstones
175 .inc_by(object_tombstones_to_prune.len() as u64);
176
177 let mut updates: HashMap<ObjectID, (VersionNumber, VersionNumber)> = HashMap::new();
178 for ObjectKey(object_id, seq_number) in live_object_keys_to_prune {
179 updates
180 .entry(object_id)
181 .and_modify(|range| *range = (min(range.0, seq_number), max(range.1, seq_number)))
182 .or_insert((seq_number, seq_number));
183 }
184
185 for (object_id, (min_version, max_version)) in updates {
186 debug!(
187 "Pruning object {:?} versions {:?} - {:?}",
188 object_id, min_version, max_version
189 );
190 let start_range = ObjectKey(object_id, min_version);
191 let end_range = ObjectKey(object_id, (max_version.value() + 1).into());
192 wb.schedule_delete_range(&perpetual_db.objects, &start_range, &end_range)?;
193 }
194
195 if !object_tombstones_to_prune.is_empty() {
201 let mut object_keys_to_delete = vec![];
202 for ObjectKey(object_id, seq_number) in object_tombstones_to_prune {
203 for result in perpetual_db.objects.safe_iter_with_bounds(
204 Some(ObjectKey(object_id, VersionNumber::MIN)),
205 Some(ObjectKey(object_id, seq_number.next())),
206 ) {
207 let (object_key, _) = result?;
208 assert_eq!(object_key.0, object_id);
209 object_keys_to_delete.push(object_key);
210 }
211 }
212
213 wb.delete_batch(&perpetual_db.objects, object_keys_to_delete)?;
214 }
215
216 perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
217 metrics.last_pruned_checkpoint.set(checkpoint_number as i64);
218
219 wb.write()?;
220
221 if let Some(rpc_index) = rpc_index {
222 rpc_index.prune(checkpoint_number, pruned_tx_seq_exclusive)?;
223 }
224 if let Some(rpc_store) = rpc_store {
225 sui_rpc_store::prune_history_cohort(
228 rpc_store.db(),
229 rpc_store.schema(),
230 checkpoint_number,
231 pruned_tx_seq_exclusive,
232 )?;
233 }
234
235 Ok(())
236 }
237
238 #[cfg(tidehunter)]
239 async fn prune_objects_and_indexes(
240 transaction_effects: Vec<TransactionEffects>,
241 perpetual_db: &Arc<AuthorityPerpetualTables>,
242 checkpoint_number: CheckpointSequenceNumber,
243 metrics: Arc<AuthorityStorePruningMetrics>,
244 pruned_tx_seq_exclusive: u64,
245 rpc_index: Option<&RpcIndexStore>,
246 rpc_store: Option<&RpcStore>,
247 _: bool,
248 ) -> anyhow::Result<()> {
249 let _scope = monitored_scope("ObjectsLivePruner");
250 let mut wb = perpetual_db.objects.batch();
251 let mut objects_to_prune = vec![];
252
253 for effects in &transaction_effects {
254 for (object_id, version) in effects
255 .modified_at_versions()
256 .into_iter()
257 .chain(effects.all_tombstones())
258 {
259 debug!("Pruning object {:?} version {:?}", object_id, version);
260 objects_to_prune.push(ObjectKey(object_id, version));
261 }
262 }
263 metrics
264 .num_pruned_objects
265 .inc_by(objects_to_prune.len() as u64);
266 wb.delete_batch(&perpetual_db.objects, &objects_to_prune)?;
267
268 perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
269 metrics.last_pruned_checkpoint.set(checkpoint_number as i64);
270 wb.write()?;
271
272 if let Some(rpc_index) = rpc_index {
273 rpc_index.prune(checkpoint_number, pruned_tx_seq_exclusive)?;
274 }
275 if let Some(rpc_store) = rpc_store {
276 sui_rpc_store::prune_history_cohort(
279 rpc_store.db(),
280 rpc_store.schema(),
281 checkpoint_number,
282 pruned_tx_seq_exclusive,
283 )?;
284 }
285
286 Ok(())
287 }
288
289 fn prune_checkpoints(
290 perpetual_db: &Arc<AuthorityPerpetualTables>,
291 checkpoint_db: &Arc<CheckpointStore>,
292 checkpoint_number: CheckpointSequenceNumber,
293 checkpoints_to_prune: Vec<CheckpointDigest>,
294 checkpoint_content_to_prune: Vec<CheckpointContents>,
295 effects_to_prune: &Vec<TransactionEffects>,
296 metrics: Arc<AuthorityStorePruningMetrics>,
297 ) -> anyhow::Result<()> {
298 let _scope = monitored_scope("EffectsLivePruner");
299
300 let mut perpetual_batch = perpetual_db.objects.batch();
301 let transactions: Vec<_> = checkpoint_content_to_prune
302 .iter()
303 .flat_map(|content| content.iter().map(|tx| tx.transaction))
304 .collect();
305
306 perpetual_batch.delete_batch(&perpetual_db.transactions, transactions.iter())?;
307 perpetual_batch.delete_batch(&perpetual_db.executed_effects, transactions.iter())?;
308 perpetual_batch.delete_batch(
309 &perpetual_db.executed_transactions_to_checkpoint,
310 transactions.iter(),
311 )?;
312
313 let mut effect_digests = vec![];
314 for effects in effects_to_prune {
315 let effects_digest = effects.digest();
316 debug!("Pruning effects {:?}", effects_digest);
317 effect_digests.push(effects_digest);
318
319 if effects.events_digest().is_some() {
320 perpetual_batch
321 .delete_batch(&perpetual_db.events_2, [effects.transaction_digest()])?;
322 }
323 }
324 perpetual_batch.delete_batch(
325 &perpetual_db.unchanged_loaded_runtime_objects,
326 transactions.iter(),
327 )?;
328 perpetual_batch.delete_batch(&perpetual_db.effects, effect_digests)?;
329
330 let mut checkpoints_batch = checkpoint_db.tables.certified_checkpoints.batch();
331
332 let checkpoint_content_digests =
333 checkpoint_content_to_prune.iter().map(|ckpt| ckpt.digest());
334 checkpoints_batch.delete_batch(
335 &checkpoint_db.tables.checkpoint_content,
336 checkpoint_content_digests.clone(),
337 )?;
338 checkpoints_batch.delete_batch(
339 &checkpoint_db.tables.checkpoint_sequence_by_contents_digest,
340 checkpoint_content_digests,
341 )?;
342
343 checkpoints_batch.delete_batch(
344 &checkpoint_db.tables.checkpoint_by_digest,
345 checkpoints_to_prune,
346 )?;
347
348 checkpoints_batch.insert_batch(
349 &checkpoint_db.tables.watermarks,
350 [(
351 &CheckpointWatermark::HighestPruned,
352 &(checkpoint_number, CheckpointDigest::random()),
353 )],
354 )?;
355
356 perpetual_batch.write()?;
357 checkpoints_batch.write()?;
358 metrics
359 .last_pruned_effects_checkpoint
360 .set(checkpoint_number as i64);
361
362 Ok(())
363 }
364
365 pub async fn prune_objects_for_eligible_epochs(
367 perpetual_db: &Arc<AuthorityPerpetualTables>,
368 checkpoint_store: &Arc<CheckpointStore>,
369 rpc_index: Option<&RpcIndexStore>,
370 rpc_store: Option<&RpcStore>,
371 config: AuthorityStorePruningConfig,
372 metrics: Arc<AuthorityStorePruningMetrics>,
373 epoch_duration_ms: u64,
374 ) -> anyhow::Result<()> {
375 let _scope = monitored_scope("PruneObjectsForEligibleEpochs");
376 let (mut max_eligible_checkpoint_number, epoch_id) = checkpoint_store
377 .get_highest_executed_checkpoint()?
378 .map(|c| (*c.sequence_number(), c.epoch))
379 .unwrap_or_default();
380 let pruned_checkpoint_number = perpetual_db
381 .get_highest_pruned_checkpoint()?
382 .unwrap_or_default();
383 if config.smooth && config.num_epochs_to_retain > 0 {
384 max_eligible_checkpoint_number = Self::smoothed_max_eligible_checkpoint_number(
385 checkpoint_store,
386 max_eligible_checkpoint_number,
387 pruned_checkpoint_number,
388 epoch_id,
389 epoch_duration_ms,
390 config.num_epochs_to_retain,
391 )?;
392 }
393 Self::prune_for_eligible_epochs(
394 perpetual_db,
395 checkpoint_store,
396 rpc_index,
397 rpc_store,
398 PruningMode::Objects,
399 config.num_epochs_to_retain,
400 pruned_checkpoint_number,
401 max_eligible_checkpoint_number,
402 config,
403 metrics.clone(),
404 )
405 .await
406 }
407
408 pub async fn prune_checkpoints_for_eligible_epochs(
409 perpetual_db: &Arc<AuthorityPerpetualTables>,
410 checkpoint_store: &Arc<CheckpointStore>,
411 rpc_index: Option<&RpcIndexStore>,
412 rpc_store: Option<&RpcStore>,
413 config: AuthorityStorePruningConfig,
414 metrics: Arc<AuthorityStorePruningMetrics>,
415 epoch_duration_ms: u64,
416 pruner_watermarks: &Arc<PrunerWatermarks>,
417 ) -> anyhow::Result<()> {
418 let _scope = monitored_scope("PruneCheckpointsForEligibleEpochs");
419 let pruned_checkpoint_number = checkpoint_store
420 .get_highest_pruned_checkpoint_seq_number()?
421 .unwrap_or(0);
422 let (mut max_eligible_checkpoint, epoch_id) = checkpoint_store
423 .get_highest_executed_checkpoint()?
424 .map(|c| (*c.sequence_number(), c.epoch))
425 .unwrap_or_default();
426 if config.num_epochs_to_retain != u64::MAX {
427 max_eligible_checkpoint = min(
428 max_eligible_checkpoint,
429 perpetual_db
430 .get_highest_pruned_checkpoint()?
431 .unwrap_or_default(),
432 );
433 }
434 if config.smooth
435 && let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints
436 {
437 max_eligible_checkpoint = Self::smoothed_max_eligible_checkpoint_number(
438 checkpoint_store,
439 max_eligible_checkpoint,
440 pruned_checkpoint_number,
441 epoch_id,
442 epoch_duration_ms,
443 num_epochs_to_retain,
444 )?;
445 }
446 debug!("Max eligible checkpoint {}", max_eligible_checkpoint);
447 Self::prune_for_eligible_epochs(
448 perpetual_db,
449 checkpoint_store,
450 rpc_index,
451 rpc_store,
452 PruningMode::Checkpoints,
453 config
454 .num_epochs_to_retain_for_checkpoints()
455 .ok_or_else(|| anyhow!("config value not set"))?,
456 pruned_checkpoint_number,
457 max_eligible_checkpoint,
458 config.clone(),
459 metrics.clone(),
460 )
461 .await?;
462
463 if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
464 Self::update_pruning_watermarks(
465 perpetual_db,
466 checkpoint_store,
467 num_epochs_to_retain,
468 pruner_watermarks,
469 false,
470 )?;
471 }
472 Ok(())
473 }
474
475 pub async fn prune_for_eligible_epochs(
477 perpetual_db: &Arc<AuthorityPerpetualTables>,
478 checkpoint_store: &Arc<CheckpointStore>,
479 rpc_index: Option<&RpcIndexStore>,
480 rpc_store: Option<&RpcStore>,
481 mode: PruningMode,
482 num_epochs_to_retain: u64,
483 starting_checkpoint_number: CheckpointSequenceNumber,
484 max_eligible_checkpoint: CheckpointSequenceNumber,
485 config: AuthorityStorePruningConfig,
486 metrics: Arc<AuthorityStorePruningMetrics>,
487 ) -> anyhow::Result<()> {
488 let _scope = monitored_scope("PruneForEligibleEpochs");
489
490 let mut checkpoint_number = starting_checkpoint_number;
491 let current_epoch = checkpoint_store
492 .get_highest_executed_checkpoint()?
493 .map(|c| c.epoch())
494 .unwrap_or_default();
495
496 let mut checkpoints_to_prune = vec![];
497 let mut checkpoint_content_to_prune = vec![];
498 let mut effects_to_prune = vec![];
499 let mut pruned_tx_seq_exclusive = 0u64;
504
505 loop {
506 let Some(ckpt) = checkpoint_store
507 .tables
508 .certified_checkpoints
509 .get(&(checkpoint_number + 1))?
510 else {
511 break;
512 };
513 let checkpoint = ckpt.into_inner();
514 if (current_epoch < checkpoint.epoch() + num_epochs_to_retain)
519 || (*checkpoint.sequence_number() >= max_eligible_checkpoint)
520 {
521 break;
522 }
523 checkpoint_number = *checkpoint.sequence_number();
524 pruned_tx_seq_exclusive = checkpoint.network_total_transactions;
525
526 let content = checkpoint_store
527 .get_checkpoint_contents(&checkpoint.content_digest)?
528 .ok_or_else(|| {
529 anyhow::anyhow!(
530 "checkpoint content data is missing: {}",
531 checkpoint.sequence_number
532 )
533 })?;
534 let effects = perpetual_db
535 .effects
536 .multi_get(content.iter().map(|tx| tx.effects))?;
537
538 info!("scheduling pruning for checkpoint {:?}", checkpoint_number);
539 checkpoints_to_prune.push(*checkpoint.digest());
540 checkpoint_content_to_prune.push(content);
541 effects_to_prune.extend(effects.into_iter().flatten());
542
543 if effects_to_prune.len() >= config.max_transactions_in_batch
544 || checkpoints_to_prune.len() >= config.max_checkpoints_in_batch
545 {
546 match mode {
547 PruningMode::Objects => {
548 Self::prune_objects_and_indexes(
549 effects_to_prune,
550 perpetual_db,
551 checkpoint_number,
552 metrics.clone(),
553 pruned_tx_seq_exclusive,
554 rpc_index,
555 rpc_store,
556 !config.killswitch_tombstone_pruning,
557 )
558 .await?
559 }
560 PruningMode::Checkpoints => Self::prune_checkpoints(
561 perpetual_db,
562 checkpoint_store,
563 checkpoint_number,
564 checkpoints_to_prune,
565 checkpoint_content_to_prune,
566 &effects_to_prune,
567 metrics.clone(),
568 )?,
569 };
570 checkpoints_to_prune = vec![];
571 checkpoint_content_to_prune = vec![];
572 effects_to_prune = vec![];
573 tokio::task::yield_now().await;
575 }
576 }
577
578 if !checkpoints_to_prune.is_empty() {
579 match mode {
580 PruningMode::Objects => {
581 Self::prune_objects_and_indexes(
582 effects_to_prune,
583 perpetual_db,
584 checkpoint_number,
585 metrics.clone(),
586 pruned_tx_seq_exclusive,
587 rpc_index,
588 rpc_store,
589 !config.killswitch_tombstone_pruning,
590 )
591 .await?
592 }
593 PruningMode::Checkpoints => Self::prune_checkpoints(
594 perpetual_db,
595 checkpoint_store,
596 checkpoint_number,
597 checkpoints_to_prune,
598 checkpoint_content_to_prune,
599 &effects_to_prune,
600 metrics.clone(),
601 )?,
602 };
603 }
604 Ok(())
605 }
606
607 fn prune_indexes(
608 indexes: Option<&IndexStore>,
609 config: &AuthorityStorePruningConfig,
610 epoch_duration_ms: u64,
611 metrics: &AuthorityStorePruningMetrics,
612 ) -> anyhow::Result<()> {
613 if let (Some(mut epochs_to_retain), Some(indexes)) =
614 (config.num_epochs_to_retain_for_indexes, indexes)
615 {
616 if epochs_to_retain < 7 {
617 warn!("num_epochs_to_retain_for_indexes is too low. Reseting it to 7");
618 epochs_to_retain = 7;
619 }
620 let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
621 if let Some(cut_time_ms) =
622 u64::try_from(now)?.checked_sub(epochs_to_retain * epoch_duration_ms)
623 {
624 let transaction_id = indexes.prune(cut_time_ms)?;
625 metrics
626 .last_pruned_indexes_transaction
627 .set(transaction_id as i64);
628 }
629 }
630 Ok(())
631 }
632
633 async fn prune_executed_tx_digests(
634 perpetual_db: &Arc<AuthorityPerpetualTables>,
635 checkpoint_store: &Arc<CheckpointStore>,
636 ) -> anyhow::Result<()> {
637 let current_epoch = checkpoint_store
638 .get_highest_executed_checkpoint()?
639 .map(|c| c.epoch)
640 .unwrap_or_default();
641
642 if current_epoch < 2 {
643 return Ok(());
644 }
645
646 let target_epoch = current_epoch - 1;
647
648 let start_key = (0u64, TransactionDigest::ZERO);
649 let end_key = (target_epoch, TransactionDigest::ZERO);
650
651 info!(
652 "Pruning executed_transaction_digests for epochs < {} (current epoch: {})",
653 target_epoch, current_epoch
654 );
655
656 let mut batch = perpetual_db.executed_transaction_digests.batch();
657 batch.schedule_delete_range(
658 &perpetual_db.executed_transaction_digests,
659 &start_key,
660 &end_key,
662 )?;
663 batch.write()?;
664 Ok(())
665 }
666
667 #[cfg(tidehunter)]
668 fn prune_executed_tx_digests_th(
669 perpetual_db: &Arc<AuthorityPerpetualTables>,
670 checkpoint_store: &Arc<CheckpointStore>,
671 ) -> anyhow::Result<()> {
672 let current_epoch = checkpoint_store
673 .get_highest_executed_checkpoint()?
674 .map(|c| c.epoch)
675 .unwrap_or_default();
676
677 if current_epoch < 2 {
678 return Ok(());
679 }
680
681 let last_epoch_to_delete = current_epoch - 2;
682 let from_key = (0u64, TransactionDigest::ZERO);
683 let to_key = (last_epoch_to_delete, TransactionDigest::new([0xff; 32]));
684 info!(
685 "Pruning executed_transaction_digests for epochs 0 to {} (current epoch: {})",
686 last_epoch_to_delete, current_epoch
687 );
688 perpetual_db
689 .executed_transaction_digests
690 .drop_cells_in_range(&from_key, &to_key)?;
691 Ok(())
692 }
693
694 fn update_pruning_watermarks(
695 perpetual_db: &Arc<AuthorityPerpetualTables>,
696 checkpoint_store: &Arc<CheckpointStore>,
697 num_epochs_to_retain: u64,
698 pruning_watermark: &Arc<PrunerWatermarks>,
699 objects_compactor_active: bool,
700 ) -> anyhow::Result<bool> {
701 use std::sync::atomic::Ordering;
702 let objects_pruning_checkpoint_id = perpetual_db
703 .get_highest_pruned_checkpoint()?
704 .unwrap_or_default();
705 let objects_pruning_epoch_id = checkpoint_store
706 .get_checkpoint_by_sequence_number(objects_pruning_checkpoint_id)?
707 .map(|chk| chk.epoch)
708 .unwrap_or_default();
709
710 let current_watermark = pruning_watermark.epoch_id.load(Ordering::Relaxed);
711 let current_epoch_id = checkpoint_store
712 .get_highest_executed_checkpoint()?
713 .map(|c| c.epoch)
714 .unwrap_or_default();
715 if current_epoch_id < num_epochs_to_retain {
716 return Ok(false);
717 }
718 let target_epoch_id = current_epoch_id - num_epochs_to_retain;
719 let checkpoint_id =
720 checkpoint_store.get_epoch_last_checkpoint_seq_number(target_epoch_id)?;
721
722 let new_watermark = if objects_compactor_active {
725 target_epoch_id + 1
726 } else {
727 min(target_epoch_id + 1, objects_pruning_epoch_id)
728 };
729 if current_watermark == new_watermark {
730 return Ok(false);
731 }
732 info!("relocation: setting epoch watermark to {}", new_watermark);
733 pruning_watermark
734 .epoch_id
735 .store(new_watermark, Ordering::Relaxed);
736 if let Some(checkpoint_id) = checkpoint_id {
737 let watermark = if objects_compactor_active {
738 checkpoint_id
739 } else {
740 min(checkpoint_id, objects_pruning_checkpoint_id)
741 };
742 info!("relocation: setting checkpoint watermark to {}", watermark);
743 pruning_watermark
744 .checkpoint_id
745 .store(watermark, Ordering::Relaxed);
746 }
747 Ok(true)
748 }
749
750 #[cfg(tidehunter)]
751 fn prune_th(
752 perpetual_db: &Arc<AuthorityPerpetualTables>,
753 checkpoint_store: &Arc<CheckpointStore>,
754 num_epochs_to_retain: u64,
755 pruning_watermark: Arc<PrunerWatermarks>,
756 objects_compactor_active: bool,
757 ) -> anyhow::Result<()> {
758 let watermark_updated = Self::update_pruning_watermarks(
759 perpetual_db,
760 checkpoint_store,
761 num_epochs_to_retain,
762 &pruning_watermark,
763 objects_compactor_active,
764 )?;
765 if !watermark_updated {
766 info!("skip relocation. Watermark hasn't changed");
767 return Ok(());
768 }
769 perpetual_db.objects.db.start_relocation()?;
770 checkpoint_store.tables.watermarks.db.start_relocation()?;
771 Self::prune_executed_tx_digests_th(perpetual_db, checkpoint_store)?;
772 Ok(())
773 }
774
775 fn compact_next_sst_file(
776 perpetual_db: Arc<AuthorityPerpetualTables>,
777 delay_days: usize,
778 last_processed: Arc<Mutex<HashMap<String, SystemTime>>>,
779 ) -> anyhow::Result<Option<LiveFile>> {
780 let db_path = perpetual_db.objects.db.path_for_pruning();
781 let mut state = last_processed
782 .lock()
783 .expect("failed to obtain a lock for last processed SST files");
784 let mut sst_file_for_compaction: Option<LiveFile> = None;
785 let time_threshold =
786 SystemTime::now() - Duration::from_secs(delay_days as u64 * 24 * 60 * 60);
787 for sst_file in perpetual_db.objects.db.live_files()? {
788 let file_path = db_path.join(sst_file.name.clone().trim_matches('/'));
789 let last_modified = std::fs::metadata(file_path)?.modified()?;
790 if !PERIODIC_PRUNING_TABLES.contains(&sst_file.column_family_name)
791 || sst_file.level < 1
792 || sst_file.start_key.is_none()
793 || sst_file.end_key.is_none()
794 || last_modified > time_threshold
795 || state.get(&sst_file.name).unwrap_or(&UNIX_EPOCH) > &time_threshold
796 {
797 continue;
798 }
799 if let Some(candidate) = &sst_file_for_compaction
800 && candidate.size > sst_file.size
801 {
802 continue;
803 }
804 sst_file_for_compaction = Some(sst_file);
805 }
806 let Some(sst_file) = sst_file_for_compaction else {
807 return Ok(None);
808 };
809 info!(
810 "Manual compaction of sst file {:?}. Size: {:?}, level: {:?}",
811 sst_file.name, sst_file.size, sst_file.level
812 );
813 perpetual_db.objects.compact_range_raw(
814 &sst_file.column_family_name,
815 sst_file.start_key.clone().unwrap(),
816 sst_file.end_key.clone().unwrap(),
817 )?;
818 state.insert(sst_file.name.clone(), SystemTime::now());
819 Ok(Some(sst_file))
820 }
821
822 fn pruning_tick_duration_ms(epoch_duration_ms: u64) -> u64 {
823 min(epoch_duration_ms / 2, MIN_PRUNING_TICK_DURATION_MS)
824 }
825
826 fn smoothed_max_eligible_checkpoint_number(
827 checkpoint_store: &Arc<CheckpointStore>,
828 mut max_eligible_checkpoint: CheckpointSequenceNumber,
829 pruned_checkpoint: CheckpointSequenceNumber,
830 epoch_id: EpochId,
831 epoch_duration_ms: u64,
832 num_epochs_to_retain: u64,
833 ) -> anyhow::Result<CheckpointSequenceNumber> {
834 if epoch_id < num_epochs_to_retain {
835 return Ok(0);
836 }
837 let last_checkpoint_in_epoch = checkpoint_store
838 .get_epoch_last_checkpoint(epoch_id - num_epochs_to_retain)?
839 .map(|checkpoint| checkpoint.sequence_number)
840 .unwrap_or_default();
841 max_eligible_checkpoint = max_eligible_checkpoint.min(last_checkpoint_in_epoch);
842 if max_eligible_checkpoint == 0 {
843 return Ok(max_eligible_checkpoint);
844 }
845 let num_intervals = epoch_duration_ms
846 .checked_div(Self::pruning_tick_duration_ms(epoch_duration_ms))
847 .unwrap_or(1);
848 let delta = max_eligible_checkpoint
849 .checked_sub(pruned_checkpoint)
850 .unwrap_or_default()
851 .checked_div(num_intervals)
852 .unwrap_or(1);
853 Ok(pruned_checkpoint + delta)
854 }
855
    /// Spawns the background pruning task(s) and returns the sender half of
    /// the oneshot channel that stops the main pruning loop.
    ///
    /// All intervals first fire after `config.pruning_run_delay_seconds`
    /// (default one hour; 1 ms under `msim`) and then every
    /// `pruning_tick_duration_ms(epoch_duration_ms)` milliseconds.
    ///
    /// * Under `tidehunter`: a loop is spawned only when
    ///   `config.num_epochs_to_retain_for_checkpoints()` is `Some`; it runs the
    ///   objects pruner (unless `num_epochs_to_retain == u64::MAX`) and `prune_th`.
    /// * Otherwise: an optional periodic SST compaction task plus a loop that
    ///   runs the objects, checkpoints and index pruners, each behind its own
    ///   config guard.
    ///
    /// Errors from individual pruning runs are logged and do not stop the loop.
    ///
    /// NOTE(review): `tokio::time::interval_at` panics on a zero period, which
    /// would happen for `epoch_duration_ms < 2` — confirm callers never pass that.
    fn setup_pruning(
        config: AuthorityStorePruningConfig,
        epoch_duration_ms: u64,
        perpetual_db: Arc<AuthorityPerpetualTables>,
        checkpoint_store: Arc<CheckpointStore>,
        rpc_index: Option<Arc<RpcIndexStore>>,
        rpc_store: Option<RpcStore>,
        jsonrpc_index: Option<Arc<IndexStore>>,
        metrics: Arc<AuthorityStorePruningMetrics>,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Sender<()> {
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        debug!(
            "Starting object pruning service with num_epochs_to_retain={}",
            config.num_epochs_to_retain
        );

        let tick_duration =
            Duration::from_millis(Self::pruning_tick_duration_ms(epoch_duration_ms));
        // Simulation builds start almost immediately; real nodes wait before the first run.
        let pruning_initial_delay = if cfg!(msim) {
            Duration::from_millis(1)
        } else {
            Duration::from_secs(config.pruning_run_delay_seconds.unwrap_or(60 * 60))
        };
        let mut objects_prune_interval =
            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
        let mut checkpoints_prune_interval =
            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);

        // Export the configured retention values once, at start-up.
        metrics
            .num_epochs_to_retain_for_objects
            .set(config.num_epochs_to_retain as i64);
        metrics.num_epochs_to_retain_for_checkpoints.set(
            config
                .num_epochs_to_retain_for_checkpoints
                .unwrap_or_default() as i64,
        );

        #[cfg(tidehunter)]
        {
            if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
                // `u64::MAX` means the objects pruner is disabled.
                let prune_objects = config.num_epochs_to_retain != u64::MAX;
                tokio::task::spawn(async move {
                    loop {
                        tokio::select! {
                            _ = objects_prune_interval.tick(), if prune_objects => {
                                if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), rpc_store.as_ref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
                                    error!("Failed to prune objects: {:?}", err);
                                }
                            },
                            _ = checkpoints_prune_interval.tick() => {
                                // The last argument is `objects_compactor_active`: true exactly when the objects pruner is off.
                                if let Err(err) = Self::prune_th(&perpetual_db, &checkpoint_store, num_epochs_to_retain, pruner_watermarks.clone(), !prune_objects) {
                                    error!("Failed to prune checkpoints: {:?}", err);
                                }
                            },
                            // Resolves when the cancel handle is used or dropped.
                            _ = &mut recv => break,
                        }
                    }
                });
            }
        }
        #[cfg(not(tidehunter))]
        {
            let mut indexes_prune_interval =
                tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);

            let perpetual_db_for_compaction = perpetual_db.clone();
            if let Some(delay_days) = config.periodic_compaction_threshold_days {
                // Compaction runs in its own task; this loop has no exit branch.
                spawn_monitored_task!(async move {
                    let last_processed = Arc::new(Mutex::new(HashMap::new()));
                    loop {
                        let db = perpetual_db_for_compaction.clone();
                        let state = Arc::clone(&last_processed);
                        // File-system and compaction work is blocking, so it runs off the async workers.
                        let result = tokio::task::spawn_blocking(move || {
                            Self::compact_next_sst_file(db, delay_days, state)
                        })
                        .await;
                        // Retry after 1 s by default; back off to an hour when nothing was eligible.
                        let mut sleep_interval_secs = 1;
                        match result {
                            Err(err) => error!("Failed to compact sst file: {:?}", err),
                            Ok(Err(err)) => error!("Failed to compact sst file: {:?}", err),
                            Ok(Ok(None)) => {
                                sleep_interval_secs = 3600;
                            }
                            _ => {}
                        }
                        tokio::time::sleep(Duration::from_secs(sleep_interval_secs)).await;
                    }
                });
            }
            tokio::task::spawn(async move {
                loop {
                    tokio::select! {
                        // Objects pruning is disabled by `num_epochs_to_retain == u64::MAX`.
                        _ = objects_prune_interval.tick(), if config.num_epochs_to_retain != u64::MAX => {
                            if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), rpc_store.as_ref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
                                error!("Failed to prune objects: {:?}", err);
                            }
                            if let Err(err) = Self::prune_executed_tx_digests(&perpetual_db, &checkpoint_store).await {
                                error!("Failed to prune executed_tx_digests: {:?}", err);
                            }
                        },
                        // Checkpoint pruning is skipped when the retention is unset, `u64::MAX`, or 0.
                        _ = checkpoints_prune_interval.tick(), if !matches!(config.num_epochs_to_retain_for_checkpoints(), None | Some(u64::MAX) | Some(0)) => {
                            if let Err(err) = Self::prune_checkpoints_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), rpc_store.as_ref(), config.clone(), metrics.clone(), epoch_duration_ms, &pruner_watermarks).await {
                                error!("Failed to prune checkpoints: {:?}", err);
                            }
                        },
                        _ = indexes_prune_interval.tick(), if config.num_epochs_to_retain_for_indexes.is_some() => {
                            if let Err(err) = Self::prune_indexes(jsonrpc_index.as_deref(), &config, epoch_duration_ms, &metrics) {
                                error!("Failed to prune indexes: {:?}", err);
                            }
                        }
                        // Resolves when the cancel handle is used or dropped.
                        _ = &mut recv => break,
                    }
                }
            });
        }
        sender
    }
974
975 pub fn new(
976 perpetual_db: Arc<AuthorityPerpetualTables>,
977 checkpoint_store: Arc<CheckpointStore>,
978 rpc_index: Option<Arc<RpcIndexStore>>,
979 rpc_store: Option<RpcStore>,
980 jsonrpc_index: Option<Arc<IndexStore>>,
981 mut pruning_config: AuthorityStorePruningConfig,
982 is_validator: bool,
983 epoch_duration_ms: u64,
984 registry: &Registry,
985 pruner_watermarks: Arc<PrunerWatermarks>, ) -> Self {
987 #[cfg(tidehunter)]
995 {
996 let objects_compactor_enabled =
997 is_validator || pruning_config.num_epochs_to_retain == 0;
998 if objects_compactor_enabled && pruning_config.num_epochs_to_retain != u64::MAX {
999 info!(
1000 "Tidehunter: disabling object pruner (was num_epochs_to_retain={}). The objects compactor performs equivalent compaction.",
1001 pruning_config.num_epochs_to_retain
1002 );
1003 pruning_config.num_epochs_to_retain = u64::MAX;
1004 }
1005 }
1006
1007 if pruning_config.num_epochs_to_retain > 0 && pruning_config.num_epochs_to_retain < u64::MAX
1008 {
1009 warn!(
1010 "Using objects pruner with num_epochs_to_retain = {} can lead to performance issues",
1011 pruning_config.num_epochs_to_retain
1012 );
1013 if is_validator {
1014 warn!("Resetting to aggressive pruner.");
1015 pruning_config.num_epochs_to_retain = 0;
1016 } else {
1017 warn!("Consider using an aggressive pruner (num_epochs_to_retain = 0)");
1018 }
1019 }
1020 AuthorityStorePruner {
1021 _objects_pruner_cancel_handle: Self::setup_pruning(
1022 pruning_config,
1023 epoch_duration_ms,
1024 perpetual_db,
1025 checkpoint_store,
1026 rpc_index,
1027 rpc_store,
1028 jsonrpc_index,
1029 AuthorityStorePruningMetrics::new(registry),
1030 pruner_watermarks,
1031 ),
1032 }
1033 }
1034
1035 pub fn compact(perpetual_db: &Arc<AuthorityPerpetualTables>) -> Result<(), TypedStoreError> {
1036 perpetual_db.objects.compact_range(
1037 &ObjectKey(ObjectID::ZERO, SequenceNumber::MIN),
1038 &ObjectKey(ObjectID::MAX, SequenceNumber::MAX),
1039 )
1040 }
1041}
1042
/// Attaches a watermark-driven relocation filter to `config` and returns the
/// resulting key space configuration.
///
/// For each `(key, value)` pair given to the filter, a `T` is decoded — from
/// the key with big-endian, fixed-int bincode when `by_key` is `true`,
/// otherwise from the value with BCS — and passed to `extractor`. If the
/// extracted number is strictly below the current `pruner_watermark`, the
/// filter answers `Decision::Remove`; otherwise `Decision::StopRelocation`.
///
/// # Panics
///
/// The installed filter panics when the key/value cannot be deserialized
/// into `T`.
#[cfg(tidehunter)]
pub(crate) fn apply_relocation_filter<T>(
    config: typed_store::tidehunter_util::KeySpaceConfig,
    pruner_watermark: Arc<AtomicU64>,
    extractor: impl Fn(T) -> u64 + Send + Sync + 'static,
    by_key: bool,
) -> typed_store::tidehunter_util::KeySpaceConfig
where
    T: DeserializeOwned,
{
    use bincode::Options;
    use std::sync::atomic::Ordering;
    use typed_store::tidehunter_util::Decision;

    config.with_relocation_filter(move |key, value| {
        let decoded: T = if by_key {
            let key_codec = bincode::DefaultOptions::new()
                .with_big_endian()
                .with_fixint_encoding();
            key_codec
                .deserialize(&key)
                .expect("relocation filter deserialization error")
        } else {
            bcs::from_bytes(&value).expect("relocation filter deserialization error")
        };

        // Extract first, then read the watermark, as the comparison did originally.
        let extracted = extractor(decoded);
        let watermark = pruner_watermark.load(Ordering::Relaxed);
        if extracted < watermark {
            Decision::Remove
        } else {
            Decision::StopRelocation
        }
    })
}
1070
1071#[cfg(test)]
1072mod tests {
1073 use more_asserts as ma;
1074 use std::path::Path;
1075 use std::time::Duration;
1076 use std::{collections::HashSet, sync::Arc};
1077 use tracing::log::info;
1078
1079 use crate::authority::authority_store_pruner::AuthorityStorePruningMetrics;
1080 use crate::authority::authority_store_tables::AuthorityPerpetualTables;
1081 use crate::authority::authority_store_types::{
1082 StoreObject, StoreObjectWrapper, get_store_object,
1083 };
1084 use prometheus::Registry;
1085 use sui_types::base_types::ObjectDigest;
1086 use sui_types::effects::TransactionEffects;
1087 use sui_types::effects::TransactionEffectsAPI;
1088 use sui_types::{
1089 base_types::{ObjectID, SequenceNumber},
1090 object::Object,
1091 storage::ObjectKey,
1092 };
1093 use typed_store::Map;
1094 use typed_store::rocks::{DBMap, MetricConf, ReadWriteOptions, default_db_options};
1095
1096 use super::AuthorityStorePruner;
1097
1098 fn get_keys_after_pruning(path: &Path) -> anyhow::Result<HashSet<ObjectKey>> {
1099 let perpetual_db_path = path.join(Path::new("perpetual"));
1100 let cf_names = AuthorityPerpetualTables::describe_tables();
1101 let cfs: Vec<_> = cf_names
1102 .keys()
1103 .map(|x| (x.as_str(), default_db_options().options))
1104 .collect();
1105 let perpetual_db = typed_store::rocks::open_cf_opts(
1106 perpetual_db_path,
1107 None,
1108 MetricConf::new("perpetual_pruning"),
1109 &cfs,
1110 );
1111
1112 let mut after_pruning = HashSet::new();
1113 let objects = DBMap::<ObjectKey, StoreObjectWrapper>::reopen(
1114 &perpetual_db?,
1115 Some("objects"),
1116 &ReadWriteOptions::default(),
1119 false,
1120 )?;
1121 let iter = objects.safe_iter();
1122 for item in iter {
1123 after_pruning.insert(item?.0);
1124 }
1125 Ok(after_pruning)
1126 }
1127
1128 type GenerateTestDataResult = (Vec<ObjectKey>, Vec<ObjectKey>, Vec<ObjectKey>);
1129
1130 fn generate_test_data(
1131 db: Arc<AuthorityPerpetualTables>,
1132 num_versions_per_object: u64,
1133 num_object_versions_to_retain: u64,
1134 total_unique_object_ids: u32,
1135 ) -> Result<GenerateTestDataResult, anyhow::Error> {
1136 assert!(num_versions_per_object >= num_object_versions_to_retain);
1137
1138 let (mut to_keep, mut to_delete, mut tombstones) = (vec![], vec![], vec![]);
1139 let mut batch = db.objects.batch();
1140
1141 let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids.into())?;
1142 for id in ids {
1143 for (counter, seq) in (0..num_versions_per_object).rev().enumerate() {
1144 let object_key = ObjectKey(id, SequenceNumber::from_u64(seq));
1145 if counter < num_object_versions_to_retain.try_into().unwrap() {
1146 to_keep.push(object_key);
1148 } else {
1149 to_delete.push(object_key);
1150 }
1151 let obj = get_store_object(Object::immutable_with_id_for_testing(id));
1152 batch.insert_batch(
1153 &db.objects,
1154 [(ObjectKey(id, SequenceNumber::from(seq)), obj.clone())],
1155 )?;
1156 }
1157
1158 if num_object_versions_to_retain == 0 {
1160 let tombstone_key = ObjectKey(id, SequenceNumber::from(num_versions_per_object));
1161 println!("Adding tombstone object {:?}", tombstone_key);
1162 batch.insert_batch(
1163 &db.objects,
1164 [(tombstone_key, StoreObjectWrapper::V1(StoreObject::Deleted))],
1165 )?;
1166 tombstones.push(tombstone_key);
1167 }
1168 }
1169 batch.write().unwrap();
1170 assert_eq!(
1171 to_keep.len() as u64,
1172 std::cmp::min(num_object_versions_to_retain, num_versions_per_object)
1173 * total_unique_object_ids as u64
1174 );
1175 assert_eq!(
1176 tombstones.len() as u64,
1177 if num_object_versions_to_retain == 0 {
1178 total_unique_object_ids as u64
1179 } else {
1180 0
1181 }
1182 );
1183 Ok((to_keep, to_delete, tombstones))
1184 }
1185
1186 async fn run_pruner(
1187 path: &Path,
1188 num_versions_per_object: u64,
1189 num_object_versions_to_retain: u64,
1190 total_unique_object_ids: u32,
1191 ) -> Vec<ObjectKey> {
1192 let registry = Registry::default();
1193 let metrics = AuthorityStorePruningMetrics::new(®istry);
1194 let to_keep = {
1195 let db = Arc::new(AuthorityPerpetualTables::open(path, None, None));
1196 let (to_keep, to_delete, tombstones) = generate_test_data(
1197 db.clone(),
1198 num_versions_per_object,
1199 num_object_versions_to_retain,
1200 total_unique_object_ids,
1201 )
1202 .unwrap();
1203 let mut effects = TransactionEffects::default();
1204 for object in to_delete {
1205 effects.unsafe_add_deleted_live_object_for_testing((
1206 object.0,
1207 object.1,
1208 ObjectDigest::MIN,
1209 ));
1210 }
1211 for object in tombstones {
1212 effects.unsafe_add_object_tombstone_for_testing((
1213 object.0,
1214 object.1,
1215 ObjectDigest::MIN,
1216 ));
1217 }
1218 AuthorityStorePruner::prune_objects_and_indexes(
1219 vec![effects],
1220 &db,
1221 0,
1222 metrics,
1223 0,
1224 None,
1225 None,
1226 true,
1227 )
1228 .await
1229 .unwrap();
1230 to_keep
1231 };
1232 tokio::time::sleep(Duration::from_secs(3)).await;
1233 to_keep
1234 }
1235
1236 #[cfg(not(tidehunter))]
1238 #[tokio::test]
1239 async fn test_pruning_objects() {
1240 let path = tempfile::tempdir().unwrap().keep();
1241 let to_keep = run_pruner(&path, 3, 2, 1000).await;
1242 assert_eq!(
1243 HashSet::from_iter(to_keep),
1244 get_keys_after_pruning(&path).unwrap()
1245 );
1246 run_pruner(&tempfile::tempdir().unwrap().keep(), 3, 2, 1000).await;
1247 }
1248
1249 #[cfg(not(tidehunter))]
1251 #[tokio::test]
1252 async fn test_pruning_tombstones() {
1253 let path = tempfile::tempdir().unwrap().keep();
1254 let to_keep = run_pruner(&path, 0, 0, 1000).await;
1255 assert_eq!(to_keep.len(), 0);
1256 assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);
1257
1258 let path = tempfile::tempdir().unwrap().keep();
1259 let to_keep = run_pruner(&path, 3, 0, 1000).await;
1260 assert_eq!(to_keep.len(), 0);
1261 assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);
1262 }
1263
1264 #[cfg(not(target_env = "msvc"))]
1265 #[tokio::test]
1266 async fn test_db_size_after_compaction() -> Result<(), anyhow::Error> {
1267 let primary_path = tempfile::tempdir()?.keep();
1268 let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None, None));
1269 let total_unique_object_ids = 10_000;
1270 let num_versions_per_object = 10;
1271 let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids)?;
1272 let mut to_delete = vec![];
1273 for id in ids {
1274 for i in (0..num_versions_per_object).rev() {
1275 if i < num_versions_per_object - 2 {
1276 to_delete.push((id, SequenceNumber::from(i)));
1277 }
1278 let obj = get_store_object(Object::immutable_with_id_for_testing(id));
1279 perpetual_db
1280 .objects
1281 .insert(&ObjectKey(id, SequenceNumber::from(i)), &obj)?;
1282 }
1283 }
1284
1285 fn get_sst_size(path: &Path) -> u64 {
1286 let mut size = 0;
1287 for entry in std::fs::read_dir(path).unwrap() {
1288 let entry = entry.unwrap();
1289 let path = entry.path();
1290 if let Some(ext) = path.extension() {
1291 if ext != "sst" {
1292 continue;
1293 }
1294 size += std::fs::metadata(path).unwrap().len();
1295 }
1296 }
1297 size
1298 }
1299
1300 let db_path = primary_path.clone().join("perpetual");
1301 let start = ObjectKey(ObjectID::ZERO, SequenceNumber::MIN);
1302 let end = ObjectKey(ObjectID::MAX, SequenceNumber::MAX);
1303
1304 perpetual_db.objects.compact_range(&start, &end)?;
1305 let before_compaction_size = get_sst_size(&db_path);
1306
1307 let mut effects = TransactionEffects::default();
1308 for object in to_delete {
1309 effects.unsafe_add_deleted_live_object_for_testing((
1310 object.0,
1311 object.1,
1312 ObjectDigest::MIN,
1313 ));
1314 }
1315 let registry = Registry::default();
1316 let metrics = AuthorityStorePruningMetrics::new(®istry);
1317 let total_pruned = AuthorityStorePruner::prune_objects_and_indexes(
1318 vec![effects],
1319 &perpetual_db,
1320 0,
1321 metrics,
1322 0,
1323 None,
1324 None,
1325 true,
1326 )
1327 .await;
1328 info!("Total pruned keys = {:?}", total_pruned);
1329
1330 perpetual_db.objects.compact_range(&start, &end)?;
1331 let after_compaction_size = get_sst_size(&db_path);
1332
1333 info!(
1334 "Before compaction disk size = {:?}, after compaction disk size = {:?}",
1335 before_compaction_size, after_compaction_size
1336 );
1337 ma::assert_le!(after_compaction_size, before_compaction_size);
1338 Ok(())
1339 }
1340}