// sui_core/authority/authority_store_pruner.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::authority_store_tables::AuthorityPerpetualTables;
5use crate::checkpoints::{CheckpointStore, CheckpointWatermark};
6use crate::jsonrpc_index::IndexStore;
7use crate::rpc_index::RpcIndexStore;
8use anyhow::anyhow;
9use mysten_metrics::{monitored_scope, spawn_monitored_task};
10use once_cell::sync::Lazy;
11use prometheus::{
12    IntCounter, IntGauge, Registry, register_int_counter_with_registry,
13    register_int_gauge_with_registry,
14};
15#[cfg(tidehunter)]
16use serde::de::DeserializeOwned;
17use std::cmp::{max, min};
18use std::collections::{BTreeSet, HashMap};
19use std::sync::Mutex;
20use std::sync::atomic::AtomicU64;
21use std::time::{SystemTime, UNIX_EPOCH};
22use std::{sync::Arc, time::Duration};
23use sui_config::node::AuthorityStorePruningConfig;
24use sui_types::committee::EpochId;
25use sui_types::effects::TransactionEffects;
26use sui_types::effects::TransactionEffectsAPI;
27use sui_types::message_envelope::Message;
28use sui_types::messages_checkpoint::{
29    CheckpointContents, CheckpointDigest, CheckpointSequenceNumber,
30};
31use sui_types::{
32    base_types::{ObjectID, SequenceNumber, TransactionDigest, VersionNumber},
33    storage::ObjectKey,
34};
35use tokio::sync::oneshot::{self, Sender};
36use tokio::time::Instant;
37use tracing::{debug, error, info, warn};
38use typed_store::rocksdb::LiveFile;
39use typed_store::{Map, TypedStoreError};
40
41static PERIODIC_PRUNING_TABLES: Lazy<BTreeSet<String>> = Lazy::new(|| {
42    [
43        "objects",
44        "effects",
45        "transactions",
46        "events",
47        "executed_effects",
48        "executed_transactions_to_checkpoint",
49    ]
50    .into_iter()
51    .map(|cf| cf.to_string())
52    .collect()
53});
54pub const EPOCH_DURATION_MS_FOR_TESTING: u64 = 24 * 60 * 60 * 1000;
/// Handle to the background pruning service. Dropping the contained sender
/// closes the cancellation channel, signalling the task spawned in
/// `setup_pruning` to stop.
pub struct AuthorityStorePruner {
    _objects_pruner_cancel_handle: oneshot::Sender<()>,
}
58
/// Atomic watermarks published by the pruner (see `update_pruning_watermarks`)
/// so other components can observe how far pruning has progressed.
#[derive(Default)]
pub struct PrunerWatermarks {
    /// Epoch-level watermark; clamped so it never runs ahead of the objects
    /// pruning watermark.
    pub epoch_id: Arc<AtomicU64>,
    /// Checkpoint-level watermark corresponding to `epoch_id`.
    pub checkpoint_id: Arc<AtomicU64>,
}
64
65static MIN_PRUNING_TICK_DURATION_MS: u64 = 10 * 1000;
66
/// Prometheus metrics reported by the store pruner.
pub struct AuthorityStorePruningMetrics {
    /// Highest checkpoint whose objects have been pruned.
    pub last_pruned_checkpoint: IntGauge,
    /// Total object versions deleted so far.
    pub num_pruned_objects: IntCounter,
    /// Total object tombstones deleted so far.
    pub num_pruned_tombstones: IntCounter,
    /// Highest checkpoint whose effects/checkpoint data has been pruned.
    pub last_pruned_effects_checkpoint: IntGauge,
    /// Highest transaction id pruned from the JSON-RPC index store.
    pub last_pruned_indexes_transaction: IntGauge,
    /// Configured retention (epochs) for object pruning.
    pub num_epochs_to_retain_for_objects: IntGauge,
    /// Configured retention (epochs) for checkpoint pruning.
    pub num_epochs_to_retain_for_checkpoints: IntGauge,
}
76
77impl AuthorityStorePruningMetrics {
78    pub fn new(registry: &Registry) -> Arc<Self> {
79        let this = Self {
80            last_pruned_checkpoint: register_int_gauge_with_registry!(
81                "last_pruned_checkpoint",
82                "Last pruned checkpoint",
83                registry
84            )
85            .unwrap(),
86            num_pruned_objects: register_int_counter_with_registry!(
87                "num_pruned_objects",
88                "Number of pruned objects",
89                registry
90            )
91            .unwrap(),
92            num_pruned_tombstones: register_int_counter_with_registry!(
93                "num_pruned_tombstones",
94                "Number of pruned tombstones",
95                registry
96            )
97            .unwrap(),
98            last_pruned_effects_checkpoint: register_int_gauge_with_registry!(
99                "last_pruned_effects_checkpoint",
100                "Last pruned effects checkpoint",
101                registry
102            )
103            .unwrap(),
104            last_pruned_indexes_transaction: register_int_gauge_with_registry!(
105                "last_pruned_indexes_transaction",
106                "Last pruned indexes transaction",
107                registry
108            )
109            .unwrap(),
110            num_epochs_to_retain_for_objects: register_int_gauge_with_registry!(
111                "num_epochs_to_retain_for_objects",
112                "Number of epochs to retain for objects",
113                registry
114            )
115            .unwrap(),
116            num_epochs_to_retain_for_checkpoints: register_int_gauge_with_registry!(
117                "num_epochs_to_retain_for_checkpoints",
118                "Number of epochs to retain for checkpoints",
119                registry
120            )
121            .unwrap(),
122        };
123        Arc::new(this)
124    }
125
126    pub fn new_for_test() -> Arc<Self> {
127        Self::new(&Registry::new())
128    }
129}
130
/// Selects which family of data a pruning pass operates on.
///
/// `Eq` is derived alongside `PartialEq` because equality on this fieldless
/// enum is total (clippy: `derive_partial_eq_without_eq`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruningMode {
    /// Prune old object versions (and associated RPC indexes).
    Objects,
    /// Prune checkpoint, transaction, effects and events data.
    Checkpoints,
}
136
137impl AuthorityStorePruner {
    /// Prunes the old object versions named by `transaction_effects` and
    /// advances the objects pruning watermark to `checkpoint_number`.
    ///
    /// Overwritten live versions are removed with one range delete per object;
    /// tombstoned objects (when `enable_pruning_tombstones` is set) are
    /// removed with point deletes — see the comment below for why. The
    /// watermark update is placed in the same write batch, so the deletions
    /// and the watermark commit atomically. Pruned checkpoint contents are
    /// also forwarded to `rpc_index` so its secondary indexes stay in lockstep.
    #[cfg(not(tidehunter))]
    async fn prune_objects_and_indexes(
        transaction_effects: Vec<TransactionEffects>,
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_number: CheckpointSequenceNumber,
        metrics: Arc<AuthorityStorePruningMetrics>,
        checkpoint_content_to_prune: Vec<CheckpointContents>,
        rpc_index: Option<&RpcIndexStore>,
        enable_pruning_tombstones: bool,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("ObjectsLivePruner");
        let mut wb = perpetual_db.objects.batch();

        // Collect objects keys that need to be deleted from `transaction_effects`.
        let mut live_object_keys_to_prune = vec![];
        let mut object_tombstones_to_prune = vec![];
        for effects in &transaction_effects {
            // Every version a transaction overwrote is now dead and prunable.
            for (object_id, seq_number) in effects.modified_at_versions() {
                live_object_keys_to_prune.push(ObjectKey(object_id, seq_number));
            }

            if enable_pruning_tombstones {
                for deleted_object_key in effects.all_tombstones() {
                    object_tombstones_to_prune
                        .push(ObjectKey(deleted_object_key.0, deleted_object_key.1));
                }
            }
        }

        metrics
            .num_pruned_objects
            .inc_by(live_object_keys_to_prune.len() as u64);
        metrics
            .num_pruned_tombstones
            .inc_by(object_tombstones_to_prune.len() as u64);

        // Collapse per-version keys into a single (min, max) version range per
        // object id so each object needs only one range delete.
        let mut updates: HashMap<ObjectID, (VersionNumber, VersionNumber)> = HashMap::new();
        for ObjectKey(object_id, seq_number) in live_object_keys_to_prune {
            updates
                .entry(object_id)
                .and_modify(|range| *range = (min(range.0, seq_number), max(range.1, seq_number)))
                .or_insert((seq_number, seq_number));
        }

        for (object_id, (min_version, max_version)) in updates {
            debug!(
                "Pruning object {:?} versions {:?} - {:?}",
                object_id, min_version, max_version
            );
            let start_range = ObjectKey(object_id, min_version);
            // The end of a range delete is exclusive, so step past `max_version`.
            let end_range = ObjectKey(object_id, (max_version.value() + 1).into());
            wb.schedule_delete_range(&perpetual_db.objects, &start_range, &end_range)?;
        }

        // When enable_pruning_tombstones is enabled, instead of using range deletes, we need to do a scan of all the keys
        // for the deleted objects and then do point deletes to delete all the existing keys. This is because to improve read
        // performance, we set `ignore_range_deletions` on all read options, and using range delete to delete tombstones
        // may leak object (imagine a tombstone is compacted away, but earlier version is still not). Using point deletes
        // guarantees that all earlier versions are deleted in the database.
        if !object_tombstones_to_prune.is_empty() {
            let mut object_keys_to_delete = vec![];
            for ObjectKey(object_id, seq_number) in object_tombstones_to_prune {
                // Scan every stored version up to and including the tombstone itself.
                for result in perpetual_db.objects.safe_iter_with_bounds(
                    Some(ObjectKey(object_id, VersionNumber::MIN)),
                    Some(ObjectKey(object_id, seq_number.next())),
                ) {
                    let (object_key, _) = result?;
                    assert_eq!(object_key.0, object_id);
                    object_keys_to_delete.push(object_key);
                }
            }

            wb.delete_batch(&perpetual_db.objects, object_keys_to_delete)?;
        }

        if let Some(rpc_index) = rpc_index {
            rpc_index.prune(checkpoint_number, &checkpoint_content_to_prune)?;
        }

        // Watermark update joins the same batch so it commits atomically with
        // the deletes above.
        perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
        metrics.last_pruned_checkpoint.set(checkpoint_number as i64);

        wb.write()?;
        Ok(())
    }
224
225    #[cfg(tidehunter)]
226    async fn prune_objects_and_indexes(
227        transaction_effects: Vec<TransactionEffects>,
228        perpetual_db: &Arc<AuthorityPerpetualTables>,
229        checkpoint_number: CheckpointSequenceNumber,
230        metrics: Arc<AuthorityStorePruningMetrics>,
231        checkpoint_content_to_prune: Vec<CheckpointContents>,
232        rpc_index: Option<&RpcIndexStore>,
233        _: bool,
234    ) -> anyhow::Result<()> {
235        let _scope = monitored_scope("ObjectsLivePruner");
236        let mut wb = perpetual_db.objects.batch();
237        let mut objects_to_prune = vec![];
238
239        for effects in &transaction_effects {
240            for (object_id, version) in effects
241                .modified_at_versions()
242                .into_iter()
243                .chain(effects.all_tombstones())
244            {
245                debug!("Pruning object {:?} version {:?}", object_id, version);
246                objects_to_prune.push(ObjectKey(object_id, version));
247            }
248        }
249        metrics
250            .num_pruned_objects
251            .inc_by(objects_to_prune.len() as u64);
252        wb.delete_batch(&perpetual_db.objects, &objects_to_prune)?;
253
254        if let Some(rpc_index) = rpc_index {
255            rpc_index.prune(checkpoint_number, &checkpoint_content_to_prune)?;
256        }
257
258        perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
259        metrics.last_pruned_checkpoint.set(checkpoint_number as i64);
260        wb.write()?;
261        Ok(())
262    }
263
    /// Prunes checkpoint-level data for the batch ending at `checkpoint_number`:
    /// transactions, executed effects, transaction-to-checkpoint mappings,
    /// events, effects, checkpoint contents and certified-checkpoint records,
    /// then advances the `HighestPruned` watermark.
    ///
    /// NOTE(review): two separate batches are written (perpetual tables, then
    /// checkpoint tables), so they are not atomic with each other; a crash in
    /// between presumably gets repaired by re-pruning from the watermark on
    /// restart — confirm.
    fn prune_checkpoints(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_db: &Arc<CheckpointStore>,
        checkpoint_number: CheckpointSequenceNumber,
        checkpoints_to_prune: Vec<CheckpointDigest>,
        checkpoint_content_to_prune: Vec<CheckpointContents>,
        effects_to_prune: &Vec<TransactionEffects>,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("EffectsLivePruner");

        let mut perpetual_batch = perpetual_db.objects.batch();
        // Digests of every transaction contained in the checkpoints being pruned.
        let transactions: Vec<_> = checkpoint_content_to_prune
            .iter()
            .flat_map(|content| content.iter().map(|tx| tx.transaction))
            .collect();

        perpetual_batch.delete_batch(&perpetual_db.transactions, transactions.iter())?;
        perpetual_batch.delete_batch(&perpetual_db.executed_effects, transactions.iter())?;
        perpetual_batch.delete_batch(
            &perpetual_db.executed_transactions_to_checkpoint,
            transactions.iter(),
        )?;

        let mut effect_digests = vec![];
        for effects in effects_to_prune {
            let effects_digest = effects.digest();
            debug!("Pruning effects {:?}", effects_digest);
            effect_digests.push(effects_digest);

            // events_2 is keyed by transaction digest; only transactions that
            // actually emitted events have an entry to remove.
            if effects.events_digest().is_some() {
                perpetual_batch
                    .delete_batch(&perpetual_db.events_2, [effects.transaction_digest()])?;
            }
        }
        perpetual_batch.delete_batch(
            &perpetual_db.unchanged_loaded_runtime_objects,
            transactions.iter(),
        )?;
        perpetual_batch.delete_batch(&perpetual_db.effects, effect_digests)?;

        let mut checkpoints_batch = checkpoint_db.tables.certified_checkpoints.batch();

        let checkpoint_content_digests =
            checkpoint_content_to_prune.iter().map(|ckpt| ckpt.digest());
        checkpoints_batch.delete_batch(
            &checkpoint_db.tables.checkpoint_content,
            checkpoint_content_digests.clone(),
        )?;
        checkpoints_batch.delete_batch(
            &checkpoint_db.tables.checkpoint_sequence_by_contents_digest,
            checkpoint_content_digests,
        )?;

        checkpoints_batch.delete_batch(
            &checkpoint_db.tables.checkpoint_by_digest,
            checkpoints_to_prune,
        )?;

        // NOTE(review): a random digest is stored with the HighestPruned
        // watermark — presumably readers only consult the sequence number;
        // confirm against the watermark consumers.
        checkpoints_batch.insert_batch(
            &checkpoint_db.tables.watermarks,
            [(
                &CheckpointWatermark::HighestPruned,
                &(checkpoint_number, CheckpointDigest::random()),
            )],
        )?;

        perpetual_batch.write()?;
        checkpoints_batch.write()?;
        metrics
            .last_pruned_effects_checkpoint
            .set(checkpoint_number as i64);

        Ok(())
    }
339
340    /// Prunes old data based on effects from all checkpoints from epochs eligible for pruning
341    pub async fn prune_objects_for_eligible_epochs(
342        perpetual_db: &Arc<AuthorityPerpetualTables>,
343        checkpoint_store: &Arc<CheckpointStore>,
344        rpc_index: Option<&RpcIndexStore>,
345        config: AuthorityStorePruningConfig,
346        metrics: Arc<AuthorityStorePruningMetrics>,
347        epoch_duration_ms: u64,
348    ) -> anyhow::Result<()> {
349        let _scope = monitored_scope("PruneObjectsForEligibleEpochs");
350        let (mut max_eligible_checkpoint_number, epoch_id) = checkpoint_store
351            .get_highest_executed_checkpoint()?
352            .map(|c| (*c.sequence_number(), c.epoch))
353            .unwrap_or_default();
354        let pruned_checkpoint_number = perpetual_db
355            .get_highest_pruned_checkpoint()?
356            .unwrap_or_default();
357        if config.smooth && config.num_epochs_to_retain > 0 {
358            max_eligible_checkpoint_number = Self::smoothed_max_eligible_checkpoint_number(
359                checkpoint_store,
360                max_eligible_checkpoint_number,
361                pruned_checkpoint_number,
362                epoch_id,
363                epoch_duration_ms,
364                config.num_epochs_to_retain,
365            )?;
366        }
367        Self::prune_for_eligible_epochs(
368            perpetual_db,
369            checkpoint_store,
370            rpc_index,
371            PruningMode::Objects,
372            config.num_epochs_to_retain,
373            pruned_checkpoint_number,
374            max_eligible_checkpoint_number,
375            config,
376            metrics.clone(),
377        )
378        .await
379    }
380
    /// Prunes checkpoint data (transactions, effects, events, checkpoint
    /// records) for all epochs outside the retention window.
    ///
    /// The eligible upper bound starts at the highest executed checkpoint and,
    /// unless object pruning is disabled (`num_epochs_to_retain == u64::MAX`),
    /// is capped by the objects pruning watermark so checkpoint data is never
    /// pruned before the corresponding objects. With smoothing enabled the
    /// bound advances gradually each tick. Afterwards the shared
    /// `PrunerWatermarks` are refreshed.
    pub async fn prune_checkpoints_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rpc_index: Option<&RpcIndexStore>,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
        epoch_duration_ms: u64,
        pruner_watermarks: &Arc<PrunerWatermarks>,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneCheckpointsForEligibleEpochs");
        let pruned_checkpoint_number = checkpoint_store
            .get_highest_pruned_checkpoint_seq_number()?
            .unwrap_or(0);
        let (mut max_eligible_checkpoint, epoch_id) = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| (*c.sequence_number(), c.epoch))
            .unwrap_or_default();
        // Checkpoint pruning must not outrun object pruning, which maintains
        // its own watermark in the perpetual tables.
        if config.num_epochs_to_retain != u64::MAX {
            max_eligible_checkpoint = min(
                max_eligible_checkpoint,
                perpetual_db
                    .get_highest_pruned_checkpoint()?
                    .unwrap_or_default(),
            );
        }
        if config.smooth
            && let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints
        {
            // Spread the pruning work evenly across the epoch's ticks.
            max_eligible_checkpoint = Self::smoothed_max_eligible_checkpoint_number(
                checkpoint_store,
                max_eligible_checkpoint,
                pruned_checkpoint_number,
                epoch_id,
                epoch_duration_ms,
                num_epochs_to_retain,
            )?;
        }
        debug!("Max eligible checkpoint {}", max_eligible_checkpoint);
        Self::prune_for_eligible_epochs(
            perpetual_db,
            checkpoint_store,
            rpc_index,
            PruningMode::Checkpoints,
            config
                .num_epochs_to_retain_for_checkpoints()
                .ok_or_else(|| anyhow!("config value not set"))?,
            pruned_checkpoint_number,
            max_eligible_checkpoint,
            config.clone(),
            metrics.clone(),
        )
        .await?;

        // Publish the refreshed watermarks for components tracking progress.
        if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
            Self::update_pruning_watermarks(
                perpetual_db,
                checkpoint_store,
                num_epochs_to_retain,
                pruner_watermarks,
            )?;
        }
        Ok(())
    }
