sui_core/authority/
authority_store_pruner.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::authority_store_tables::AuthorityPerpetualTables;
5use crate::checkpoints::{CheckpointStore, CheckpointWatermark};
6use crate::jsonrpc_index::IndexStore;
7use crate::rpc_index::RpcIndexStore;
8use anyhow::anyhow;
9use mysten_metrics::{monitored_scope, spawn_monitored_task};
10use once_cell::sync::Lazy;
11use prometheus::{
12    IntCounter, IntGauge, Registry, register_int_counter_with_registry,
13    register_int_gauge_with_registry,
14};
15#[cfg(tidehunter)]
16use serde::de::DeserializeOwned;
17use std::cmp::{max, min};
18use std::collections::{BTreeSet, HashMap};
19use std::sync::Mutex;
20use std::sync::atomic::AtomicU64;
21use std::time::{SystemTime, UNIX_EPOCH};
22use std::{sync::Arc, time::Duration};
23use sui_config::node::AuthorityStorePruningConfig;
24use sui_types::committee::EpochId;
25use sui_types::effects::TransactionEffects;
26use sui_types::effects::TransactionEffectsAPI;
27use sui_types::message_envelope::Message;
28use sui_types::messages_checkpoint::{
29    CheckpointContents, CheckpointDigest, CheckpointSequenceNumber,
30};
31use sui_types::{
32    base_types::{ObjectID, SequenceNumber, TransactionDigest, VersionNumber},
33    storage::ObjectKey,
34};
35use tokio::sync::oneshot::{self, Sender};
36use tokio::time::Instant;
37use tracing::{debug, error, info, warn};
38use typed_store::rocksdb::LiveFile;
39use typed_store::{Map, TypedStoreError};
40
41static PERIODIC_PRUNING_TABLES: Lazy<BTreeSet<String>> = Lazy::new(|| {
42    [
43        "objects",
44        "effects",
45        "transactions",
46        "events",
47        "executed_effects",
48        "executed_transactions_to_checkpoint",
49    ]
50    .into_iter()
51    .map(|cf| cf.to_string())
52    .collect()
53});
/// Epoch duration used by tests: 24 hours expressed in milliseconds.
pub const EPOCH_DURATION_MS_FOR_TESTING: u64 = 24 * 60 * 60 * 1000;
/// Handle to the background store-pruning service.
pub struct AuthorityStorePruner {
    /// Sender half of the cancellation oneshot; presumably dropping this
    /// handle signals the spawned pruning task to stop — confirm in
    /// `setup_pruning`, whose body sits further down in this file.
    _objects_pruner_cancel_handle: oneshot::Sender<()>,
}
58
/// Watermarks published by the pruner and shared with other components
/// (updated in `update_pruning_watermarks` with `Ordering::Relaxed`).
#[derive(Default)]
pub struct PrunerWatermarks {
    /// Epoch-level watermark: min of (target epoch + 1) and the epoch of the
    /// highest object-pruned checkpoint.
    pub epoch_id: Arc<AtomicU64>,
    /// Checkpoint-level watermark: min of the target epoch's last checkpoint
    /// and the highest object-pruned checkpoint.
    pub checkpoint_id: Arc<AtomicU64>,
}
64
// 10 seconds. Note: despite the `MIN_` prefix, `pruning_tick_duration_ms`
// takes the *minimum* of this value and half the epoch duration, so in
// practice this acts as the upper bound on the pruning tick interval.
static MIN_PRUNING_TICK_DURATION_MS: u64 = 10 * 1000;
66
/// Prometheus metrics reported by the store pruner.
pub struct AuthorityStorePruningMetrics {
    /// Highest checkpoint whose object data has been pruned.
    pub last_pruned_checkpoint: IntGauge,
    /// Total object versions deleted so far.
    pub num_pruned_objects: IntCounter,
    /// Total object tombstones deleted so far.
    pub num_pruned_tombstones: IntCounter,
    /// Highest checkpoint whose effects/checkpoint data has been pruned.
    pub last_pruned_effects_checkpoint: IntGauge,
    /// Last transaction id pruned from the JSON-RPC index store.
    pub last_pruned_indexes_transaction: IntGauge,
    /// Configured object-retention window, exported for observability.
    pub num_epochs_to_retain_for_objects: IntGauge,
    /// Configured checkpoint-retention window, exported for observability.
    pub num_epochs_to_retain_for_checkpoints: IntGauge,
}
76
impl AuthorityStorePruningMetrics {
    /// Registers every pruning metric in `registry` and returns a shared
    /// handle. Panics (via `unwrap`) if registration fails, e.g. on a
    /// duplicate metric name — a programming error.
    pub fn new(registry: &Registry) -> Arc<Self> {
        let this = Self {
            last_pruned_checkpoint: register_int_gauge_with_registry!(
                "last_pruned_checkpoint",
                "Last pruned checkpoint",
                registry
            )
            .unwrap(),
            num_pruned_objects: register_int_counter_with_registry!(
                "num_pruned_objects",
                "Number of pruned objects",
                registry
            )
            .unwrap(),
            num_pruned_tombstones: register_int_counter_with_registry!(
                "num_pruned_tombstones",
                "Number of pruned tombstones",
                registry
            )
            .unwrap(),
            last_pruned_effects_checkpoint: register_int_gauge_with_registry!(
                "last_pruned_effects_checkpoint",
                "Last pruned effects checkpoint",
                registry
            )
            .unwrap(),
            last_pruned_indexes_transaction: register_int_gauge_with_registry!(
                "last_pruned_indexes_transaction",
                "Last pruned indexes transaction",
                registry
            )
            .unwrap(),
            num_epochs_to_retain_for_objects: register_int_gauge_with_registry!(
                "num_epochs_to_retain_for_objects",
                "Number of epochs to retain for objects",
                registry
            )
            .unwrap(),
            num_epochs_to_retain_for_checkpoints: register_int_gauge_with_registry!(
                "num_epochs_to_retain_for_checkpoints",
                "Number of epochs to retain for checkpoints",
                registry
            )
            .unwrap(),
        };
        Arc::new(this)
    }

