sui_core/authority/
authority_store_pruner.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::authority_store_tables::AuthorityPerpetualTables;
5use crate::checkpoints::{CheckpointStore, CheckpointWatermark};
6use crate::jsonrpc_index::IndexStore;
7use crate::rpc_index::RpcIndexStore;
8use anyhow::anyhow;
9use mysten_metrics::{monitored_scope, spawn_monitored_task};
10use once_cell::sync::Lazy;
11use prometheus::{
12    IntCounter, IntGauge, Registry, register_int_counter_with_registry,
13    register_int_gauge_with_registry,
14};
15#[cfg(tidehunter)]
16use serde::de::DeserializeOwned;
17use std::cmp::{max, min};
18use std::collections::{BTreeSet, HashMap};
19use std::sync::Mutex;
20use std::sync::atomic::AtomicU64;
21use std::time::{SystemTime, UNIX_EPOCH};
22use std::{sync::Arc, time::Duration};
23use sui_config::node::AuthorityStorePruningConfig;
24use sui_types::committee::EpochId;
25use sui_types::effects::TransactionEffects;
26use sui_types::effects::TransactionEffectsAPI;
27use sui_types::message_envelope::Message;
28use sui_types::messages_checkpoint::{
29    CheckpointContents, CheckpointDigest, CheckpointSequenceNumber,
30};
31use sui_types::{
32    base_types::{ObjectID, SequenceNumber, TransactionDigest, VersionNumber},
33    storage::ObjectKey,
34};
35use tokio::sync::oneshot::{self, Sender};
36use tokio::time::Instant;
37use tracing::{debug, error, info, warn};
38use typed_store::rocksdb::LiveFile;
39use typed_store::{Map, TypedStoreError};
40
41static PERIODIC_PRUNING_TABLES: Lazy<BTreeSet<String>> = Lazy::new(|| {
42    [
43        "objects",
44        "effects",
45        "transactions",
46        "events",
47        "executed_effects",
48        "executed_transactions_to_checkpoint",
49    ]
50    .into_iter()
51    .map(|cf| cf.to_string())
52    .collect()
53});
/// Epoch length for tests, in milliseconds: 24 hours.
pub const EPOCH_DURATION_MS_FOR_TESTING: u64 = 1000 * 60 * 60 * 24;
55pub struct AuthorityStorePruner {
56    _objects_pruner_cancel_handle: oneshot::Sender<()>,
57}
58
/// Shared, atomically updated pruning watermarks.
///
/// Both counters start at zero (`Default`) and are written by
/// `update_pruning_watermarks` with relaxed ordering.
#[derive(Debug, Default)]
pub struct PrunerWatermarks {
    /// Epoch watermark computed by `update_pruning_watermarks`.
    pub epoch_id: Arc<AtomicU64>,
    /// Checkpoint sequence number watermark computed by `update_pruning_watermarks`.
    pub checkpoint_id: Arc<AtomicU64>,
}
64
/// Ten seconds, in milliseconds. Despite the `MIN_` prefix,
/// `pruning_tick_duration_ms` uses this value as the upper bound of the tick length.
static MIN_PRUNING_TICK_DURATION_MS: u64 = 1000 * 10;
66
67pub struct AuthorityStorePruningMetrics {
68    pub last_pruned_checkpoint: IntGauge,
69    pub num_pruned_objects: IntCounter,
70    pub num_pruned_tombstones: IntCounter,
71    pub last_pruned_effects_checkpoint: IntGauge,
72    pub last_pruned_indexes_transaction: IntGauge,
73    pub num_epochs_to_retain_for_objects: IntGauge,
74    pub num_epochs_to_retain_for_checkpoints: IntGauge,
75}
76
77impl AuthorityStorePruningMetrics {
78    pub fn new(registry: &Registry) -> Arc<Self> {
79        let this = Self {
80            last_pruned_checkpoint: register_int_gauge_with_registry!(
81                "last_pruned_checkpoint",
82                "Last pruned checkpoint",
83                registry
84            )
85            .unwrap(),
86            num_pruned_objects: register_int_counter_with_registry!(
87                "num_pruned_objects",
88                "Number of pruned objects",
89                registry
90            )
91            .unwrap(),
92            num_pruned_tombstones: register_int_counter_with_registry!(
93                "num_pruned_tombstones",
94                "Number of pruned tombstones",
95                registry
96            )
97            .unwrap(),
98            last_pruned_effects_checkpoint: register_int_gauge_with_registry!(
99                "last_pruned_effects_checkpoint",
100                "Last pruned effects checkpoint",
101                registry
102            )
103            .unwrap(),
104            last_pruned_indexes_transaction: register_int_gauge_with_registry!(
105                "last_pruned_indexes_transaction",
106                "Last pruned indexes transaction",
107                registry
108            )
109            .unwrap(),
110            num_epochs_to_retain_for_objects: register_int_gauge_with_registry!(
111                "num_epochs_to_retain_for_objects",
112                "Number of epochs to retain for objects",
113                registry
114            )
115            .unwrap(),
116            num_epochs_to_retain_for_checkpoints: register_int_gauge_with_registry!(
117                "num_epochs_to_retain_for_checkpoints",
118                "Number of epochs to retain for checkpoints",
119                registry
120            )
121            .unwrap(),
122        };
123        Arc::new(this)
124    }
125
126    pub fn new_for_test() -> Arc<Self> {
127        Self::new(&Registry::new())
128    }
129}
130
/// Selects which kind of data `prune_for_eligible_epochs` removes for each batch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruningMode {
    /// Prune object versions (dispatches to `prune_objects_and_indexes`).
    Objects,
    /// Prune checkpoint data (dispatches to `prune_checkpoints`).
    Checkpoints,
}
136
137impl AuthorityStorePruner {
138    /// prunes old versions of objects based on transaction effects
139    #[cfg(not(tidehunter))]
140    async fn prune_objects_and_indexes(
141        transaction_effects: Vec<TransactionEffects>,
142        perpetual_db: &Arc<AuthorityPerpetualTables>,
143        checkpoint_number: CheckpointSequenceNumber,
144        metrics: Arc<AuthorityStorePruningMetrics>,
145        pruned_tx_seq_exclusive: u64,
146        rpc_index: Option<&RpcIndexStore>,
147        enable_pruning_tombstones: bool,
148    ) -> anyhow::Result<()> {
149        let _scope = monitored_scope("ObjectsLivePruner");
150        let mut wb = perpetual_db.objects.batch();
151
152        // Collect objects keys that need to be deleted from `transaction_effects`.
153        let mut live_object_keys_to_prune = vec![];
154        let mut object_tombstones_to_prune = vec![];
155        for effects in &transaction_effects {
156            for (object_id, seq_number) in effects.modified_at_versions() {
157                live_object_keys_to_prune.push(ObjectKey(object_id, seq_number));
158            }
159
160            if enable_pruning_tombstones {
161                for deleted_object_key in effects.all_tombstones() {
162                    object_tombstones_to_prune
163                        .push(ObjectKey(deleted_object_key.0, deleted_object_key.1));
164                }
165            }
166        }
167
168        metrics
169            .num_pruned_objects
170            .inc_by(live_object_keys_to_prune.len() as u64);
171        metrics
172            .num_pruned_tombstones
173            .inc_by(object_tombstones_to_prune.len() as u64);
174
175        let mut updates: HashMap<ObjectID, (VersionNumber, VersionNumber)> = HashMap::new();
176        for ObjectKey(object_id, seq_number) in live_object_keys_to_prune {
177            updates
178                .entry(object_id)
179                .and_modify(|range| *range = (min(range.0, seq_number), max(range.1, seq_number)))
180                .or_insert((seq_number, seq_number));
181        }
182
183        for (object_id, (min_version, max_version)) in updates {
184            debug!(
185                "Pruning object {:?} versions {:?} - {:?}",
186                object_id, min_version, max_version
187            );
188            let start_range = ObjectKey(object_id, min_version);
189            let end_range = ObjectKey(object_id, (max_version.value() + 1).into());
190            wb.schedule_delete_range(&perpetual_db.objects, &start_range, &end_range)?;
191        }
192
193        // When enable_pruning_tombstones is enabled, instead of using range deletes, we need to do a scan of all the keys
194        // for the deleted objects and then do point deletes to delete all the existing keys. This is because to improve read
195        // performance, we set `ignore_range_deletions` on all read options, and using range delete to delete tombstones
196        // may leak object (imagine a tombstone is compacted away, but earlier version is still not). Using point deletes
197        // guarantees that all earlier versions are deleted in the database.
198        if !object_tombstones_to_prune.is_empty() {
199            let mut object_keys_to_delete = vec![];
200            for ObjectKey(object_id, seq_number) in object_tombstones_to_prune {
201                for result in perpetual_db.objects.safe_iter_with_bounds(
202                    Some(ObjectKey(object_id, VersionNumber::MIN)),
203                    Some(ObjectKey(object_id, seq_number.next())),
204                ) {
205                    let (object_key, _) = result?;
206                    assert_eq!(object_key.0, object_id);
207                    object_keys_to_delete.push(object_key);
208                }
209            }
210
211            wb.delete_batch(&perpetual_db.objects, object_keys_to_delete)?;
212        }
213
214        perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
215        metrics.last_pruned_checkpoint.set(checkpoint_number as i64);
216
217        wb.write()?;
218
219        if let Some(rpc_index) = rpc_index {
220            rpc_index.prune(checkpoint_number, pruned_tx_seq_exclusive)?;
221        }
222
223        Ok(())
224    }
225
226    #[cfg(tidehunter)]
227    async fn prune_objects_and_indexes(
228        transaction_effects: Vec<TransactionEffects>,
229        perpetual_db: &Arc<AuthorityPerpetualTables>,
230        checkpoint_number: CheckpointSequenceNumber,
231        metrics: Arc<AuthorityStorePruningMetrics>,
232        pruned_tx_seq_exclusive: u64,
233        rpc_index: Option<&RpcIndexStore>,
234        _: bool,
235    ) -> anyhow::Result<()> {
236        let _scope = monitored_scope("ObjectsLivePruner");
237        let mut wb = perpetual_db.objects.batch();
238        let mut objects_to_prune = vec![];
239
240        for effects in &transaction_effects {
241            for (object_id, version) in effects
242                .modified_at_versions()
243                .into_iter()
244                .chain(effects.all_tombstones())
245            {
246                debug!("Pruning object {:?} version {:?}", object_id, version);
247                objects_to_prune.push(ObjectKey(object_id, version));
248            }
249        }
250        metrics
251            .num_pruned_objects
252            .inc_by(objects_to_prune.len() as u64);
253        wb.delete_batch(&perpetual_db.objects, &objects_to_prune)?;
254
255        perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
256        metrics.last_pruned_checkpoint.set(checkpoint_number as i64);
257        wb.write()?;
258
259        if let Some(rpc_index) = rpc_index {
260            rpc_index.prune(checkpoint_number, pruned_tx_seq_exclusive)?;
261        }
262
263        Ok(())
264    }
265
266    fn prune_checkpoints(
267        perpetual_db: &Arc<AuthorityPerpetualTables>,
268        checkpoint_db: &Arc<CheckpointStore>,
269        checkpoint_number: CheckpointSequenceNumber,
270        checkpoints_to_prune: Vec<CheckpointDigest>,
271        checkpoint_content_to_prune: Vec<CheckpointContents>,
272        effects_to_prune: &Vec<TransactionEffects>,
273        metrics: Arc<AuthorityStorePruningMetrics>,
274    ) -> anyhow::Result<()> {
275        let _scope = monitored_scope("EffectsLivePruner");
276
277        let mut perpetual_batch = perpetual_db.objects.batch();
278        let transactions: Vec<_> = checkpoint_content_to_prune
279            .iter()
280            .flat_map(|content| content.iter().map(|tx| tx.transaction))
281            .collect();
282
283        perpetual_batch.delete_batch(&perpetual_db.transactions, transactions.iter())?;
284        perpetual_batch.delete_batch(&perpetual_db.executed_effects, transactions.iter())?;
285        perpetual_batch.delete_batch(
286            &perpetual_db.executed_transactions_to_checkpoint,
287            transactions.iter(),
288        )?;
289
290        let mut effect_digests = vec![];
291        for effects in effects_to_prune {
292            let effects_digest = effects.digest();
293            debug!("Pruning effects {:?}", effects_digest);
294            effect_digests.push(effects_digest);
295
296            if effects.events_digest().is_some() {
297                perpetual_batch
298                    .delete_batch(&perpetual_db.events_2, [effects.transaction_digest()])?;
299            }
300        }
301        perpetual_batch.delete_batch(
302            &perpetual_db.unchanged_loaded_runtime_objects,
303            transactions.iter(),
304        )?;
305        perpetual_batch.delete_batch(&perpetual_db.effects, effect_digests)?;
306
307        let mut checkpoints_batch = checkpoint_db.tables.certified_checkpoints.batch();
308
309        let checkpoint_content_digests =
310            checkpoint_content_to_prune.iter().map(|ckpt| ckpt.digest());
311        checkpoints_batch.delete_batch(
312            &checkpoint_db.tables.checkpoint_content,
313            checkpoint_content_digests.clone(),
314        )?;
315        checkpoints_batch.delete_batch(
316            &checkpoint_db.tables.checkpoint_sequence_by_contents_digest,
317            checkpoint_content_digests,
318        )?;
319
320        checkpoints_batch.delete_batch(
321            &checkpoint_db.tables.checkpoint_by_digest,
322            checkpoints_to_prune,
323        )?;
324
325        checkpoints_batch.insert_batch(
326            &checkpoint_db.tables.watermarks,
327            [(
328                &CheckpointWatermark::HighestPruned,
329                &(checkpoint_number, CheckpointDigest::random()),
330            )],
331        )?;
332
333        perpetual_batch.write()?;
334        checkpoints_batch.write()?;
335        metrics
336            .last_pruned_effects_checkpoint
337            .set(checkpoint_number as i64);
338
339        Ok(())
340    }
341
342    /// Prunes old data based on effects from all checkpoints from epochs eligible for pruning
343    pub async fn prune_objects_for_eligible_epochs(
344        perpetual_db: &Arc<AuthorityPerpetualTables>,
345        checkpoint_store: &Arc<CheckpointStore>,
346        rpc_index: Option<&RpcIndexStore>,
347        config: AuthorityStorePruningConfig,
348        metrics: Arc<AuthorityStorePruningMetrics>,
349        epoch_duration_ms: u64,
350    ) -> anyhow::Result<()> {
351        let _scope = monitored_scope("PruneObjectsForEligibleEpochs");
352        let (mut max_eligible_checkpoint_number, epoch_id) = checkpoint_store
353            .get_highest_executed_checkpoint()?
354            .map(|c| (*c.sequence_number(), c.epoch))
355            .unwrap_or_default();
356        let pruned_checkpoint_number = perpetual_db
357            .get_highest_pruned_checkpoint()?
358            .unwrap_or_default();
359        if config.smooth && config.num_epochs_to_retain > 0 {
360            max_eligible_checkpoint_number = Self::smoothed_max_eligible_checkpoint_number(
361                checkpoint_store,
362                max_eligible_checkpoint_number,
363                pruned_checkpoint_number,
364                epoch_id,
365                epoch_duration_ms,
366                config.num_epochs_to_retain,
367            )?;
368        }
369        Self::prune_for_eligible_epochs(
370            perpetual_db,
371            checkpoint_store,
372            rpc_index,
373            PruningMode::Objects,
374            config.num_epochs_to_retain,
375            pruned_checkpoint_number,
376            max_eligible_checkpoint_number,
377            config,
378            metrics.clone(),
379        )
380        .await
381    }
382
383    pub async fn prune_checkpoints_for_eligible_epochs(
384        perpetual_db: &Arc<AuthorityPerpetualTables>,
385        checkpoint_store: &Arc<CheckpointStore>,
386        rpc_index: Option<&RpcIndexStore>,
387        config: AuthorityStorePruningConfig,
388        metrics: Arc<AuthorityStorePruningMetrics>,
389        epoch_duration_ms: u64,
390        pruner_watermarks: &Arc<PrunerWatermarks>,
391    ) -> anyhow::Result<()> {
392        let _scope = monitored_scope("PruneCheckpointsForEligibleEpochs");
393        let pruned_checkpoint_number = checkpoint_store
394            .get_highest_pruned_checkpoint_seq_number()?
395            .unwrap_or(0);
396        let (mut max_eligible_checkpoint, epoch_id) = checkpoint_store
397            .get_highest_executed_checkpoint()?
398            .map(|c| (*c.sequence_number(), c.epoch))
399            .unwrap_or_default();
400        if config.num_epochs_to_retain != u64::MAX {
401            max_eligible_checkpoint = min(
402                max_eligible_checkpoint,
403                perpetual_db
404                    .get_highest_pruned_checkpoint()?
405                    .unwrap_or_default(),
406            );
407        }
408        if config.smooth
409            && let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints
410        {
411            max_eligible_checkpoint = Self::smoothed_max_eligible_checkpoint_number(
412                checkpoint_store,
413                max_eligible_checkpoint,
414                pruned_checkpoint_number,
415                epoch_id,
416                epoch_duration_ms,
417                num_epochs_to_retain,
418            )?;
419        }
420        debug!("Max eligible checkpoint {}", max_eligible_checkpoint);
421        Self::prune_for_eligible_epochs(
422            perpetual_db,
423            checkpoint_store,
424            rpc_index,
425            PruningMode::Checkpoints,
426            config
427                .num_epochs_to_retain_for_checkpoints()
428                .ok_or_else(|| anyhow!("config value not set"))?,
429            pruned_checkpoint_number,
430            max_eligible_checkpoint,
431            config.clone(),
432            metrics.clone(),
433        )
434        .await?;
435
436        if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
437            Self::update_pruning_watermarks(
438                perpetual_db,
439                checkpoint_store,
440                num_epochs_to_retain,
441                pruner_watermarks,
442                false,
443            )?;
444        }
445        Ok(())
446    }
447
448    /// Prunes old object versions based on effects from all checkpoints from epochs eligible for pruning
449    pub async fn prune_for_eligible_epochs(
450        perpetual_db: &Arc<AuthorityPerpetualTables>,
451        checkpoint_store: &Arc<CheckpointStore>,
452        rpc_index: Option<&RpcIndexStore>,
453        mode: PruningMode,
454        num_epochs_to_retain: u64,
455        starting_checkpoint_number: CheckpointSequenceNumber,
456        max_eligible_checkpoint: CheckpointSequenceNumber,
457        config: AuthorityStorePruningConfig,
458        metrics: Arc<AuthorityStorePruningMetrics>,
459    ) -> anyhow::Result<()> {
460        let _scope = monitored_scope("PruneForEligibleEpochs");
461
462        let mut checkpoint_number = starting_checkpoint_number;
463        let current_epoch = checkpoint_store
464            .get_highest_executed_checkpoint()?
465            .map(|c| c.epoch())
466            .unwrap_or_default();
467
468        let mut checkpoints_to_prune = vec![];
469        let mut checkpoint_content_to_prune = vec![];
470        let mut effects_to_prune = vec![];
471        // Absolute tx-seq floor (exclusive) after pruning the current
472        // batch — the last-pruned checkpoint's `network_total_transactions`.
473        // `RpcIndexStore::prune` consumes this directly instead of summing
474        // checkpoint content sizes.
475        let mut pruned_tx_seq_exclusive = 0u64;
476
477        loop {
478            let Some(ckpt) = checkpoint_store
479                .tables
480                .certified_checkpoints
481                .get(&(checkpoint_number + 1))?
482            else {
483                break;
484            };
485            let checkpoint = ckpt.into_inner();
486            // Skipping because  checkpoint's epoch or checkpoint number is too new.
487            // We have to respect the highest executed checkpoint watermark (including the watermark itself)
488            // because there might be parts of the system that still require access to old object versions
489            // (i.e. state accumulator).
490            if (current_epoch < checkpoint.epoch() + num_epochs_to_retain)
491                || (*checkpoint.sequence_number() >= max_eligible_checkpoint)
492            {
493                break;
494            }
495            checkpoint_number = *checkpoint.sequence_number();
496            pruned_tx_seq_exclusive = checkpoint.network_total_transactions;
497
498            let content = checkpoint_store
499                .get_checkpoint_contents(&checkpoint.content_digest)?
500                .ok_or_else(|| {
501                    anyhow::anyhow!(
502                        "checkpoint content data is missing: {}",
503                        checkpoint.sequence_number
504                    )
505                })?;
506            let effects = perpetual_db
507                .effects
508                .multi_get(content.iter().map(|tx| tx.effects))?;
509
510            info!("scheduling pruning for checkpoint {:?}", checkpoint_number);
511            checkpoints_to_prune.push(*checkpoint.digest());
512            checkpoint_content_to_prune.push(content);
513            effects_to_prune.extend(effects.into_iter().flatten());
514
515            if effects_to_prune.len() >= config.max_transactions_in_batch
516                || checkpoints_to_prune.len() >= config.max_checkpoints_in_batch
517            {
518                match mode {
519                    PruningMode::Objects => {
520                        Self::prune_objects_and_indexes(
521                            effects_to_prune,
522                            perpetual_db,
523                            checkpoint_number,
524                            metrics.clone(),
525                            pruned_tx_seq_exclusive,
526                            rpc_index,
527                            !config.killswitch_tombstone_pruning,
528                        )
529                        .await?
530                    }
531                    PruningMode::Checkpoints => Self::prune_checkpoints(
532                        perpetual_db,
533                        checkpoint_store,
534                        checkpoint_number,
535                        checkpoints_to_prune,
536                        checkpoint_content_to_prune,
537                        &effects_to_prune,
538                        metrics.clone(),
539                    )?,
540                };
541                checkpoints_to_prune = vec![];
542                checkpoint_content_to_prune = vec![];
543                effects_to_prune = vec![];
544                // yield back to the tokio runtime. Prevent potential halt of other tasks
545                tokio::task::yield_now().await;
546            }
547        }
548
549        if !checkpoints_to_prune.is_empty() {
550            match mode {
551                PruningMode::Objects => {
552                    Self::prune_objects_and_indexes(
553                        effects_to_prune,
554                        perpetual_db,
555                        checkpoint_number,
556                        metrics.clone(),
557                        pruned_tx_seq_exclusive,
558                        rpc_index,
559                        !config.killswitch_tombstone_pruning,
560                    )
561                    .await?
562                }
563                PruningMode::Checkpoints => Self::prune_checkpoints(
564                    perpetual_db,
565                    checkpoint_store,
566                    checkpoint_number,
567                    checkpoints_to_prune,
568                    checkpoint_content_to_prune,
569                    &effects_to_prune,
570                    metrics.clone(),
571                )?,
572            };
573        }
574        Ok(())
575    }
576
577    fn prune_indexes(
578        indexes: Option<&IndexStore>,
579        config: &AuthorityStorePruningConfig,
580        epoch_duration_ms: u64,
581        metrics: &AuthorityStorePruningMetrics,
582    ) -> anyhow::Result<()> {
583        if let (Some(mut epochs_to_retain), Some(indexes)) =
584            (config.num_epochs_to_retain_for_indexes, indexes)
585        {
586            if epochs_to_retain < 7 {
587                warn!("num_epochs_to_retain_for_indexes is too low. Reseting it to 7");
588                epochs_to_retain = 7;
589            }
590            let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
591            if let Some(cut_time_ms) =
592                u64::try_from(now)?.checked_sub(epochs_to_retain * epoch_duration_ms)
593            {
594                let transaction_id = indexes.prune(cut_time_ms)?;
595                metrics
596                    .last_pruned_indexes_transaction
597                    .set(transaction_id as i64);
598            }
599        }
600        Ok(())
601    }
602
603    async fn prune_executed_tx_digests(
604        perpetual_db: &Arc<AuthorityPerpetualTables>,
605        checkpoint_store: &Arc<CheckpointStore>,
606    ) -> anyhow::Result<()> {
607        let current_epoch = checkpoint_store
608            .get_highest_executed_checkpoint()?
609            .map(|c| c.epoch)
610            .unwrap_or_default();
611
612        if current_epoch < 2 {
613            return Ok(());
614        }
615
616        let target_epoch = current_epoch - 1;
617
618        let start_key = (0u64, TransactionDigest::ZERO);
619        let end_key = (target_epoch, TransactionDigest::ZERO);
620
621        info!(
622            "Pruning executed_transaction_digests for epochs < {} (current epoch: {})",
623            target_epoch, current_epoch
624        );
625
626        let mut batch = perpetual_db.executed_transaction_digests.batch();
627        batch.schedule_delete_range(
628            &perpetual_db.executed_transaction_digests,
629            &start_key,
630            // `to` is non-inclusive so target_epoch and all later epochs are preserved
631            &end_key,
632        )?;
633        batch.write()?;
634        Ok(())
635    }
636
637    #[cfg(tidehunter)]
638    fn prune_executed_tx_digests_th(
639        perpetual_db: &Arc<AuthorityPerpetualTables>,
640        checkpoint_store: &Arc<CheckpointStore>,
641    ) -> anyhow::Result<()> {
642        let current_epoch = checkpoint_store
643            .get_highest_executed_checkpoint()?
644            .map(|c| c.epoch)
645            .unwrap_or_default();
646
647        if current_epoch < 2 {
648            return Ok(());
649        }
650
651        let last_epoch_to_delete = current_epoch - 2;
652        let from_key = (0u64, TransactionDigest::ZERO);
653        let to_key = (last_epoch_to_delete, TransactionDigest::new([0xff; 32]));
654        info!(
655            "Pruning executed_transaction_digests for epochs 0 to {} (current epoch: {})",
656            last_epoch_to_delete, current_epoch
657        );
658        perpetual_db
659            .executed_transaction_digests
660            .drop_cells_in_range(&from_key, &to_key)?;
661        Ok(())
662    }
663
664    fn update_pruning_watermarks(
665        perpetual_db: &Arc<AuthorityPerpetualTables>,
666        checkpoint_store: &Arc<CheckpointStore>,
667        num_epochs_to_retain: u64,
668        pruning_watermark: &Arc<PrunerWatermarks>,
669        objects_compactor_active: bool,
670    ) -> anyhow::Result<bool> {
671        use std::sync::atomic::Ordering;
672        let objects_pruning_checkpoint_id = perpetual_db
673            .get_highest_pruned_checkpoint()?
674            .unwrap_or_default();
675        let objects_pruning_epoch_id = checkpoint_store
676            .get_checkpoint_by_sequence_number(objects_pruning_checkpoint_id)?
677            .map(|chk| chk.epoch)
678            .unwrap_or_default();
679
680        let current_watermark = pruning_watermark.epoch_id.load(Ordering::Relaxed);
681        let current_epoch_id = checkpoint_store
682            .get_highest_executed_checkpoint()?
683            .map(|c| c.epoch)
684            .unwrap_or_default();
685        if current_epoch_id < num_epochs_to_retain {
686            return Ok(false);
687        }
688        let target_epoch_id = current_epoch_id - num_epochs_to_retain;
689        let checkpoint_id =
690            checkpoint_store.get_epoch_last_checkpoint_seq_number(target_epoch_id)?;
691
692        // The objects compactor handles object retention continuously without advancing
693        // `highest_pruned_checkpoint`, so capping on it would freeze the watermark at 0.
694        let new_watermark = if objects_compactor_active {
695            target_epoch_id + 1
696        } else {
697            min(target_epoch_id + 1, objects_pruning_epoch_id)
698        };
699        if current_watermark == new_watermark {
700            return Ok(false);
701        }
702        info!("relocation: setting epoch watermark to {}", new_watermark);
703        pruning_watermark
704            .epoch_id
705            .store(new_watermark, Ordering::Relaxed);
706        if let Some(checkpoint_id) = checkpoint_id {
707            let watermark = if objects_compactor_active {
708                checkpoint_id
709            } else {
710                min(checkpoint_id, objects_pruning_checkpoint_id)
711            };
712            info!("relocation: setting checkpoint watermark to {}", watermark);
713            pruning_watermark
714                .checkpoint_id
715                .store(watermark, Ordering::Relaxed);
716        }
717        Ok(true)
718    }
719
720    #[cfg(tidehunter)]
721    fn prune_th(
722        perpetual_db: &Arc<AuthorityPerpetualTables>,
723        checkpoint_store: &Arc<CheckpointStore>,
724        num_epochs_to_retain: u64,
725        pruning_watermark: Arc<PrunerWatermarks>,
726        objects_compactor_active: bool,
727    ) -> anyhow::Result<()> {
728        let watermark_updated = Self::update_pruning_watermarks(
729            perpetual_db,
730            checkpoint_store,
731            num_epochs_to_retain,
732            &pruning_watermark,
733            objects_compactor_active,
734        )?;
735        if !watermark_updated {
736            info!("skip relocation. Watermark hasn't changed");
737            return Ok(());
738        }
739        perpetual_db.objects.db.start_relocation()?;
740        checkpoint_store.tables.watermarks.db.start_relocation()?;
741        Self::prune_executed_tx_digests_th(perpetual_db, checkpoint_store)?;
742        Ok(())
743    }
744
745    fn compact_next_sst_file(
746        perpetual_db: Arc<AuthorityPerpetualTables>,
747        delay_days: usize,
748        last_processed: Arc<Mutex<HashMap<String, SystemTime>>>,
749    ) -> anyhow::Result<Option<LiveFile>> {
750        let db_path = perpetual_db.objects.db.path_for_pruning();
751        let mut state = last_processed
752            .lock()
753            .expect("failed to obtain a lock for last processed SST files");
754        let mut sst_file_for_compaction: Option<LiveFile> = None;
755        let time_threshold =
756            SystemTime::now() - Duration::from_secs(delay_days as u64 * 24 * 60 * 60);
757        for sst_file in perpetual_db.objects.db.live_files()? {
758            let file_path = db_path.join(sst_file.name.clone().trim_matches('/'));
759            let last_modified = std::fs::metadata(file_path)?.modified()?;
760            if !PERIODIC_PRUNING_TABLES.contains(&sst_file.column_family_name)
761                || sst_file.level < 1
762                || sst_file.start_key.is_none()
763                || sst_file.end_key.is_none()
764                || last_modified > time_threshold
765                || state.get(&sst_file.name).unwrap_or(&UNIX_EPOCH) > &time_threshold
766            {
767                continue;
768            }
769            if let Some(candidate) = &sst_file_for_compaction
770                && candidate.size > sst_file.size
771            {
772                continue;
773            }
774            sst_file_for_compaction = Some(sst_file);
775        }
776        let Some(sst_file) = sst_file_for_compaction else {
777            return Ok(None);
778        };
779        info!(
780            "Manual compaction of sst file {:?}. Size: {:?}, level: {:?}",
781            sst_file.name, sst_file.size, sst_file.level
782        );
783        perpetual_db.objects.compact_range_raw(
784            &sst_file.column_family_name,
785            sst_file.start_key.clone().unwrap(),
786            sst_file.end_key.clone().unwrap(),
787        )?;
788        state.insert(sst_file.name.clone(), SystemTime::now());
789        Ok(Some(sst_file))
790    }
791
792    fn pruning_tick_duration_ms(epoch_duration_ms: u64) -> u64 {
793        min(epoch_duration_ms / 2, MIN_PRUNING_TICK_DURATION_MS)
794    }
795
796    fn smoothed_max_eligible_checkpoint_number(
797        checkpoint_store: &Arc<CheckpointStore>,
798        mut max_eligible_checkpoint: CheckpointSequenceNumber,
799        pruned_checkpoint: CheckpointSequenceNumber,
800        epoch_id: EpochId,
801        epoch_duration_ms: u64,
802        num_epochs_to_retain: u64,
803    ) -> anyhow::Result<CheckpointSequenceNumber> {
804        if epoch_id < num_epochs_to_retain {
805            return Ok(0);
806        }
807        let last_checkpoint_in_epoch = checkpoint_store
808            .get_epoch_last_checkpoint(epoch_id - num_epochs_to_retain)?
809            .map(|checkpoint| checkpoint.sequence_number)
810            .unwrap_or_default();
811        max_eligible_checkpoint = max_eligible_checkpoint.min(last_checkpoint_in_epoch);
812        if max_eligible_checkpoint == 0 {
813            return Ok(max_eligible_checkpoint);
814        }
815        let num_intervals = epoch_duration_ms
816            .checked_div(Self::pruning_tick_duration_ms(epoch_duration_ms))
817            .unwrap_or(1);
818        let delta = max_eligible_checkpoint
819            .checked_sub(pruned_checkpoint)
820            .unwrap_or_default()
821            .checked_div(num_intervals)
822            .unwrap_or(1);
823        Ok(pruned_checkpoint + delta)
824    }
825
826    fn setup_pruning(
827        config: AuthorityStorePruningConfig,
828        epoch_duration_ms: u64,
829        perpetual_db: Arc<AuthorityPerpetualTables>,
830        checkpoint_store: Arc<CheckpointStore>,
831        rpc_index: Option<Arc<RpcIndexStore>>,
832        jsonrpc_index: Option<Arc<IndexStore>>,
833        metrics: Arc<AuthorityStorePruningMetrics>,
834        pruner_watermarks: Arc<PrunerWatermarks>,
835    ) -> Sender<()> {
836        let (sender, mut recv) = tokio::sync::oneshot::channel();
837        debug!(
838            "Starting object pruning service with num_epochs_to_retain={}",
839            config.num_epochs_to_retain
840        );
841
842        let tick_duration =
843            Duration::from_millis(Self::pruning_tick_duration_ms(epoch_duration_ms));
844        let pruning_initial_delay = if cfg!(msim) {
845            Duration::from_millis(1)
846        } else {
847            Duration::from_secs(config.pruning_run_delay_seconds.unwrap_or(60 * 60))
848        };
849        let mut objects_prune_interval =
850            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
851        let mut checkpoints_prune_interval =
852            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
853
854        metrics
855            .num_epochs_to_retain_for_objects
856            .set(config.num_epochs_to_retain as i64);
857        metrics.num_epochs_to_retain_for_checkpoints.set(
858            config
859                .num_epochs_to_retain_for_checkpoints
860                .unwrap_or_default() as i64,
861        );
862
863        #[cfg(tidehunter)]
864        {
865            if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
866                let prune_objects = config.num_epochs_to_retain != u64::MAX;
867                tokio::task::spawn(async move {
868                    loop {
869                        tokio::select! {
870                            _ = objects_prune_interval.tick(), if prune_objects => {
871                                if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
872                                    error!("Failed to prune objects: {:?}", err);
873                                }
874                            },
875                            _ = checkpoints_prune_interval.tick() => {
876                                if let Err(err) = Self::prune_th(&perpetual_db, &checkpoint_store, num_epochs_to_retain, pruner_watermarks.clone(), !prune_objects) {
877                                    error!("Failed to prune checkpoints: {:?}", err);
878                                }
879                            },
880                            _ = &mut recv => break,
881                        }
882                    }
883                });
884            }
885        }
886        #[cfg(not(tidehunter))]
887        {
888            let mut indexes_prune_interval =
889                tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
890
891            let perpetual_db_for_compaction = perpetual_db.clone();
892            if let Some(delay_days) = config.periodic_compaction_threshold_days {
893                spawn_monitored_task!(async move {
894                    let last_processed = Arc::new(Mutex::new(HashMap::new()));
895                    loop {
896                        let db = perpetual_db_for_compaction.clone();
897                        let state = Arc::clone(&last_processed);
898                        let result = tokio::task::spawn_blocking(move || {
899                            Self::compact_next_sst_file(db, delay_days, state)
900                        })
901                        .await;
902                        let mut sleep_interval_secs = 1;
903                        match result {
904                            Err(err) => error!("Failed to compact sst file: {:?}", err),
905                            Ok(Err(err)) => error!("Failed to compact sst file: {:?}", err),
906                            Ok(Ok(None)) => {
907                                sleep_interval_secs = 3600;
908                            }
909                            _ => {}
910                        }
911                        tokio::time::sleep(Duration::from_secs(sleep_interval_secs)).await;
912                    }
913                });
914            }
915            tokio::task::spawn(async move {
916                loop {
917                    tokio::select! {
918                        _ = objects_prune_interval.tick(), if config.num_epochs_to_retain != u64::MAX => {
919                            if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
920                                error!("Failed to prune objects: {:?}", err);
921                            }
922                            if let Err(err) = Self::prune_executed_tx_digests(&perpetual_db, &checkpoint_store).await {
923                                error!("Failed to prune executed_tx_digests: {:?}", err);
924                            }
925                        },
926                        _ = checkpoints_prune_interval.tick(), if !matches!(config.num_epochs_to_retain_for_checkpoints(), None | Some(u64::MAX) | Some(0)) => {
927                            if let Err(err) = Self::prune_checkpoints_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms, &pruner_watermarks).await {
928                                error!("Failed to prune checkpoints: {:?}", err);
929                            }
930                        },
931                        _ = indexes_prune_interval.tick(), if config.num_epochs_to_retain_for_indexes.is_some() => {
932                            if let Err(err) = Self::prune_indexes(jsonrpc_index.as_deref(), &config, epoch_duration_ms, &metrics) {
933                                error!("Failed to prune indexes: {:?}", err);
934                            }
935                        }
936                        _ = &mut recv => break,
937                    }
938                }
939            });
940        }
941        sender
942    }
943
944    pub fn new(
945        perpetual_db: Arc<AuthorityPerpetualTables>,
946        checkpoint_store: Arc<CheckpointStore>,
947        rpc_index: Option<Arc<RpcIndexStore>>,
948        jsonrpc_index: Option<Arc<IndexStore>>,
949        mut pruning_config: AuthorityStorePruningConfig,
950        is_validator: bool,
951        epoch_duration_ms: u64,
952        registry: &Registry,
953        pruner_watermarks: Arc<PrunerWatermarks>, // used by tidehunter relocation filters
954    ) -> Self {
955        // On tidehunter, the per-keyspace `objects_compactor`
956        // (see `AuthorityPerpetualTables::open`) already retains only the latest
957        // version per ObjectID during compaction, so running the object pruner
958        // would duplicate that work. The compactor is enabled for validators
959        // and for any node configured with `num_epochs_to_retain = 0` — keep
960        // this predicate in sync with the one in `sui-node/src/lib.rs`.
961        // Force-disable the pruner whenever the compactor is active.
962        #[cfg(tidehunter)]
963        {
964            let objects_compactor_enabled =
965                is_validator || pruning_config.num_epochs_to_retain == 0;
966            if objects_compactor_enabled && pruning_config.num_epochs_to_retain != u64::MAX {
967                info!(
968                    "Tidehunter: disabling object pruner (was num_epochs_to_retain={}). The objects compactor performs equivalent compaction.",
969                    pruning_config.num_epochs_to_retain
970                );
971                pruning_config.num_epochs_to_retain = u64::MAX;
972            }
973        }
974
975        if pruning_config.num_epochs_to_retain > 0 && pruning_config.num_epochs_to_retain < u64::MAX
976        {
977            warn!(
978                "Using objects pruner with num_epochs_to_retain = {} can lead to performance issues",
979                pruning_config.num_epochs_to_retain
980            );
981            if is_validator {
982                warn!("Resetting to aggressive pruner.");
983                pruning_config.num_epochs_to_retain = 0;
984            } else {
985                warn!("Consider using an aggressive pruner (num_epochs_to_retain = 0)");
986            }
987        }
988        AuthorityStorePruner {
989            _objects_pruner_cancel_handle: Self::setup_pruning(
990                pruning_config,
991                epoch_duration_ms,
992                perpetual_db,
993                checkpoint_store,
994                rpc_index,
995                jsonrpc_index,
996                AuthorityStorePruningMetrics::new(registry),
997                pruner_watermarks,
998            ),
999        }
1000    }
1001
1002    pub fn compact(perpetual_db: &Arc<AuthorityPerpetualTables>) -> Result<(), TypedStoreError> {
1003        perpetual_db.objects.compact_range(
1004            &ObjectKey(ObjectID::ZERO, SequenceNumber::MIN),
1005            &ObjectKey(ObjectID::MAX, SequenceNumber::MAX),
1006        )
1007    }
1008}
1009
/// Attaches a relocation filter to `config` that removes entries lying below
/// the pruner watermark.
///
/// For every entry the filter decodes a `T` — from the key (big-endian,
/// fixed-int bincode) when `by_key` is true, otherwise from the value (BCS) —
/// and passes it to `extractor`. If the extracted number is smaller than the
/// current value of `pruner_watermark` the entry is removed; otherwise
/// relocation is stopped.
///
/// # Panics
/// The installed filter panics when an entry cannot be deserialized.
#[cfg(tidehunter)]
pub(crate) fn apply_relocation_filter<T: DeserializeOwned>(
    config: typed_store::tidehunter_util::KeySpaceConfig,
    pruner_watermark: Arc<AtomicU64>,
    extractor: impl Fn(T) -> u64 + Send + Sync + 'static,
    by_key: bool,
) -> typed_store::tidehunter_util::KeySpaceConfig {
    use bincode::Options;
    use std::sync::atomic::Ordering;
    use typed_store::tidehunter_util::Decision;

    const DECODE_FAILURE: &str = "relocation filter deserialization error";

    config.with_relocation_filter(move |key, value| {
        let decoded: T = if by_key {
            let key_codec = bincode::DefaultOptions::new()
                .with_big_endian()
                .with_fixint_encoding();
            key_codec.deserialize(&key).expect(DECODE_FAILURE)
        } else {
            bcs::from_bytes(&value).expect(DECODE_FAILURE)
        };
        let position = extractor(decoded);
        let watermark = pruner_watermark.load(Ordering::Relaxed);
        if position >= watermark {
            Decision::StopRelocation
        } else {
            Decision::Remove
        }
    })
}
1037
1038#[cfg(test)]
1039mod tests {
1040    use more_asserts as ma;
1041    use std::path::Path;
1042    use std::time::Duration;
1043    use std::{collections::HashSet, sync::Arc};
1044    use tracing::log::info;
1045
1046    use crate::authority::authority_store_pruner::AuthorityStorePruningMetrics;
1047    use crate::authority::authority_store_tables::AuthorityPerpetualTables;
1048    use crate::authority::authority_store_types::{
1049        StoreObject, StoreObjectWrapper, get_store_object,
1050    };
1051    use prometheus::Registry;
1052    use sui_types::base_types::ObjectDigest;
1053    use sui_types::effects::TransactionEffects;
1054    use sui_types::effects::TransactionEffectsAPI;
1055    use sui_types::{
1056        base_types::{ObjectID, SequenceNumber},
1057        object::Object,
1058        storage::ObjectKey,
1059    };
1060    use typed_store::Map;
1061    use typed_store::rocks::{DBMap, MetricConf, ReadWriteOptions, default_db_options};
1062
1063    use super::AuthorityStorePruner;
1064
1065    fn get_keys_after_pruning(path: &Path) -> anyhow::Result<HashSet<ObjectKey>> {
1066        let perpetual_db_path = path.join(Path::new("perpetual"));
1067        let cf_names = AuthorityPerpetualTables::describe_tables();
1068        let cfs: Vec<_> = cf_names
1069            .keys()
1070            .map(|x| (x.as_str(), default_db_options().options))
1071            .collect();
1072        let perpetual_db = typed_store::rocks::open_cf_opts(
1073            perpetual_db_path,
1074            None,
1075            MetricConf::new("perpetual_pruning"),
1076            &cfs,
1077        );
1078
1079        let mut after_pruning = HashSet::new();
1080        let objects = DBMap::<ObjectKey, StoreObjectWrapper>::reopen(
1081            &perpetual_db?,
1082            Some("objects"),
1083            // open the db to bypass default db options which ignores range tombstones
1084            // so we can read the accurate number of retained versions
1085            &ReadWriteOptions::default(),
1086            false,
1087        )?;
1088        let iter = objects.safe_iter();
1089        for item in iter {
1090            after_pruning.insert(item?.0);
1091        }
1092        Ok(after_pruning)
1093    }
1094
1095    type GenerateTestDataResult = (Vec<ObjectKey>, Vec<ObjectKey>, Vec<ObjectKey>);
1096
1097    fn generate_test_data(
1098        db: Arc<AuthorityPerpetualTables>,
1099        num_versions_per_object: u64,
1100        num_object_versions_to_retain: u64,
1101        total_unique_object_ids: u32,
1102    ) -> Result<GenerateTestDataResult, anyhow::Error> {
1103        assert!(num_versions_per_object >= num_object_versions_to_retain);
1104
1105        let (mut to_keep, mut to_delete, mut tombstones) = (vec![], vec![], vec![]);
1106        let mut batch = db.objects.batch();
1107
1108        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids.into())?;
1109        for id in ids {
1110            for (counter, seq) in (0..num_versions_per_object).rev().enumerate() {
1111                let object_key = ObjectKey(id, SequenceNumber::from_u64(seq));
1112                if counter < num_object_versions_to_retain.try_into().unwrap() {
1113                    // latest `num_object_versions_to_retain` should not have been pruned
1114                    to_keep.push(object_key);
1115                } else {
1116                    to_delete.push(object_key);
1117                }
1118                let obj = get_store_object(Object::immutable_with_id_for_testing(id));
1119                batch.insert_batch(
1120                    &db.objects,
1121                    [(ObjectKey(id, SequenceNumber::from(seq)), obj.clone())],
1122                )?;
1123            }
1124
1125            // Adding a tombstone for deleted object.
1126            if num_object_versions_to_retain == 0 {
1127                let tombstone_key = ObjectKey(id, SequenceNumber::from(num_versions_per_object));
1128                println!("Adding tombstone object {:?}", tombstone_key);
1129                batch.insert_batch(
1130                    &db.objects,
1131                    [(tombstone_key, StoreObjectWrapper::V1(StoreObject::Deleted))],
1132                )?;
1133                tombstones.push(tombstone_key);
1134            }
1135        }
1136        batch.write().unwrap();
1137        assert_eq!(
1138            to_keep.len() as u64,
1139            std::cmp::min(num_object_versions_to_retain, num_versions_per_object)
1140                * total_unique_object_ids as u64
1141        );
1142        assert_eq!(
1143            tombstones.len() as u64,
1144            if num_object_versions_to_retain == 0 {
1145                total_unique_object_ids as u64
1146            } else {
1147                0
1148            }
1149        );
1150        Ok((to_keep, to_delete, tombstones))
1151    }
1152
1153    async fn run_pruner(
1154        path: &Path,
1155        num_versions_per_object: u64,
1156        num_object_versions_to_retain: u64,
1157        total_unique_object_ids: u32,
1158    ) -> Vec<ObjectKey> {
1159        let registry = Registry::default();
1160        let metrics = AuthorityStorePruningMetrics::new(&registry);
1161        let to_keep = {
1162            let db = Arc::new(AuthorityPerpetualTables::open(path, None, None));
1163            let (to_keep, to_delete, tombstones) = generate_test_data(
1164                db.clone(),
1165                num_versions_per_object,
1166                num_object_versions_to_retain,
1167                total_unique_object_ids,
1168            )
1169            .unwrap();
1170            let mut effects = TransactionEffects::default();
1171            for object in to_delete {
1172                effects.unsafe_add_deleted_live_object_for_testing((
1173                    object.0,
1174                    object.1,
1175                    ObjectDigest::MIN,
1176                ));
1177            }
1178            for object in tombstones {
1179                effects.unsafe_add_object_tombstone_for_testing((
1180                    object.0,
1181                    object.1,
1182                    ObjectDigest::MIN,
1183                ));
1184            }
1185            AuthorityStorePruner::prune_objects_and_indexes(
1186                vec![effects],
1187                &db,
1188                0,
1189                metrics,
1190                0,
1191                None,
1192                true,
1193            )
1194            .await
1195            .unwrap();
1196            to_keep
1197        };
1198        tokio::time::sleep(Duration::from_secs(3)).await;
1199        to_keep
1200    }
1201
1202    // Tests pruning old version of live objects.
1203    #[cfg(not(tidehunter))]
1204    #[tokio::test]
1205    async fn test_pruning_objects() {
1206        let path = tempfile::tempdir().unwrap().keep();
1207        let to_keep = run_pruner(&path, 3, 2, 1000).await;
1208        assert_eq!(
1209            HashSet::from_iter(to_keep),
1210            get_keys_after_pruning(&path).unwrap()
1211        );
1212        run_pruner(&tempfile::tempdir().unwrap().keep(), 3, 2, 1000).await;
1213    }
1214
1215    // Tests pruning deleted objects (object tombstones).
1216    #[cfg(not(tidehunter))]
1217    #[tokio::test]
1218    async fn test_pruning_tombstones() {
1219        let path = tempfile::tempdir().unwrap().keep();
1220        let to_keep = run_pruner(&path, 0, 0, 1000).await;
1221        assert_eq!(to_keep.len(), 0);
1222        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);
1223
1224        let path = tempfile::tempdir().unwrap().keep();
1225        let to_keep = run_pruner(&path, 3, 0, 1000).await;
1226        assert_eq!(to_keep.len(), 0);
1227        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);
1228    }
1229
1230    #[cfg(not(target_env = "msvc"))]
1231    #[tokio::test]
1232    async fn test_db_size_after_compaction() -> Result<(), anyhow::Error> {
1233        let primary_path = tempfile::tempdir()?.keep();
1234        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None, None));
1235        let total_unique_object_ids = 10_000;
1236        let num_versions_per_object = 10;
1237        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids)?;
1238        let mut to_delete = vec![];
1239        for id in ids {
1240            for i in (0..num_versions_per_object).rev() {
1241                if i < num_versions_per_object - 2 {
1242                    to_delete.push((id, SequenceNumber::from(i)));
1243                }
1244                let obj = get_store_object(Object::immutable_with_id_for_testing(id));
1245                perpetual_db
1246                    .objects
1247                    .insert(&ObjectKey(id, SequenceNumber::from(i)), &obj)?;
1248            }
1249        }
1250
1251        fn get_sst_size(path: &Path) -> u64 {
1252            let mut size = 0;
1253            for entry in std::fs::read_dir(path).unwrap() {
1254                let entry = entry.unwrap();
1255                let path = entry.path();
1256                if let Some(ext) = path.extension() {
1257                    if ext != "sst" {
1258                        continue;
1259                    }
1260                    size += std::fs::metadata(path).unwrap().len();
1261                }
1262            }
1263            size
1264        }
1265
1266        let db_path = primary_path.clone().join("perpetual");
1267        let start = ObjectKey(ObjectID::ZERO, SequenceNumber::MIN);
1268        let end = ObjectKey(ObjectID::MAX, SequenceNumber::MAX);
1269
1270        perpetual_db.objects.compact_range(&start, &end)?;
1271        let before_compaction_size = get_sst_size(&db_path);
1272
1273        let mut effects = TransactionEffects::default();
1274        for object in to_delete {
1275            effects.unsafe_add_deleted_live_object_for_testing((
1276                object.0,
1277                object.1,
1278                ObjectDigest::MIN,
1279            ));
1280        }
1281        let registry = Registry::default();
1282        let metrics = AuthorityStorePruningMetrics::new(&registry);
1283        let total_pruned = AuthorityStorePruner::prune_objects_and_indexes(
1284            vec![effects],
1285            &perpetual_db,
1286            0,
1287            metrics,
1288            0,
1289            None,
1290            true,
1291        )
1292        .await;
1293        info!("Total pruned keys = {:?}", total_pruned);
1294
1295        perpetual_db.objects.compact_range(&start, &end)?;
1296        let after_compaction_size = get_sst_size(&db_path);
1297
1298        info!(
1299            "Before compaction disk size = {:?}, after compaction disk size = {:?}",
1300            before_compaction_size, after_compaction_size
1301        );
1302        ma::assert_le!(after_compaction_size, before_compaction_size);
1303        Ok(())
1304    }
1305}