444
    /// Core pruning loop shared by object pruning and checkpoint pruning.
    ///
    /// Walks certified checkpoints starting right after
    /// `starting_checkpoint_number`, accumulating their digests, contents and
    /// effects. Whenever the accumulated work exceeds the configured limits
    /// (`max_transactions_in_batch` / `max_checkpoints_in_batch`), a batch is
    /// flushed via `prune_objects_and_indexes` or `prune_checkpoints`
    /// depending on `mode`; a final partial batch is flushed at the end.
    /// Stops at `max_eligible_checkpoint`, or when a checkpoint's epoch falls
    /// inside the `num_epochs_to_retain` retention window.
    pub async fn prune_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rpc_index: Option<&RpcIndexStore>,
        mode: PruningMode,
        num_epochs_to_retain: u64,
        starting_checkpoint_number: CheckpointSequenceNumber,
        max_eligible_checkpoint: CheckpointSequenceNumber,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneForEligibleEpochs");

        let mut checkpoint_number = starting_checkpoint_number;
        let current_epoch = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch())
            .unwrap_or_default();

        let mut checkpoints_to_prune = vec![];
        let mut checkpoint_content_to_prune = vec![];
        let mut effects_to_prune = vec![];

        loop {
            // Fetch the next certified checkpoint; stop when we run out.
            let Some(ckpt) = checkpoint_store
                .tables
                .certified_checkpoints
                .get(&(checkpoint_number + 1))?
            else {
                break;
            };
            let checkpoint = ckpt.into_inner();
            // Stop when the checkpoint's epoch or checkpoint number is too new.
            // We have to respect the highest executed checkpoint watermark (including the watermark itself)
            // because there might be parts of the system that still require access to old object versions
            // (i.e. state accumulator).
            if (current_epoch < checkpoint.epoch() + num_epochs_to_retain)
                || (*checkpoint.sequence_number() >= max_eligible_checkpoint)
            {
                break;
            }
            checkpoint_number = *checkpoint.sequence_number();

            let content = checkpoint_store
                .get_checkpoint_contents(&checkpoint.content_digest)?
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "checkpoint content data is missing: {}",
                        checkpoint.sequence_number
                    )
                })?;
            // Effects may already be partially pruned; missing entries are
            // dropped by the `.flatten()` below.
            let effects = perpetual_db
                .effects
                .multi_get(content.iter().map(|tx| tx.effects))?;

            info!("scheduling pruning for checkpoint {:?}", checkpoint_number);
            checkpoints_to_prune.push(*checkpoint.digest());
            checkpoint_content_to_prune.push(content);
            effects_to_prune.extend(effects.into_iter().flatten());

            // Flush a batch once either configured limit is reached.
            if effects_to_prune.len() >= config.max_transactions_in_batch
                || checkpoints_to_prune.len() >= config.max_checkpoints_in_batch
            {
                match mode {
                    PruningMode::Objects => {
                        Self::prune_objects_and_indexes(
                            effects_to_prune,
                            perpetual_db,
                            checkpoint_number,
                            metrics.clone(),
                            checkpoint_content_to_prune,
                            rpc_index,
                            !config.killswitch_tombstone_pruning,
                        )
                        .await?
                    }
                    PruningMode::Checkpoints => Self::prune_checkpoints(
                        perpetual_db,
                        checkpoint_store,
                        checkpoint_number,
                        checkpoints_to_prune,
                        checkpoint_content_to_prune,
                        &effects_to_prune,
                        metrics.clone(),
                    )?,
                };
                checkpoints_to_prune = vec![];
                checkpoint_content_to_prune = vec![];
                effects_to_prune = vec![];
                // yield back to the tokio runtime. Prevent potential halt of other tasks
                tokio::task::yield_now().await;
            }
        }

        // Flush whatever remains as a final partial batch.
        if !checkpoints_to_prune.is_empty() {
            match mode {
                PruningMode::Objects => {
                    Self::prune_objects_and_indexes(
                        effects_to_prune,
                        perpetual_db,
                        checkpoint_number,
                        metrics.clone(),
                        checkpoint_content_to_prune,
                        rpc_index,
                        !config.killswitch_tombstone_pruning,
                    )
                    .await?
                }
                PruningMode::Checkpoints => Self::prune_checkpoints(
                    perpetual_db,
                    checkpoint_store,
                    checkpoint_number,
                    checkpoints_to_prune,
                    checkpoint_content_to_prune,
                    &effects_to_prune,
                    metrics.clone(),
                )?,
            };
        }
        Ok(())
    }