    /// Convenience constructor for tests, backed by a throwaway registry.
    pub fn new_for_test() -> Arc<Self> {
        Self::new(&Registry::new())
    }
}
130
/// Which class of data a pruning pass removes; selects the branch taken in
/// `prune_for_eligible_epochs`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PruningMode {
    /// Prune old object versions (and the RPC index) — see
    /// `prune_objects_and_indexes`.
    Objects,
    /// Prune checkpoint/transaction/effects data — see `prune_checkpoints`.
    Checkpoints,
}
136
137impl AuthorityStorePruner {
    /// Prunes old versions of objects based on transaction effects, prunes the
    /// RPC index (if present), and advances the highest-pruned-checkpoint
    /// watermark — deletions and watermark commit in a single write batch.
    ///
    /// Versions listed in `modified_at_versions` are removed via per-object
    /// range deletes; tombstoned objects (when `enable_pruning_tombstones`)
    /// are removed with point deletes (see the inline comment for why).
    #[cfg(not(tidehunter))]
    async fn prune_objects_and_indexes(
        transaction_effects: Vec<TransactionEffects>,
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_number: CheckpointSequenceNumber,
        metrics: Arc<AuthorityStorePruningMetrics>,
        checkpoint_content_to_prune: Vec<CheckpointContents>,
        rpc_index: Option<&RpcIndexStore>,
        enable_pruning_tombstones: bool,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("ObjectsLivePruner");
        let mut wb = perpetual_db.objects.batch();

        // Collect object keys that need to be deleted from `transaction_effects`.
        let mut live_object_keys_to_prune = vec![];
        let mut object_tombstones_to_prune = vec![];
        for effects in &transaction_effects {
            for (object_id, seq_number) in effects.modified_at_versions() {
                live_object_keys_to_prune.push(ObjectKey(object_id, seq_number));
            }

            if enable_pruning_tombstones {
                for deleted_object_key in effects.all_tombstones() {
                    object_tombstones_to_prune
                        .push(ObjectKey(deleted_object_key.0, deleted_object_key.1));
                }
            }
        }

        metrics
            .num_pruned_objects
            .inc_by(live_object_keys_to_prune.len() as u64);
        metrics
            .num_pruned_tombstones
            .inc_by(object_tombstones_to_prune.len() as u64);

        // Collapse per-version keys into one [min, max] version span per object
        // id so each object needs only a single range delete.
        let mut updates: HashMap<ObjectID, (VersionNumber, VersionNumber)> = HashMap::new();
        for ObjectKey(object_id, seq_number) in live_object_keys_to_prune {
            updates
                .entry(object_id)
                .and_modify(|range| *range = (min(range.0, seq_number), max(range.1, seq_number)))
                .or_insert((seq_number, seq_number));
        }

        for (object_id, (min_version, max_version)) in updates {
            debug!(
                "Pruning object {:?} versions {:?} - {:?}",
                object_id, min_version, max_version
            );
            let start_range = ObjectKey(object_id, min_version);
            // Range-delete end bound is exclusive, so bump max_version by one
            // to make the deletion cover it.
            let end_range = ObjectKey(object_id, (max_version.value() + 1).into());
            wb.schedule_delete_range(&perpetual_db.objects, &start_range, &end_range)?;
        }

        // When enable_pruning_tombstones is enabled, instead of using range deletes, we need to do a scan of all the keys
        // for the deleted objects and then do point deletes to delete all the existing keys. This is because to improve read
        // performance, we set `ignore_range_deletions` on all read options, and using range delete to delete tombstones
        // may leak object (imagine a tombstone is compacted away, but earlier version is still not). Using point deletes
        // guarantees that all earlier versions are deleted in the database.
        if !object_tombstones_to_prune.is_empty() {
            let mut object_keys_to_delete = vec![];
            for ObjectKey(object_id, seq_number) in object_tombstones_to_prune {
                for result in perpetual_db.objects.safe_iter_with_bounds(
                    Some(ObjectKey(object_id, VersionNumber::MIN)),
                    Some(ObjectKey(object_id, seq_number.next())),
                ) {
                    let (object_key, _) = result?;
                    assert_eq!(object_key.0, object_id);
                    object_keys_to_delete.push(object_key);
                }
            }

            wb.delete_batch(&perpetual_db.objects, object_keys_to_delete)?;
        }

        if let Some(rpc_index) = rpc_index {
            rpc_index.prune(checkpoint_number, &checkpoint_content_to_prune)?;
        }

        // The watermark is added to the same batch, so progress and deletions
        // are persisted by the single `wb.write()` below.
        perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
        metrics.last_pruned_checkpoint.set(checkpoint_number as i64);

        wb.write()?;
        Ok(())
    }
224
    /// Tidehunter variant of `prune_objects_and_indexes`: deletes each
    /// modified or tombstoned object version with point deletes (no range
    /// deletes), prunes the RPC index (if present), and advances the
    /// highest-pruned-checkpoint watermark in the same write batch.
    /// The tombstone-pruning killswitch flag is ignored in this backend.
    #[cfg(tidehunter)]
    async fn prune_objects_and_indexes(
        transaction_effects: Vec<TransactionEffects>,
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_number: CheckpointSequenceNumber,
        metrics: Arc<AuthorityStorePruningMetrics>,
        checkpoint_content_to_prune: Vec<CheckpointContents>,
        rpc_index: Option<&RpcIndexStore>,
        _: bool,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("ObjectsLivePruner");
        let mut wb = perpetual_db.objects.batch();
        let mut objects_to_prune = vec![];

        for effects in &transaction_effects {
            for (object_id, version) in effects
                .modified_at_versions()
                .into_iter()
                .chain(effects.all_tombstones())
            {
                debug!("Pruning object {:?} version {:?}", object_id, version);
                objects_to_prune.push(ObjectKey(object_id, version));
            }
        }
        metrics
            .num_pruned_objects
            .inc_by(objects_to_prune.len() as u64);
        wb.delete_batch(&perpetual_db.objects, &objects_to_prune)?;

        if let Some(rpc_index) = rpc_index {
            rpc_index.prune(checkpoint_number, &checkpoint_content_to_prune)?;
        }

        perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
        metrics.last_pruned_checkpoint.set(checkpoint_number as i64);
        wb.write()?;
        Ok(())
    }
263
264    fn prune_checkpoints(
265        perpetual_db: &Arc<AuthorityPerpetualTables>,
266        checkpoint_db: &Arc<CheckpointStore>,
267        checkpoint_number: CheckpointSequenceNumber,
268        checkpoints_to_prune: Vec<CheckpointDigest>,
269        checkpoint_content_to_prune: Vec<CheckpointContents>,
270        effects_to_prune: &Vec<TransactionEffects>,
271        metrics: Arc<AuthorityStorePruningMetrics>,
272    ) -> anyhow::Result<()> {
273        let _scope = monitored_scope("EffectsLivePruner");
274
275        let mut perpetual_batch = perpetual_db.objects.batch();
276        let transactions: Vec<_> = checkpoint_content_to_prune
277            .iter()
278            .flat_map(|content| content.iter().map(|tx| tx.transaction))
279            .collect();
280
281        perpetual_batch.delete_batch(&perpetual_db.transactions, transactions.iter())?;
282        perpetual_batch.delete_batch(&perpetual_db.executed_effects, transactions.iter())?;
283        perpetual_batch.delete_batch(
284            &perpetual_db.executed_transactions_to_checkpoint,
285            transactions.iter(),
286        )?;
287
288        let mut effect_digests = vec![];
289        for effects in effects_to_prune {
290            let effects_digest = effects.digest();
291            debug!("Pruning effects {:?}", effects_digest);
292            effect_digests.push(effects_digest);
293
294            if effects.events_digest().is_some() {
295                perpetual_batch
296                    .delete_batch(&perpetual_db.events_2, [effects.transaction_digest()])?;
297            }
298        }
299        perpetual_batch.delete_batch(
300            &perpetual_db.unchanged_loaded_runtime_objects,
301            transactions.iter(),
302        )?;
303        perpetual_batch.delete_batch(&perpetual_db.effects, effect_digests)?;
304
305        let mut checkpoints_batch = checkpoint_db.tables.certified_checkpoints.batch();
306
307        let checkpoint_content_digests =
308            checkpoint_content_to_prune.iter().map(|ckpt| ckpt.digest());
309        checkpoints_batch.delete_batch(
310            &checkpoint_db.tables.checkpoint_content,
311            checkpoint_content_digests.clone(),
312        )?;
313        checkpoints_batch.delete_batch(
314            &checkpoint_db.tables.checkpoint_sequence_by_contents_digest,
315            checkpoint_content_digests,
316        )?;
317
318        checkpoints_batch.delete_batch(
319            &checkpoint_db.tables.checkpoint_by_digest,
320            checkpoints_to_prune,
321        )?;
322
323        checkpoints_batch.insert_batch(
324            &checkpoint_db.tables.watermarks,
325            [(
326                &CheckpointWatermark::HighestPruned,
327                &(checkpoint_number, CheckpointDigest::random()),
328            )],
329        )?;
330
331        perpetual_batch.write()?;
332        checkpoints_batch.write()?;
333        metrics
334            .last_pruned_effects_checkpoint
335            .set(checkpoint_number as i64);
336
337        Ok(())
338    }
339
    /// Prunes old object data based on effects from all checkpoints of epochs
    /// eligible for pruning, by delegating to `prune_for_eligible_epochs` in
    /// `PruningMode::Objects`.
    ///
    /// With `config.smooth` set (and a non-zero retention), the eligible
    /// checkpoint ceiling is first lowered by
    /// `smoothed_max_eligible_checkpoint_number` so the work is spread across
    /// the epoch instead of done in one burst.
    pub async fn prune_objects_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rpc_index: Option<&RpcIndexStore>,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
        epoch_duration_ms: u64,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneObjectsForEligibleEpochs");
        // Start from the highest executed checkpoint; defaults to (0, 0) when
        // nothing has been executed yet.
        let (mut max_eligible_checkpoint_number, epoch_id) = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| (*c.sequence_number(), c.epoch))
            .unwrap_or_default();
        let pruned_checkpoint_number = perpetual_db
            .get_highest_pruned_checkpoint()?
            .unwrap_or_default();
        if config.smooth && config.num_epochs_to_retain > 0 {
            max_eligible_checkpoint_number = Self::smoothed_max_eligible_checkpoint_number(
                checkpoint_store,
                max_eligible_checkpoint_number,
                pruned_checkpoint_number,
                epoch_id,
                epoch_duration_ms,
                config.num_epochs_to_retain,
            )?;
        }
        Self::prune_for_eligible_epochs(
            perpetual_db,
            checkpoint_store,
            rpc_index,
            PruningMode::Objects,
            config.num_epochs_to_retain,
            pruned_checkpoint_number,
            max_eligible_checkpoint_number,
            config,
            metrics.clone(),
        )
        .await
    }
380
    /// Prunes checkpoint data for epochs eligible for pruning, then refreshes
    /// the shared pruner watermarks.
    ///
    /// Checkpoint pruning must not run ahead of object pruning: unless object
    /// pruning is disabled (`num_epochs_to_retain == u64::MAX`), the eligible
    /// ceiling is clamped to the objects' highest pruned checkpoint. Errors if
    /// `num_epochs_to_retain_for_checkpoints` is not configured.
    pub async fn prune_checkpoints_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rpc_index: Option<&RpcIndexStore>,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
        epoch_duration_ms: u64,
        pruner_watermarks: &Arc<PrunerWatermarks>,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneCheckpointsForEligibleEpochs");
        let pruned_checkpoint_number = checkpoint_store
            .get_highest_pruned_checkpoint_seq_number()?
            .unwrap_or(0);
        let (mut max_eligible_checkpoint, epoch_id) = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| (*c.sequence_number(), c.epoch))
            .unwrap_or_default();
        // Never prune checkpoint data beyond what object pruning has reached.
        if config.num_epochs_to_retain != u64::MAX {
            max_eligible_checkpoint = min(
                max_eligible_checkpoint,
                perpetual_db
                    .get_highest_pruned_checkpoint()?
                    .unwrap_or_default(),
            );
        }
        // Optionally smooth the workload across the epoch.
        if config.smooth
            && let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints
        {
            max_eligible_checkpoint = Self::smoothed_max_eligible_checkpoint_number(
                checkpoint_store,
                max_eligible_checkpoint,
                pruned_checkpoint_number,
                epoch_id,
                epoch_duration_ms,
                num_epochs_to_retain,
            )?;
        }
        debug!("Max eligible checkpoint {}", max_eligible_checkpoint);
        Self::prune_for_eligible_epochs(
            perpetual_db,
            checkpoint_store,
            rpc_index,
            PruningMode::Checkpoints,
            config
                .num_epochs_to_retain_for_checkpoints()
                .ok_or_else(|| anyhow!("config value not set"))?,
            pruned_checkpoint_number,
            max_eligible_checkpoint,
            config.clone(),
            metrics.clone(),
        )
        .await?;

        if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
            Self::update_pruning_watermarks(
                perpetual_db,
                checkpoint_store,
                num_epochs_to_retain,
                pruner_watermarks,
            )?;
        }
        Ok(())
    }