567
568    fn prune_indexes(
569        indexes: Option<&IndexStore>,
570        config: &AuthorityStorePruningConfig,
571        epoch_duration_ms: u64,
572        metrics: &AuthorityStorePruningMetrics,
573    ) -> anyhow::Result<()> {
574        if let (Some(mut epochs_to_retain), Some(indexes)) =
575            (config.num_epochs_to_retain_for_indexes, indexes)
576        {
577            if epochs_to_retain < 7 {
578                warn!("num_epochs_to_retain_for_indexes is too low. Reseting it to 7");
579                epochs_to_retain = 7;
580            }
581            let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
582            if let Some(cut_time_ms) =
583                u64::try_from(now)?.checked_sub(epochs_to_retain * epoch_duration_ms)
584            {
585                let transaction_id = indexes.prune(cut_time_ms)?;
586                metrics
587                    .last_pruned_indexes_transaction
588                    .set(transaction_id as i64);
589            }
590        }
591        Ok(())
592    }
593
594    async fn prune_executed_tx_digests(
595        perpetual_db: &Arc<AuthorityPerpetualTables>,
596        checkpoint_store: &Arc<CheckpointStore>,
597    ) -> anyhow::Result<()> {
598        let current_epoch = checkpoint_store
599            .get_highest_executed_checkpoint()?
600            .map(|c| c.epoch)
601            .unwrap_or_default();
602
603        if current_epoch < 2 {
604            return Ok(());
605        }
606
607        let target_epoch = current_epoch - 1;
608
609        let start_key = (0u64, TransactionDigest::ZERO);
610        let end_key = (target_epoch, TransactionDigest::ZERO);
611
612        info!(
613            "Pruning executed_transaction_digests for epochs < {} (current epoch: {})",
614            target_epoch, current_epoch
615        );
616
617        let mut batch = perpetual_db.executed_transaction_digests.batch();
618        batch.schedule_delete_range(
619            &perpetual_db.executed_transaction_digests,
620            &start_key,
621            // `to` is non-inclusive so target_epoch and all later epochs are preserved
622            &end_key,
623        )?;
624        batch.write()?;
625        Ok(())
626    }
627
628    #[cfg(tidehunter)]
629    fn prune_executed_tx_digests_th(
630        perpetual_db: &Arc<AuthorityPerpetualTables>,
631        checkpoint_store: &Arc<CheckpointStore>,
632    ) -> anyhow::Result<()> {
633        let current_epoch = checkpoint_store
634            .get_highest_executed_checkpoint()?
635            .map(|c| c.epoch)
636            .unwrap_or_default();
637
638        if current_epoch < 2 {
639            return Ok(());
640        }
641
642        let last_epoch_to_delete = current_epoch - 2;
643        let from_key = (0u64, TransactionDigest::ZERO);
644        let to_key = (last_epoch_to_delete, TransactionDigest::new([0xff; 32]));
645        info!(
646            "Pruning executed_transaction_digests for epochs 0 to {} (current epoch: {})",
647            last_epoch_to_delete, current_epoch
648        );
649        perpetual_db
650            .executed_transaction_digests
651            .drop_cells_in_range(&from_key, &to_key)?;
652        Ok(())
653    }
654
    /// Recomputes the epoch/checkpoint watermarks published through
    /// `PrunerWatermarks`, clamped so they never run ahead of the objects
    /// pruning watermark. Returns `Ok(true)` iff the epoch watermark changed.
    fn update_pruning_watermarks(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        num_epochs_to_retain: u64,
        pruning_watermark: &Arc<PrunerWatermarks>,
    ) -> anyhow::Result<bool> {
        use std::sync::atomic::Ordering;
        // How far object pruning has already progressed (checkpoint and the
        // epoch that checkpoint belongs to).
        let objects_pruning_checkpoint_id = perpetual_db
            .get_highest_pruned_checkpoint()?
            .unwrap_or_default();
        let objects_pruning_epoch_id = checkpoint_store
            .get_checkpoint_by_sequence_number(objects_pruning_checkpoint_id)?
            .map(|chk| chk.epoch)
            .unwrap_or_default();

        let current_watermark = pruning_watermark.epoch_id.load(Ordering::Relaxed);
        let current_epoch_id = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch)
            .unwrap_or_default();
        // Too early in the chain's life: the retention window is not yet full.
        if current_epoch_id < num_epochs_to_retain {
            return Ok(false);
        }
        let target_epoch_id = current_epoch_id - num_epochs_to_retain;
        let checkpoint_id =
            checkpoint_store.get_epoch_last_checkpoint_seq_number(target_epoch_id)?;

        // First epoch that must be kept, clamped by object-pruning progress.
        let new_watermark = min(target_epoch_id + 1, objects_pruning_epoch_id);
        if current_watermark == new_watermark {
            return Ok(false);
        }
        info!("relocation: setting epoch watermark to {}", new_watermark);
        pruning_watermark
            .epoch_id
            .store(new_watermark, Ordering::Relaxed);
        if let Some(checkpoint_id) = checkpoint_id {
            // Same clamping at checkpoint granularity.
            let watermark = min(checkpoint_id, objects_pruning_checkpoint_id);
            info!("relocation: setting checkpoint watermark to {}", watermark);
            pruning_watermark
                .checkpoint_id
                .store(watermark, Ordering::Relaxed);
        }
        Ok(true)
    }