444
    /// Prunes old data based on effects from all checkpoints of epochs
    /// eligible for pruning.
    ///
    /// Walks certified checkpoints forward from `starting_checkpoint_number`,
    /// accumulating checkpoint digests, contents and effects. Once a batch
    /// reaches `max_transactions_in_batch` / `max_checkpoints_in_batch`, it is
    /// flushed through either `prune_objects_and_indexes` or
    /// `prune_checkpoints` depending on `mode`. Stops at the first checkpoint
    /// that is too recent (within the retention window) or at
    /// `max_eligible_checkpoint` (exclusive).
    pub async fn prune_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rpc_index: Option<&RpcIndexStore>,
        mode: PruningMode,
        num_epochs_to_retain: u64,
        starting_checkpoint_number: CheckpointSequenceNumber,
        max_eligible_checkpoint: CheckpointSequenceNumber,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneForEligibleEpochs");

        let mut checkpoint_number = starting_checkpoint_number;
        let current_epoch = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch())
            .unwrap_or_default();

        let mut checkpoints_to_prune = vec![];
        let mut checkpoint_content_to_prune = vec![];
        let mut effects_to_prune = vec![];

        loop {
            // Stop when there is no next certified checkpoint.
            let Some(ckpt) = checkpoint_store
                .tables
                .certified_checkpoints
                .get(&(checkpoint_number + 1))?
            else {
                break;
            };
            let checkpoint = ckpt.into_inner();
            // Skipping because the checkpoint's epoch or checkpoint number is too new.
            // We have to respect the highest executed checkpoint watermark (including the watermark itself)
            // because there might be parts of the system that still require access to old object versions
            // (i.e. state accumulator).
            if (current_epoch < checkpoint.epoch() + num_epochs_to_retain)
                || (*checkpoint.sequence_number() >= max_eligible_checkpoint)
            {
                break;
            }
            checkpoint_number = *checkpoint.sequence_number();

            let content = checkpoint_store
                .get_checkpoint_contents(&checkpoint.content_digest)?
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "checkpoint content data is missing: {}",
                        checkpoint.sequence_number
                    )
                })?;
            let effects = perpetual_db
                .effects
                .multi_get(content.iter().map(|tx| tx.effects))?;

            info!("scheduling pruning for checkpoint {:?}", checkpoint_number);
            checkpoints_to_prune.push(*checkpoint.digest());
            checkpoint_content_to_prune.push(content);
            // Effects missing from the db (None) are silently skipped.
            effects_to_prune.extend(effects.into_iter().flatten());

            // Flush a full batch.
            if effects_to_prune.len() >= config.max_transactions_in_batch
                || checkpoints_to_prune.len() >= config.max_checkpoints_in_batch
            {
                match mode {
                    PruningMode::Objects => {
                        Self::prune_objects_and_indexes(
                            effects_to_prune,
                            perpetual_db,
                            checkpoint_number,
                            metrics.clone(),
                            checkpoint_content_to_prune,
                            rpc_index,
                            !config.killswitch_tombstone_pruning,
                        )
                        .await?
                    }
                    PruningMode::Checkpoints => Self::prune_checkpoints(
                        perpetual_db,
                        checkpoint_store,
                        checkpoint_number,
                        checkpoints_to_prune,
                        checkpoint_content_to_prune,
                        &effects_to_prune,
                        metrics.clone(),
                    )?,
                };
                checkpoints_to_prune = vec![];
                checkpoint_content_to_prune = vec![];
                effects_to_prune = vec![];
                // yield back to the tokio runtime. Prevent potential halt of other tasks
                tokio::task::yield_now().await;
            }
        }

        // Flush the final partial batch, if any.
        if !checkpoints_to_prune.is_empty() {
            match mode {
                PruningMode::Objects => {
                    Self::prune_objects_and_indexes(
                        effects_to_prune,
                        perpetual_db,
                        checkpoint_number,
                        metrics.clone(),
                        checkpoint_content_to_prune,
                        rpc_index,
                        !config.killswitch_tombstone_pruning,
                    )
                    .await?
                }
                PruningMode::Checkpoints => Self::prune_checkpoints(
                    perpetual_db,
                    checkpoint_store,
                    checkpoint_number,
                    checkpoints_to_prune,
                    checkpoint_content_to_prune,
                    &effects_to_prune,
                    metrics.clone(),
                )?,
            };
        }
        Ok(())
    }
567
568    fn prune_indexes(
569        indexes: Option<&IndexStore>,
570        config: &AuthorityStorePruningConfig,
571        epoch_duration_ms: u64,
572        metrics: &AuthorityStorePruningMetrics,
573    ) -> anyhow::Result<()> {
574        if let (Some(mut epochs_to_retain), Some(indexes)) =
575            (config.num_epochs_to_retain_for_indexes, indexes)
576        {
577            if epochs_to_retain < 7 {
578                warn!("num_epochs_to_retain_for_indexes is too low. Reseting it to 7");
579                epochs_to_retain = 7;
580            }
581            let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
582            if let Some(cut_time_ms) =
583                u64::try_from(now)?.checked_sub(epochs_to_retain * epoch_duration_ms)
584            {
585                let transaction_id = indexes.prune(cut_time_ms)?;
586                metrics
587                    .last_pruned_indexes_transaction
588                    .set(transaction_id as i64);
589            }
590        }
591        Ok(())
592    }
593
    /// Prunes `executed_transaction_digests` entries older than the previous
    /// epoch with a single range delete. Keeps epochs `current - 1` and
    /// `current`; no-op before epoch 2.
    async fn prune_executed_tx_digests(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
    ) -> anyhow::Result<()> {
        let current_epoch = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch)
            .unwrap_or_default();

        if current_epoch < 2 {
            return Ok(());
        }

        let target_epoch = current_epoch - 1;

        // Keys are (epoch, digest); range covers every epoch before target_epoch.
        let start_key = (0u64, TransactionDigest::ZERO);
        let end_key = (target_epoch, TransactionDigest::ZERO);

        info!(
            "Pruning executed_transaction_digests for epochs < {} (current epoch: {})",
            target_epoch, current_epoch
        );

        let mut batch = perpetual_db.executed_transaction_digests.batch();
        batch.schedule_delete_range(
            &perpetual_db.executed_transaction_digests,
            &start_key,
            // `to` is non-inclusive so target_epoch and all later epochs are preserved
            &end_key,
        )?;
        batch.write()?;
        Ok(())
    }
627
    /// Tidehunter counterpart of `prune_executed_tx_digests`: drops all cells
    /// in the key range covering epochs 0 through `current - 2` (key layout is
    /// `(epoch, digest)`, with `[0xff; 32]` as the presumed max digest — range
    /// inclusivity depends on `drop_cells_in_range` semantics, TODO confirm).
    /// No-op before epoch 2.
    #[cfg(tidehunter)]
    fn prune_executed_tx_digests_th(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
    ) -> anyhow::Result<()> {
        let current_epoch = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch)
            .unwrap_or_default();

        if current_epoch < 2 {
            return Ok(());
        }

        let last_epoch_to_delete = current_epoch - 2;
        let from_key = (0u64, TransactionDigest::ZERO);
        let to_key = (last_epoch_to_delete, TransactionDigest::new([0xff; 32]));
        info!(
            "Pruning executed_transaction_digests for epochs 0 to {} (current epoch: {})",
            last_epoch_to_delete, current_epoch
        );
        perpetual_db
            .executed_transaction_digests
            .drop_cells_in_range(&from_key, &to_key)?;
        Ok(())
    }