699
700    #[cfg(tidehunter)]
701    fn prune_th(
702        perpetual_db: &Arc<AuthorityPerpetualTables>,
703        checkpoint_store: &Arc<CheckpointStore>,
704        num_epochs_to_retain: u64,
705        pruning_watermark: Arc<PrunerWatermarks>,
706    ) -> anyhow::Result<()> {
707        let watermark_updated = Self::update_pruning_watermarks(
708            perpetual_db,
709            checkpoint_store,
710            num_epochs_to_retain,
711            &pruning_watermark,
712        )?;
713        if !watermark_updated {
714            info!("skip relocation. Watermark hasn't changed");
715            return Ok(());
716        }
717        perpetual_db.objects.db.start_relocation()?;
718        checkpoint_store.tables.watermarks.db.start_relocation()?;
719        Self::prune_executed_tx_digests_th(perpetual_db, checkpoint_store)?;
720        Ok(())
721    }
722
723    fn compact_next_sst_file(
724        perpetual_db: Arc<AuthorityPerpetualTables>,
725        delay_days: usize,
726        last_processed: Arc<Mutex<HashMap<String, SystemTime>>>,
727    ) -> anyhow::Result<Option<LiveFile>> {
728        let db_path = perpetual_db.objects.db.path_for_pruning();
729        let mut state = last_processed
730            .lock()
731            .expect("failed to obtain a lock for last processed SST files");
732        let mut sst_file_for_compaction: Option<LiveFile> = None;
733        let time_threshold =
734            SystemTime::now() - Duration::from_secs(delay_days as u64 * 24 * 60 * 60);
735        for sst_file in perpetual_db.objects.db.live_files()? {
736            let file_path = db_path.join(sst_file.name.clone().trim_matches('/'));
737            let last_modified = std::fs::metadata(file_path)?.modified()?;
738            if !PERIODIC_PRUNING_TABLES.contains(&sst_file.column_family_name)
739                || sst_file.level < 1
740                || sst_file.start_key.is_none()
741                || sst_file.end_key.is_none()
742                || last_modified > time_threshold
743                || state.get(&sst_file.name).unwrap_or(&UNIX_EPOCH) > &time_threshold
744            {
745                continue;
746            }
747            if let Some(candidate) = &sst_file_for_compaction
748                && candidate.size > sst_file.size
749            {
750                continue;
751            }
752            sst_file_for_compaction = Some(sst_file);
753        }
754        let Some(sst_file) = sst_file_for_compaction else {
755            return Ok(None);
756        };
757        info!(
758            "Manual compaction of sst file {:?}. Size: {:?}, level: {:?}",
759            sst_file.name, sst_file.size, sst_file.level
760        );
761        perpetual_db.objects.compact_range_raw(
762            &sst_file.column_family_name,
763            sst_file.start_key.clone().unwrap(),
764            sst_file.end_key.clone().unwrap(),
765        )?;
766        state.insert(sst_file.name.clone(), SystemTime::now());
767        Ok(Some(sst_file))
768    }
769
770    fn pruning_tick_duration_ms(epoch_duration_ms: u64) -> u64 {
771        min(epoch_duration_ms / 2, MIN_PRUNING_TICK_DURATION_MS)
772    }
773
    /// Computes a "smoothed" pruning target: instead of making everything
    /// newly eligible prunable at once, advance the watermark by roughly the
    /// share that keeps pruning spread evenly over one epoch's ticks.
    fn smoothed_max_eligible_checkpoint_number(
        checkpoint_store: &Arc<CheckpointStore>,
        mut max_eligible_checkpoint: CheckpointSequenceNumber,
        pruned_checkpoint: CheckpointSequenceNumber,
        epoch_id: EpochId,
        epoch_duration_ms: u64,
        num_epochs_to_retain: u64,
    ) -> anyhow::Result<CheckpointSequenceNumber> {
        // Not enough epochs have elapsed for anything to be prunable yet.
        if epoch_id < num_epochs_to_retain {
            return Ok(0);
        }
        // Never prune past the last checkpoint of the newest retired epoch.
        let last_checkpoint_in_epoch = checkpoint_store
            .get_epoch_last_checkpoint(epoch_id - num_epochs_to_retain)?
            .map(|checkpoint| checkpoint.sequence_number)
            .unwrap_or_default();
        max_eligible_checkpoint = max_eligible_checkpoint.min(last_checkpoint_in_epoch);
        if max_eligible_checkpoint == 0 {
            return Ok(max_eligible_checkpoint);
        }
        // Number of pruning ticks per epoch; checked_div guards a zero tick
        // duration (degenerate epoch_duration_ms).
        let num_intervals = epoch_duration_ms
            .checked_div(Self::pruning_tick_duration_ms(epoch_duration_ms))
            .unwrap_or(1);
        // Per-tick advance of the watermark; saturates at 0 if the watermark
        // is already ahead, and falls back to 1 only if num_intervals is 0.
        let delta = max_eligible_checkpoint
            .checked_sub(pruned_checkpoint)
            .unwrap_or_default()
            .checked_div(num_intervals)
            .unwrap_or(1);
        Ok(pruned_checkpoint + delta)
    }
803
    /// Spawns the background pruning tasks and returns the oneshot `Sender`
    /// whose drop (or send) breaks the task loops via the paired receiver.
    ///
    /// Under `cfg(tidehunter)`, one task drives both object pruning and the
    /// tidehunter checkpoint pruner (`prune_th`). Otherwise, one task runs
    /// three tick-gated branches (objects, checkpoints, indexes) and — if
    /// `periodic_compaction_threshold_days` is set — a second task
    /// continuously compacts eligible SST files.
    fn setup_pruning(
        config: AuthorityStorePruningConfig,
        epoch_duration_ms: u64,
        perpetual_db: Arc<AuthorityPerpetualTables>,
        checkpoint_store: Arc<CheckpointStore>,
        rpc_index: Option<Arc<RpcIndexStore>>,
        jsonrpc_index: Option<Arc<IndexStore>>,
        metrics: Arc<AuthorityStorePruningMetrics>,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Sender<()> {
        // Cancellation channel: the returned sender is held by the pruner;
        // the receiver is polled inside the spawned loops below.
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        debug!(
            "Starting object pruning service with num_epochs_to_retain={}",
            config.num_epochs_to_retain
        );

        // Tick period derived from the epoch duration (see
        // `pruning_tick_duration_ms`).
        let tick_duration =
            Duration::from_millis(Self::pruning_tick_duration_ms(epoch_duration_ms));
        // In simtests start almost immediately; in production delay the first
        // run (defaults to one hour when not configured).
        let pruning_initial_delay = if cfg!(msim) {
            Duration::from_millis(1)
        } else {
            Duration::from_secs(config.pruning_run_delay_seconds.unwrap_or(60 * 60))
        };
        let mut objects_prune_interval =
            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
        let mut checkpoints_prune_interval =
            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);

        // Export the configured retention windows so operators can see the
        // effective settings.
        metrics
            .num_epochs_to_retain_for_objects
            .set(config.num_epochs_to_retain as i64);
        metrics.num_epochs_to_retain_for_checkpoints.set(
            config
                .num_epochs_to_retain_for_checkpoints
                .unwrap_or_default() as i64,
        );

        // Tidehunter backend: a single task alternates between object pruning
        // (only when a finite retention is configured) and `prune_th`. Note
        // that no task is spawned at all if checkpoint retention is unset.
        #[cfg(tidehunter)]
        {
            if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
                // u64::MAX means "retain forever" — disable the objects branch.
                let prune_objects = config.num_epochs_to_retain != u64::MAX;
                tokio::task::spawn(async move {
                    loop {
                        tokio::select! {
                            _ = objects_prune_interval.tick(), if prune_objects => {
                                if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
                                    error!("Failed to prune objects: {:?}", err);
                                }
                            },
                            _ = checkpoints_prune_interval.tick() => {
                                if let Err(err) = Self::prune_th(&perpetual_db, &checkpoint_store, num_epochs_to_retain, pruner_watermarks.clone()) {
                                    error!("Failed to prune checkpoints: {:?}", err);
                                }
                            },
                            // Cancellation: fires when the returned sender is
                            // used or dropped.
                            _ = &mut recv => break,
                        }
                    }
                });
            }
        }
        // RocksDB backend: periodic SST compaction task plus the main
        // three-branch pruning loop.
        #[cfg(not(tidehunter))]
        {
            let mut indexes_prune_interval =
                tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);

            let perpetual_db_for_compaction = perpetual_db.clone();
            if let Some(delay_days) = config.periodic_compaction_threshold_days {
                spawn_monitored_task!(async move {
                    // Tracks when each SST file was last compacted so files
                    // are not re-compacted within the threshold window.
                    let last_processed = Arc::new(Mutex::new(HashMap::new()));
                    loop {
                        let db = perpetual_db_for_compaction.clone();
                        let state = Arc::clone(&last_processed);
                        // Compaction is blocking work — keep it off the async
                        // runtime threads.
                        let result = tokio::task::spawn_blocking(move || {
                            Self::compact_next_sst_file(db, delay_days, state)
                        })
                        .await;
                        // Back off for an hour when there is nothing eligible
                        // to compact; otherwise retry almost immediately.
                        let mut sleep_interval_secs = 1;
                        match result {
                            Err(err) => error!("Failed to compact sst file: {:?}", err),
                            Ok(Err(err)) => error!("Failed to compact sst file: {:?}", err),
                            Ok(Ok(None)) => {
                                sleep_interval_secs = 3600;
                            }
                            _ => {}
                        }
                        tokio::time::sleep(Duration::from_secs(sleep_interval_secs)).await;
                    }
                });
            }
            tokio::task::spawn(async move {
                loop {
                    tokio::select! {
                        // Objects branch: disabled when retention is u64::MAX
                        // ("retain forever").
                        _ = objects_prune_interval.tick(), if config.num_epochs_to_retain != u64::MAX => {
                            if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
                                error!("Failed to prune objects: {:?}", err);
                            }
                            if let Err(err) = Self::prune_executed_tx_digests(&perpetual_db, &checkpoint_store).await {
                                error!("Failed to prune executed_tx_digests: {:?}", err);
                            }
                        },
                        // Checkpoints branch: only when a finite, non-zero
                        // checkpoint retention is configured.
                        _ = checkpoints_prune_interval.tick(), if !matches!(config.num_epochs_to_retain_for_checkpoints(), None | Some(u64::MAX) | Some(0)) => {
                            if let Err(err) = Self::prune_checkpoints_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms, &pruner_watermarks).await {
                                error!("Failed to prune checkpoints: {:?}", err);
                            }
                        },
                        // Indexes branch: only when index retention is configured.
                        _ = indexes_prune_interval.tick(), if config.num_epochs_to_retain_for_indexes.is_some() => {
                            if let Err(err) = Self::prune_indexes(jsonrpc_index.as_deref(), &config, epoch_duration_ms, &metrics) {
                                error!("Failed to prune indexes: {:?}", err);
                            }
                        }
                        // Cancellation: fires when the returned sender is used
                        // or dropped.
                        _ = &mut recv => break,
                    }
                }
            });
        }
        sender
    }