654
    /// Recomputes and publishes the shared pruner watermarks.
    ///
    /// The epoch watermark becomes `min(target_epoch + 1, epoch of the
    /// highest object-pruned checkpoint)` where
    /// `target_epoch = current_epoch - num_epochs_to_retain`; the checkpoint
    /// watermark becomes `min(last checkpoint of target_epoch, highest
    /// object-pruned checkpoint)`. Returns `Ok(true)` iff the epoch watermark
    /// actually changed (false when still inside the retention window or when
    /// the value is unchanged).
    fn update_pruning_watermarks(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        num_epochs_to_retain: u64,
        pruning_watermark: &Arc<PrunerWatermarks>,
    ) -> anyhow::Result<bool> {
        use std::sync::atomic::Ordering;
        let objects_pruning_checkpoint_id = perpetual_db
            .get_highest_pruned_checkpoint()?
            .unwrap_or_default();
        let objects_pruning_epoch_id = checkpoint_store
            .get_checkpoint_by_sequence_number(objects_pruning_checkpoint_id)?
            .map(|chk| chk.epoch)
            .unwrap_or_default();

        let current_watermark = pruning_watermark.epoch_id.load(Ordering::Relaxed);
        let current_epoch_id = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch)
            .unwrap_or_default();
        // Still inside the retention window: nothing to publish.
        if current_epoch_id < num_epochs_to_retain {
            return Ok(false);
        }
        let target_epoch_id = current_epoch_id - num_epochs_to_retain;
        let checkpoint_id =
            checkpoint_store.get_epoch_last_checkpoint_seq_number(target_epoch_id)?;

        // Never advance past what object pruning has actually completed.
        let new_watermark = min(target_epoch_id + 1, objects_pruning_epoch_id);
        if current_watermark == new_watermark {
            return Ok(false);
        }
        info!("relocation: setting epoch watermark to {}", new_watermark);
        pruning_watermark
            .epoch_id
            .store(new_watermark, Ordering::Relaxed);
        if let Some(checkpoint_id) = checkpoint_id {
            let watermark = min(checkpoint_id, objects_pruning_checkpoint_id);
            info!("relocation: setting checkpoint watermark to {}", watermark);
            pruning_watermark
                .checkpoint_id
                .store(watermark, Ordering::Relaxed);
        }
        Ok(true)
    }
699
    /// Tidehunter pruning entry point: refreshes the pruner watermarks and, if
    /// they advanced, kicks off relocation on the objects and watermarks
    /// tables and prunes old executed-transaction digests. Skips entirely
    /// when the watermark did not change.
    #[cfg(tidehunter)]
    fn prune_th(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        num_epochs_to_retain: u64,
        pruning_watermark: Arc<PrunerWatermarks>,
    ) -> anyhow::Result<()> {
        let watermark_updated = Self::update_pruning_watermarks(
            perpetual_db,
            checkpoint_store,
            num_epochs_to_retain,
            &pruning_watermark,
        )?;
        if !watermark_updated {
            info!("skip relocation. Watermark hasn't changed");
            return Ok(());
        }
        perpetual_db.objects.db.start_relocation()?;
        checkpoint_store.tables.watermarks.db.start_relocation()?;
        Self::prune_executed_tx_digests_th(perpetual_db, checkpoint_store)?;
        Ok(())
    }
722
    /// Picks the largest eligible live SST file and manually compacts its key
    /// range; returns the compacted file, or `None` when nothing qualified.
    ///
    /// A file is eligible when its column family is in
    /// `PERIODIC_PRUNING_TABLES`, it is not on level 0, both key bounds are
    /// known, and neither its filesystem mtime nor its last recorded manual
    /// compaction (tracked in `last_processed`) is newer than `delay_days`
    /// ago. Among eligible files, the largest one wins.
    fn compact_next_sst_file(
        perpetual_db: Arc<AuthorityPerpetualTables>,
        delay_days: usize,
        last_processed: Arc<Mutex<HashMap<String, SystemTime>>>,
    ) -> anyhow::Result<Option<LiveFile>> {
        let db_path = perpetual_db.objects.db.path_for_pruning();
        let mut state = last_processed
            .lock()
            .expect("failed to obtain a lock for last processed SST files");
        let mut sst_file_for_compaction: Option<LiveFile> = None;
        let time_threshold =
            SystemTime::now() - Duration::from_secs(delay_days as u64 * 24 * 60 * 60);
        for sst_file in perpetual_db.objects.db.live_files()? {
            // Live-file names carry a leading '/'; strip it to join with the db path.
            let file_path = db_path.join(sst_file.name.clone().trim_matches('/'));
            let last_modified = std::fs::metadata(file_path)?.modified()?;
            if !PERIODIC_PRUNING_TABLES.contains(&sst_file.column_family_name)
                || sst_file.level < 1
                || sst_file.start_key.is_none()
                || sst_file.end_key.is_none()
                || last_modified > time_threshold
                || state.get(&sst_file.name).unwrap_or(&UNIX_EPOCH) > &time_threshold
            {
                continue;
            }
            // Keep the largest eligible candidate seen so far.
            if let Some(candidate) = &sst_file_for_compaction
                && candidate.size > sst_file.size
            {
                continue;
            }
            sst_file_for_compaction = Some(sst_file);
        }
        let Some(sst_file) = sst_file_for_compaction else {
            return Ok(None);
        };
        info!(
            "Manual compaction of sst file {:?}. Size: {:?}, level: {:?}",
            sst_file.name, sst_file.size, sst_file.level
        );
        perpetual_db.objects.compact_range_raw(
            &sst_file.column_family_name,
            sst_file.start_key.clone().unwrap(),
            sst_file.end_key.clone().unwrap(),
        )?;
        // Record the compaction time so the file is not reprocessed too soon.
        state.insert(sst_file.name.clone(), SystemTime::now());
        Ok(Some(sst_file))
    }