921
922    pub fn new(
923        perpetual_db: Arc<AuthorityPerpetualTables>,
924        checkpoint_store: Arc<CheckpointStore>,
925        rpc_index: Option<Arc<RpcIndexStore>>,
926        jsonrpc_index: Option<Arc<IndexStore>>,
927        mut pruning_config: AuthorityStorePruningConfig,
928        is_validator: bool,
929        epoch_duration_ms: u64,
930        registry: &Registry,
931        pruner_watermarks: Arc<PrunerWatermarks>, // used by tidehunter relocation filters
932    ) -> Self {
933        // On tidehunter validators the per-keyspace `objects_compactor`
934        // (see `AuthorityPerpetualTables::open`) already retains only the latest
935        // version per ObjectID during compaction, so running the object pruner
936        // would duplicate that work. Force-disable it regardless of the configured
937        // value.
938        #[cfg(tidehunter)]
939        if is_validator && pruning_config.num_epochs_to_retain != u64::MAX {
940            info!(
941                "Tidehunter validator: disabling object pruner (was num_epochs_to_retain={}). The objects compactor performs equivalent compaction.",
942                pruning_config.num_epochs_to_retain
943            );
944            pruning_config.num_epochs_to_retain = u64::MAX;
945        }
946
947        if pruning_config.num_epochs_to_retain > 0 && pruning_config.num_epochs_to_retain < u64::MAX
948        {
949            warn!(
950                "Using objects pruner with num_epochs_to_retain = {} can lead to performance issues",
951                pruning_config.num_epochs_to_retain
952            );
953            if is_validator {
954                warn!("Resetting to aggressive pruner.");
955                pruning_config.num_epochs_to_retain = 0;
956            } else {
957                warn!("Consider using an aggressive pruner (num_epochs_to_retain = 0)");
958            }
959        }
960        AuthorityStorePruner {
961            _objects_pruner_cancel_handle: Self::setup_pruning(
962                pruning_config,
963                epoch_duration_ms,
964                perpetual_db,
965                checkpoint_store,
966                rpc_index,
967                jsonrpc_index,
968                AuthorityStorePruningMetrics::new(registry),
969                pruner_watermarks,
970            ),
971        }
972    }
973
974    pub fn compact(perpetual_db: &Arc<AuthorityPerpetualTables>) -> Result<(), TypedStoreError> {
975        perpetual_db.objects.compact_range(
976            &ObjectKey(ObjectID::ZERO, SequenceNumber::MIN),
977            &ObjectKey(ObjectID::MAX, SequenceNumber::MAX),
978        )
979    }
980}
981
#[cfg(tidehunter)]
/// Attaches a relocation filter to `config` that drops entries whose
/// `extractor`-derived value is below the pruner watermark and stops
/// relocation at the first entry at or above it.
///
/// `by_key` selects where the value of type `T` is decoded from: the key
/// (big-endian fixint bincode) or the value (BCS).
pub(crate) fn apply_relocation_filter<T: DeserializeOwned>(
    config: typed_store::tidehunter_util::KeySpaceConfig,
    pruner_watermark: Arc<AtomicU64>,
    extractor: impl Fn(T) -> u64 + Send + Sync + 'static,
    by_key: bool,
) -> typed_store::tidehunter_util::KeySpaceConfig {
    use bincode::Options;
    use std::sync::atomic::Ordering;
    use typed_store::tidehunter_util::Decision;
    config.with_relocation_filter(move |key, value| {
        let decoded: T = if by_key {
            // Keys use big-endian fixint bincode encoding.
            bincode::DefaultOptions::new()
                .with_big_endian()
                .with_fixint_encoding()
                .deserialize(&key)
                .expect("relocation filter deserialization error")
        } else {
            // Values use BCS encoding.
            bcs::from_bytes(&value).expect("relocation filter deserialization error")
        };
        let watermark = pruner_watermark.load(Ordering::Relaxed);
        let below_watermark = extractor(decoded) < watermark;
        if below_watermark {
            Decision::Remove
        } else {
            Decision::StopRelocation
        }
    })
}
1009
// Unit tests for the object pruner: they write object versions into a real
// RocksDB instance, invoke the pruner directly, and inspect the on-disk state.
#[cfg(test)]
mod tests {
    use more_asserts as ma;
    use std::path::Path;
    use std::time::Duration;
    use std::{collections::HashSet, sync::Arc};
    use tracing::log::info;