769
770    fn pruning_tick_duration_ms(epoch_duration_ms: u64) -> u64 {
771        min(epoch_duration_ms / 2, MIN_PRUNING_TICK_DURATION_MS)
772    }
773
    /// Computes a "smoothed" pruning ceiling so the pruner advances gradually,
    /// tick by tick, instead of pruning a whole epoch's backlog in one burst.
    ///
    /// First caps `max_eligible_checkpoint` at the last checkpoint of the
    /// newest prunable epoch (`epoch_id - num_epochs_to_retain`); then returns
    /// `pruned_checkpoint + delta`, where `delta` is the remaining distance
    /// divided by the number of pruning ticks per epoch. Returns 0 when fewer
    /// than `num_epochs_to_retain` epochs exist yet.
    fn smoothed_max_eligible_checkpoint_number(
        checkpoint_store: &Arc<CheckpointStore>,
        mut max_eligible_checkpoint: CheckpointSequenceNumber,
        pruned_checkpoint: CheckpointSequenceNumber,
        epoch_id: EpochId,
        epoch_duration_ms: u64,
        num_epochs_to_retain: u64,
    ) -> anyhow::Result<CheckpointSequenceNumber> {
        if epoch_id < num_epochs_to_retain {
            return Ok(0);
        }
        let last_checkpoint_in_epoch = checkpoint_store
            .get_epoch_last_checkpoint(epoch_id - num_epochs_to_retain)?
            .map(|checkpoint| checkpoint.sequence_number)
            .unwrap_or_default();
        max_eligible_checkpoint = max_eligible_checkpoint.min(last_checkpoint_in_epoch);
        if max_eligible_checkpoint == 0 {
            return Ok(max_eligible_checkpoint);
        }
        // Number of pruning ticks that fit in one epoch (at least 1).
        let num_intervals = epoch_duration_ms
            .checked_div(Self::pruning_tick_duration_ms(epoch_duration_ms))
            .unwrap_or(1);
        // Per-tick share of the remaining backlog; checked ops guard against
        // underflow (already past the ceiling) and division by zero.
        let delta = max_eligible_checkpoint
            .checked_sub(pruned_checkpoint)
            .unwrap_or_default()
            .checked_div(num_intervals)
            .unwrap_or(1);
        Ok(pruned_checkpoint + delta)
    }
803
    /// Spawns the background pruning tasks and returns the cancellation
    /// handle; dropping (or sending on) the returned `Sender` stops the
    /// spawned loops.
    ///
    /// Two storage backends are supported via `cfg(tidehunter)`:
    /// - tidehunter: one task drives object pruning and `prune_th`.
    /// - rocksdb (default): one task for object/checkpoint/index pruning plus
    ///   an optional periodic SST-file compaction loop.
    fn setup_pruning(
        config: AuthorityStorePruningConfig,
        epoch_duration_ms: u64,
        perpetual_db: Arc<AuthorityPerpetualTables>,
        checkpoint_store: Arc<CheckpointStore>,
        rpc_index: Option<Arc<RpcIndexStore>>,
        jsonrpc_index: Option<Arc<IndexStore>>,
        metrics: Arc<AuthorityStorePruningMetrics>,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Sender<()> {
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        debug!(
            "Starting object pruning service with num_epochs_to_retain={}",
            config.num_epochs_to_retain
        );

        let tick_duration =
            Duration::from_millis(Self::pruning_tick_duration_ms(epoch_duration_ms));
        // Under simtest start almost immediately; otherwise delay the first
        // tick (default: one hour) so pruning does not compete with startup.
        let pruning_initial_delay = if cfg!(msim) {
            Duration::from_millis(1)
        } else {
            Duration::from_secs(config.pruning_run_delay_seconds.unwrap_or(60 * 60))
        };
        let mut objects_prune_interval =
            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
        let mut checkpoints_prune_interval =
            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);

        // Export the configured retention windows as gauges.
        metrics
            .num_epochs_to_retain_for_objects
            .set(config.num_epochs_to_retain as i64);
        metrics.num_epochs_to_retain_for_checkpoints.set(
            config
                .num_epochs_to_retain_for_checkpoints
                .unwrap_or_default() as i64,
        );

        #[cfg(tidehunter)]
        {
            // Tidehunter backend: only spawn when checkpoint retention is
            // configured; both pruning paths share a single select loop.
            if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
                tokio::task::spawn(async move {
                    loop {
                        tokio::select! {
                            _ = objects_prune_interval.tick() => {
                                if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
                                    error!("Failed to prune objects: {:?}", err);
                                }
                            },
                            _ = checkpoints_prune_interval.tick() => {
                                if let Err(err) = Self::prune_th(&perpetual_db, &checkpoint_store, num_epochs_to_retain, pruner_watermarks.clone()) {
                                    error!("Failed to prune checkpoints: {:?}", err);
                                }
                            },
                            // Cancellation: the handle returned by this function
                            // was dropped or signalled.
                            _ = &mut recv => break,
                        }
                    }
                });
            }
        }
        #[cfg(not(tidehunter))]
        {
            let mut indexes_prune_interval =
                tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);

            let perpetual_db_for_compaction = perpetual_db.clone();
            if let Some(delay_days) = config.periodic_compaction_threshold_days {
                // Periodic manual compaction: repeatedly compact the single
                // most eligible SST file on a blocking thread.
                spawn_monitored_task!(async move {
                    // Map of SST file name -> last compaction time, shared
                    // across iterations so files are not re-compacted early.
                    let last_processed = Arc::new(Mutex::new(HashMap::new()));
                    loop {
                        let db = perpetual_db_for_compaction.clone();
                        let state = Arc::clone(&last_processed);
                        let result = tokio::task::spawn_blocking(move || {
                            Self::compact_next_sst_file(db, delay_days, state)
                        })
                        .await;
                        let mut sleep_interval_secs = 1;
                        match result {
                            Err(err) => error!("Failed to compact sst file: {:?}", err),
                            Ok(Err(err)) => error!("Failed to compact sst file: {:?}", err),
                            Ok(Ok(None)) => {
                                // Nothing eligible right now; back off for an hour.
                                sleep_interval_secs = 3600;
                            }
                            _ => {}
                        }
                        tokio::time::sleep(Duration::from_secs(sleep_interval_secs)).await;
                    }
                });
            }
            tokio::task::spawn(async move {
                loop {
                    tokio::select! {
                        // Objects: disabled entirely when retention is u64::MAX.
                        _ = objects_prune_interval.tick(), if config.num_epochs_to_retain != u64::MAX => {
                            if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
                                error!("Failed to prune objects: {:?}", err);
                            }
                            if let Err(err) = Self::prune_executed_tx_digests(&perpetual_db, &checkpoint_store).await {
                                error!("Failed to prune executed_tx_digests: {:?}", err);
                            }
                        },
                        // Checkpoints: only when a finite, non-zero retention is configured.
                        _ = checkpoints_prune_interval.tick(), if !matches!(config.num_epochs_to_retain_for_checkpoints(), None | Some(u64::MAX) | Some(0)) => {
                            if let Err(err) = Self::prune_checkpoints_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms, &pruner_watermarks).await {
                                error!("Failed to prune checkpoints: {:?}", err);
                            }
                        },
                        _ = indexes_prune_interval.tick(), if config.num_epochs_to_retain_for_indexes.is_some() => {
                            if let Err(err) = Self::prune_indexes(jsonrpc_index.as_deref(), &config, epoch_duration_ms, &metrics) {
                                error!("Failed to prune indexes: {:?}", err);
                            }
                        }
                        // Cancellation: the handle returned by this function
                        // was dropped or signalled.
                        _ = &mut recv => break,
                    }
                }
            });
        }
        sender
    }