    use crate::authority::authority_store_pruner::AuthorityStorePruningMetrics;
    use crate::authority::authority_store_tables::AuthorityPerpetualTables;
    use crate::authority::authority_store_types::{
        StoreObject, StoreObjectWrapper, get_store_object,
    };
    use prometheus::Registry;
    use sui_types::base_types::ObjectDigest;
    use sui_types::effects::TransactionEffects;
    use sui_types::effects::TransactionEffectsAPI;
    use sui_types::{
        base_types::{ObjectID, SequenceNumber},
        object::Object,
        storage::ObjectKey,
    };
    use typed_store::Map;
    use typed_store::rocks::{DBMap, MetricConf, ReadWriteOptions, default_db_options};

    use super::AuthorityStorePruner;

    /// Reopens the perpetual DB under `path` and returns every `ObjectKey`
    /// still present in the `objects` column family, so a test can check
    /// exactly which versions survived pruning.
    fn get_keys_after_pruning(path: &Path) -> anyhow::Result<HashSet<ObjectKey>> {
        let perpetual_db_path = path.join(Path::new("perpetual"));
        let cf_names = AuthorityPerpetualTables::describe_tables();
        let cfs: Vec<_> = cf_names
            .keys()
            .map(|x| (x.as_str(), default_db_options().options))
            .collect();
        let perpetual_db = typed_store::rocks::open_cf_opts(
            perpetual_db_path,
            None,
            MetricConf::new("perpetual_pruning"),
            &cfs,
        );

        let mut after_pruning = HashSet::new();
        let objects = DBMap::<ObjectKey, StoreObjectWrapper>::reopen(
            &perpetual_db?,
            Some("objects"),
            // open the db to bypass default db options which ignores range tombstones
            // so we can read the accurate number of retained versions
            &ReadWriteOptions::default(),
            false,
        )?;
        let iter = objects.safe_iter();
        for item in iter {
            after_pruning.insert(item?.0);
        }
        Ok(after_pruning)
    }

    // (keys expected to survive pruning, keys expected to be pruned,
    //  tombstone keys written for fully-deleted objects)
    type GenerateTestDataResult = (Vec<ObjectKey>, Vec<ObjectKey>, Vec<ObjectKey>);

    /// Writes `num_versions_per_object` versions for each of
    /// `total_unique_object_ids` object ids and returns which keys should be
    /// kept vs. deleted when the pruner retains
    /// `num_object_versions_to_retain` versions per object. When retention is
    /// 0, a deletion tombstone is also written for every object.
    fn generate_test_data(
        db: Arc<AuthorityPerpetualTables>,
        num_versions_per_object: u64,
        num_object_versions_to_retain: u64,
        total_unique_object_ids: u32,
    ) -> Result<GenerateTestDataResult, anyhow::Error> {
        assert!(num_versions_per_object >= num_object_versions_to_retain);

        let (mut to_keep, mut to_delete, mut tombstones) = (vec![], vec![], vec![]);
        let mut batch = db.objects.batch();

        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids.into())?;
        for id in ids {
            // Versions are visited newest-first, so `counter` is the number of
            // strictly newer versions seen before `seq`.
            for (counter, seq) in (0..num_versions_per_object).rev().enumerate() {
                let object_key = ObjectKey(id, SequenceNumber::from_u64(seq));
                if counter < num_object_versions_to_retain.try_into().unwrap() {
                    // latest `num_object_versions_to_retain` should not have been pruned
                    to_keep.push(object_key);
                } else {
                    to_delete.push(object_key);
                }
                let obj = get_store_object(Object::immutable_with_id_for_testing(id));
                batch.insert_batch(
                    &db.objects,
                    [(ObjectKey(id, SequenceNumber::from(seq)), obj.clone())],
                )?;
            }

            // Adding a tombstone for deleted object.
            if num_object_versions_to_retain == 0 {
                // Tombstone sits one version past the last written version.
                let tombstone_key = ObjectKey(id, SequenceNumber::from(num_versions_per_object));
                println!("Adding tombstone object {:?}", tombstone_key);
                batch.insert_batch(
                    &db.objects,
                    [(tombstone_key, StoreObjectWrapper::V1(StoreObject::Deleted))],
                )?;
                tombstones.push(tombstone_key);
            }
        }
        batch.write().unwrap();
        // Sanity-check the bookkeeping before handing the expectations back to
        // the caller.
        assert_eq!(
            to_keep.len() as u64,
            std::cmp::min(num_object_versions_to_retain, num_versions_per_object)
                * total_unique_object_ids as u64
        );
        assert_eq!(
            tombstones.len() as u64,
            if num_object_versions_to_retain == 0 {
                total_unique_object_ids as u64
            } else {
                0
            }
        );
        Ok((to_keep, to_delete, tombstones))
    }

    /// Populates a fresh perpetual DB at `path` via `generate_test_data`, runs
    /// `prune_objects_and_indexes` over synthetic effects describing the
    /// deletions, and returns the keys expected to remain afterwards.
    async fn run_pruner(
        path: &Path,
        num_versions_per_object: u64,
        num_object_versions_to_retain: u64,
        total_unique_object_ids: u32,
    ) -> Vec<ObjectKey> {
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let to_keep = {
            let db = Arc::new(AuthorityPerpetualTables::open(path, None, None));
            let (to_keep, to_delete, tombstones) = generate_test_data(
                db.clone(),
                num_versions_per_object,
                num_object_versions_to_retain,
                total_unique_object_ids,
            )
            .unwrap();
            // The pruner derives its work list from transaction effects, so
            // encode the expected deletions/tombstones into a synthetic
            // effects struct.
            let mut effects = TransactionEffects::default();
            for object in to_delete {
                effects.unsafe_add_deleted_live_object_for_testing((
                    object.0,
                    object.1,
                    ObjectDigest::MIN,
                ));
            }
            for object in tombstones {
                effects.unsafe_add_object_tombstone_for_testing((
                    object.0,
                    object.1,
                    ObjectDigest::MIN,
                ));
            }
            AuthorityStorePruner::prune_objects_and_indexes(
                vec![effects],
                &db,
                0,
                metrics,
                vec![],
                None,
                true,
            )
            .await
            .unwrap();
            to_keep
        };
        // NOTE(review): presumably gives asynchronous cleanup time to settle
        // before the caller reopens the DB — confirm whether the fixed 3s
        // delay is still required.
        tokio::time::sleep(Duration::from_secs(3)).await;
        to_keep
    }

    // Tests pruning old version of live objects.
    #[cfg(not(tidehunter))]
    #[tokio::test]
    async fn test_pruning_objects() {
        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 3, 2, 1000).await;
        // Exactly the retained versions must remain on disk.
        assert_eq!(
            HashSet::from_iter(to_keep),
            get_keys_after_pruning(&path).unwrap()
        );
        // Run once more against a fresh directory.
        run_pruner(&tempfile::tempdir().unwrap().keep(), 3, 2, 1000).await;
    }

    // Tests pruning deleted objects (object tombstones).
    #[cfg(not(tidehunter))]
    #[tokio::test]
    async fn test_pruning_tombstones() {
        // With zero versions written, only tombstones exist — pruning must
        // leave the objects table empty.
        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 0, 0, 1000).await;
        assert_eq!(to_keep.len(), 0);
        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);

        // With versions written but zero retained, everything (versions and
        // tombstones) must be pruned.
        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 3, 0, 1000).await;
        assert_eq!(to_keep.len(), 0);
        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);
    }

    /// Verifies that pruning followed by a manual compaction does not grow the
    /// total on-disk SST size.
    #[cfg(not(target_env = "msvc"))]
    #[tokio::test]
    async fn test_db_size_after_compaction() -> Result<(), anyhow::Error> {
        let primary_path = tempfile::tempdir()?.keep();
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None, None));
        let total_unique_object_ids = 10_000;
        let num_versions_per_object = 10;
        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids)?;
        let mut to_delete = vec![];
        for id in ids {
            for i in (0..num_versions_per_object).rev() {
                // All but the two newest versions of each object get deleted.
                if i < num_versions_per_object - 2 {
                    to_delete.push((id, SequenceNumber::from(i)));
                }
                let obj = get_store_object(Object::immutable_with_id_for_testing(id));
                perpetual_db
                    .objects
                    .insert(&ObjectKey(id, SequenceNumber::from(i)), &obj)?;
            }
        }

        // Sums the sizes of all `.sst` files directly under `path`.
        fn get_sst_size(path: &Path) -> u64 {
            let mut size = 0;
            for entry in std::fs::read_dir(path).unwrap() {
                let entry = entry.unwrap();
                let path = entry.path();
                if let Some(ext) = path.extension() {
                    if ext != "sst" {
                        continue;
                    }
                    size += std::fs::metadata(path).unwrap().len();
                }
            }
            size
        }

        let db_path = primary_path.clone().join("perpetual");
        let start = ObjectKey(ObjectID::ZERO, SequenceNumber::MIN);
        let end = ObjectKey(ObjectID::MAX, SequenceNumber::MAX);

        // Baseline: compact first so the pre-pruning size is measured on
        // settled SST files.
        perpetual_db.objects.compact_range(&start, &end)?;
        let before_compaction_size = get_sst_size(&db_path);

        // Encode the planned deletions as effects and prune them.
        let mut effects = TransactionEffects::default();
        for object in to_delete {
            effects.unsafe_add_deleted_live_object_for_testing((
                object.0,
                object.1,
                ObjectDigest::MIN,
            ));
        }
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let total_pruned = AuthorityStorePruner::prune_objects_and_indexes(
            vec![effects],
            &perpetual_db,
            0,
            metrics,
            vec![],
            None,
            true,
        )
        .await;
        info!("Total pruned keys = {:?}", total_pruned);

        // Compact again and check that the on-disk size did not grow.
        perpetual_db.objects.compact_range(&start, &end)?;
        let after_compaction_size = get_sst_size(&db_path);

        info!(
            "Before compaction disk size = {:?}, after compaction disk size = {:?}",
            before_compaction_size, after_compaction_size
        );
        ma::assert_le!(after_compaction_size, before_compaction_size);
        Ok(())
    }
}