920
921    pub fn new(
922        perpetual_db: Arc<AuthorityPerpetualTables>,
923        checkpoint_store: Arc<CheckpointStore>,
924        rpc_index: Option<Arc<RpcIndexStore>>,
925        jsonrpc_index: Option<Arc<IndexStore>>,
926        mut pruning_config: AuthorityStorePruningConfig,
927        is_validator: bool,
928        epoch_duration_ms: u64,
929        registry: &Registry,
930        pruner_watermarks: Arc<PrunerWatermarks>, // used by tidehunter relocation filters
931    ) -> Self {
932        if pruning_config.num_epochs_to_retain > 0 && pruning_config.num_epochs_to_retain < u64::MAX
933        {
934            warn!(
935                "Using objects pruner with num_epochs_to_retain = {} can lead to performance issues",
936                pruning_config.num_epochs_to_retain
937            );
938            if is_validator {
939                warn!("Resetting to aggressive pruner.");
940                pruning_config.num_epochs_to_retain = 0;
941            } else {
942                warn!("Consider using an aggressive pruner (num_epochs_to_retain = 0)");
943            }
944        }
945        AuthorityStorePruner {
946            _objects_pruner_cancel_handle: Self::setup_pruning(
947                pruning_config,
948                epoch_duration_ms,
949                perpetual_db,
950                checkpoint_store,
951                rpc_index,
952                jsonrpc_index,
953                AuthorityStorePruningMetrics::new(registry),
954                pruner_watermarks,
955            ),
956        }
957    }
958
959    pub fn compact(perpetual_db: &Arc<AuthorityPerpetualTables>) -> Result<(), TypedStoreError> {
960        perpetual_db.objects.compact_range(
961            &ObjectKey(ObjectID::ZERO, SequenceNumber::MIN),
962            &ObjectKey(ObjectID::MAX, SequenceNumber::MAX),
963        )
964    }
965}
966
#[cfg(tidehunter)]
/// Attaches a relocation filter to `config` that removes entries whose
/// extracted watermark value (decoded from the key when `by_key`, otherwise
/// from the BCS value) falls below the pruner watermark, and stops relocation
/// at the first retained entry.
pub(crate) fn apply_relocation_filter<T: DeserializeOwned>(
    config: typed_store::tidehunter_util::KeySpaceConfig,
    pruner_watermark: Arc<AtomicU64>,
    extractor: impl Fn(T) -> u64 + Send + Sync + 'static,
    by_key: bool,
) -> typed_store::tidehunter_util::KeySpaceConfig {
    use bincode::Options;
    use std::sync::atomic::Ordering;
    use typed_store::tidehunter_util::Decision;
    config.with_relocation_filter(move |key, value| {
        // Decode the carrier of the epoch/sequence either from the
        // big-endian fixed-int key encoding or from the BCS-encoded value.
        let decoded: T = if by_key {
            bincode::DefaultOptions::new()
                .with_big_endian()
                .with_fixint_encoding()
                .deserialize(&key)
                .expect("relocation filter deserialization error")
        } else {
            bcs::from_bytes(&value).expect("relocation filter deserialization error")
        };
        let watermark = pruner_watermark.load(Ordering::Relaxed);
        if extractor(decoded) >= watermark {
            // Retained entry reached: no need to scan any further.
            Decision::StopRelocation
        } else {
            Decision::Remove
        }
    })
}
994
#[cfg(test)]
mod tests {
    use more_asserts as ma;
    use std::path::Path;
    use std::time::Duration;
    use std::{collections::HashSet, sync::Arc};
    use tracing::log::info;

    use crate::authority::authority_store_pruner::AuthorityStorePruningMetrics;
    use crate::authority::authority_store_tables::AuthorityPerpetualTables;
    use crate::authority::authority_store_types::{
        StoreObject, StoreObjectWrapper, get_store_object,
    };
    use prometheus::Registry;
    use sui_types::base_types::ObjectDigest;
    use sui_types::effects::TransactionEffects;
    use sui_types::effects::TransactionEffectsAPI;
    use sui_types::{
        base_types::{ObjectID, SequenceNumber},
        object::Object,
        storage::ObjectKey,
    };
    use typed_store::Map;
    use typed_store::rocks::{DBMap, MetricConf, ReadWriteOptions, default_db_options};

    use super::AuthorityStorePruner;

    /// Reopens the perpetual DB at `path` and returns every `ObjectKey`
    /// still present in the `objects` column family after pruning.
    fn get_keys_after_pruning(path: &Path) -> anyhow::Result<HashSet<ObjectKey>> {
        let perpetual_db_path = path.join(Path::new("perpetual"));
        let cf_names = AuthorityPerpetualTables::describe_tables();
        let cfs: Vec<_> = cf_names
            .keys()
            .map(|x| (x.as_str(), default_db_options().options))
            .collect();
        let perpetual_db = typed_store::rocks::open_cf_opts(
            perpetual_db_path,
            None,
            MetricConf::new("perpetual_pruning"),
            &cfs,
        );

        let mut after_pruning = HashSet::new();
        let objects = DBMap::<ObjectKey, StoreObjectWrapper>::reopen(
            &perpetual_db?,
            Some("objects"),
            // open the db to bypass default db options which ignores range tombstones
            // so we can read the accurate number of retained versions
            &ReadWriteOptions::default(),
            false,
        )?;
        let iter = objects.safe_iter();
        for item in iter {
            after_pruning.insert(item?.0);
        }
        Ok(after_pruning)
    }

    // (to_keep, to_delete, tombstones) key sets produced by `generate_test_data`.
    type GenerateTestDataResult = (Vec<ObjectKey>, Vec<ObjectKey>, Vec<ObjectKey>);

    /// Writes `num_versions_per_object` versions for each of
    /// `total_unique_object_ids` objects and returns which keys the pruner
    /// should keep, which it should delete, and any tombstone keys added
    /// (tombstones are only written when nothing is retained).
    fn generate_test_data(
        db: Arc<AuthorityPerpetualTables>,
        num_versions_per_object: u64,
        num_object_versions_to_retain: u64,
        total_unique_object_ids: u32,
    ) -> Result<GenerateTestDataResult, anyhow::Error> {
        assert!(num_versions_per_object >= num_object_versions_to_retain);

        let (mut to_keep, mut to_delete, mut tombstones) = (vec![], vec![], vec![]);
        let mut batch = db.objects.batch();

        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids.into())?;
        for id in ids {
            // Iterate versions newest-first so `counter` ranks versions from
            // latest (0) to oldest.
            for (counter, seq) in (0..num_versions_per_object).rev().enumerate() {
                let object_key = ObjectKey(id, SequenceNumber::from_u64(seq));
                if counter < num_object_versions_to_retain.try_into().unwrap() {
                    // latest `num_object_versions_to_retain` should not have been pruned
                    to_keep.push(object_key);
                } else {
                    to_delete.push(object_key);
                }
                let obj = get_store_object(Object::immutable_with_id_for_testing(id));
                batch.insert_batch(
                    &db.objects,
                    [(ObjectKey(id, SequenceNumber::from(seq)), obj.clone())],
                )?;
            }

            // Adding a tombstone for deleted object.
            if num_object_versions_to_retain == 0 {
                let tombstone_key = ObjectKey(id, SequenceNumber::from(num_versions_per_object));
                println!("Adding tombstone object {:?}", tombstone_key);
                batch.insert_batch(
                    &db.objects,
                    [(tombstone_key, StoreObjectWrapper::V1(StoreObject::Deleted))],
                )?;
                tombstones.push(tombstone_key);
            }
        }
        batch.write().unwrap();
        // Sanity-check the generated fixture before handing it to the pruner.
        assert_eq!(
            to_keep.len() as u64,
            std::cmp::min(num_object_versions_to_retain, num_versions_per_object)
                * total_unique_object_ids as u64
        );
        assert_eq!(
            tombstones.len() as u64,
            if num_object_versions_to_retain == 0 {
                total_unique_object_ids as u64
            } else {
                0
            }
        );
        Ok((to_keep, to_delete, tombstones))
    }

    /// Generates test data at `path`, prunes everything in `to_delete` plus
    /// all tombstones via `prune_objects_and_indexes`, and returns the keys
    /// expected to survive.
    async fn run_pruner(
        path: &Path,
        num_versions_per_object: u64,
        num_object_versions_to_retain: u64,
        total_unique_object_ids: u32,
    ) -> Vec<ObjectKey> {
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let to_keep = {
            let db = Arc::new(AuthorityPerpetualTables::open(path, None, None));
            let (to_keep, to_delete, tombstones) = generate_test_data(
                db.clone(),
                num_versions_per_object,
                num_object_versions_to_retain,
                total_unique_object_ids,
            )
            .unwrap();
            // Build synthetic effects describing the deletions/tombstones so
            // the pruner has something to act on.
            let mut effects = TransactionEffects::default();
            for object in to_delete {
                effects.unsafe_add_deleted_live_object_for_testing((
                    object.0,
                    object.1,
                    ObjectDigest::MIN,
                ));
            }
            for object in tombstones {
                effects.unsafe_add_object_tombstone_for_testing((
                    object.0,
                    object.1,
                    ObjectDigest::MIN,
                ));
            }
            AuthorityStorePruner::prune_objects_and_indexes(
                vec![effects],
                &db,
                0,
                metrics,
                vec![],
                None,
                true,
            )
            .await
            .unwrap();
            to_keep
        };
        // NOTE(review): presumably lets DB background work settle after the
        // handle is dropped before the caller reopens the DB — confirm the
        // sleep is actually required.
        tokio::time::sleep(Duration::from_secs(3)).await;
        to_keep
    }

    // Tests pruning old version of live objects.
    #[cfg(not(tidehunter))]
    #[tokio::test]
    async fn test_pruning_objects() {
        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 3, 2, 1000).await;
        assert_eq!(
            HashSet::from_iter(to_keep),
            get_keys_after_pruning(&path).unwrap()
        );
        run_pruner(&tempfile::tempdir().unwrap().keep(), 3, 2, 1000).await;
    }

    // Tests pruning deleted objects (object tombstones).
    #[cfg(not(tidehunter))]
    #[tokio::test]
    async fn test_pruning_tombstones() {
        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 0, 0, 1000).await;
        assert_eq!(to_keep.len(), 0);
        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);

        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 3, 0, 1000).await;
        assert_eq!(to_keep.len(), 0);
        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);
    }

    // Verifies that after a large prune, manual range compaction does not
    // increase the on-disk SST footprint.
    #[cfg(not(target_env = "msvc"))]
    #[tokio::test]
    async fn test_db_size_after_compaction() -> Result<(), anyhow::Error> {
        let primary_path = tempfile::tempdir()?.keep();
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None, None));
        let total_unique_object_ids = 10_000;
        let num_versions_per_object = 10;
        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids)?;
        let mut to_delete = vec![];
        for id in ids {
            for i in (0..num_versions_per_object).rev() {
                // All but the two most recent versions are marked for deletion.
                if i < num_versions_per_object - 2 {
                    to_delete.push((id, SequenceNumber::from(i)));
                }
                let obj = get_store_object(Object::immutable_with_id_for_testing(id));
                perpetual_db
                    .objects
                    .insert(&ObjectKey(id, SequenceNumber::from(i)), &obj)?;
            }
        }

        // Sums the sizes of all `.sst` files directly under `path`.
        fn get_sst_size(path: &Path) -> u64 {
            let mut size = 0;
            for entry in std::fs::read_dir(path).unwrap() {
                let entry = entry.unwrap();
                let path = entry.path();
                if let Some(ext) = path.extension() {
                    if ext != "sst" {
                        continue;
                    }
                    size += std::fs::metadata(path).unwrap().len();
                }
            }
            size
        }

        let db_path = primary_path.clone().join("perpetual");
        let start = ObjectKey(ObjectID::ZERO, SequenceNumber::MIN);
        let end = ObjectKey(ObjectID::MAX, SequenceNumber::MAX);

        // Baseline size after an initial full compaction.
        perpetual_db.objects.compact_range(&start, &end)?;
        let before_compaction_size = get_sst_size(&db_path);

        let mut effects = TransactionEffects::default();
        for object in to_delete {
            effects.unsafe_add_deleted_live_object_for_testing((
                object.0,
                object.1,
                ObjectDigest::MIN,
            ));
        }
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let total_pruned = AuthorityStorePruner::prune_objects_and_indexes(
            vec![effects],
            &perpetual_db,
            0,
            metrics,
            vec![],
            None,
            true,
        )
        .await;
        info!("Total pruned keys = {:?}", total_pruned);

        // Compact again after pruning and compare on-disk sizes.
        perpetual_db.objects.compact_range(&start, &end)?;
        let after_compaction_size = get_sst_size(&db_path);

        info!(
            "Before compaction disk size = {:?}, after compaction disk size = {:?}",
            before_compaction_size, after_compaction_size
        );
        ma::assert_le!(after_compaction_size, before_compaction_size);
        Ok(())
    }
}