// sui_core/authority/authority_store_pruner.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::authority_store_tables::AuthorityPerpetualTables;
5use crate::checkpoints::{CheckpointStore, CheckpointWatermark};
6use crate::jsonrpc_index::IndexStore;
7use crate::rpc_index::RpcIndexStore;
8use anyhow::anyhow;
9use mysten_metrics::{monitored_scope, spawn_monitored_task};
10use once_cell::sync::Lazy;
11use prometheus::{
12    IntCounter, IntGauge, Registry, register_int_counter_with_registry,
13    register_int_gauge_with_registry,
14};
15#[cfg(tidehunter)]
16use serde::de::DeserializeOwned;
17use std::cmp::{max, min};
18use std::collections::{BTreeSet, HashMap};
19use std::sync::Mutex;
20use std::sync::atomic::AtomicU64;
21use std::time::{SystemTime, UNIX_EPOCH};
22use std::{sync::Arc, time::Duration};
23use sui_config::node::AuthorityStorePruningConfig;
24use sui_rpc_store::Store as RpcStore;
25use sui_types::committee::EpochId;
26use sui_types::effects::TransactionEffects;
27use sui_types::effects::TransactionEffectsAPI;
28use sui_types::message_envelope::Message;
29use sui_types::messages_checkpoint::{
30    CheckpointContents, CheckpointDigest, CheckpointSequenceNumber,
31};
32use sui_types::{
33    base_types::{ObjectID, SequenceNumber, TransactionDigest, VersionNumber},
34    storage::ObjectKey,
35};
36use tokio::sync::oneshot::{self, Sender};
37use tokio::time::Instant;
38use tracing::{debug, error, info, warn};
39use typed_store::rocksdb::LiveFile;
40use typed_store::{Map, TypedStoreError};
41
42static PERIODIC_PRUNING_TABLES: Lazy<BTreeSet<String>> = Lazy::new(|| {
43    [
44        "objects",
45        "effects",
46        "transactions",
47        "events",
48        "executed_effects",
49        "executed_transactions_to_checkpoint",
50    ]
51    .into_iter()
52    .map(|cf| cf.to_string())
53    .collect()
54});
/// Epoch length used by tests: one day (86 400 s) expressed in milliseconds.
pub const EPOCH_DURATION_MS_FOR_TESTING: u64 = 86_400 * 1_000;
56pub struct AuthorityStorePruner {
57    _objects_pruner_cancel_handle: oneshot::Sender<()>,
58}
59
/// Epoch and checkpoint watermarks written by `AuthorityStorePruner::update_pruning_watermarks`.
///
/// Both start at 0 (`Default`). They are `Arc`-wrapped atomics so the same counters can be
/// shared outside the pruner; which components read them is not visible in this file.
#[derive(Debug, Default)]
pub struct PrunerWatermarks {
    /// Epoch watermark (logged as "relocation: setting epoch watermark").
    pub epoch_id: Arc<AtomicU64>,
    /// Checkpoint watermark (logged as "relocation: setting checkpoint watermark").
    pub checkpoint_id: Arc<AtomicU64>,
}
65
/// Minimum pruning tick interval in milliseconds (10 seconds); presumably the floor applied by
/// `pruning_tick_duration_ms`. A plain value, so `const` rather than `static`.
const MIN_PRUNING_TICK_DURATION_MS: u64 = 10 * 1000;
67
68pub struct AuthorityStorePruningMetrics {
69    pub last_pruned_checkpoint: IntGauge,
70    pub num_pruned_objects: IntCounter,
71    pub num_pruned_tombstones: IntCounter,
72    pub last_pruned_effects_checkpoint: IntGauge,
73    pub last_pruned_indexes_transaction: IntGauge,
74    pub num_epochs_to_retain_for_objects: IntGauge,
75    pub num_epochs_to_retain_for_checkpoints: IntGauge,
76}
77
78impl AuthorityStorePruningMetrics {
79    pub fn new(registry: &Registry) -> Arc<Self> {
80        let this = Self {
81            last_pruned_checkpoint: register_int_gauge_with_registry!(
82                "last_pruned_checkpoint",
83                "Last pruned checkpoint",
84                registry
85            )
86            .unwrap(),
87            num_pruned_objects: register_int_counter_with_registry!(
88                "num_pruned_objects",
89                "Number of pruned objects",
90                registry
91            )
92            .unwrap(),
93            num_pruned_tombstones: register_int_counter_with_registry!(
94                "num_pruned_tombstones",
95                "Number of pruned tombstones",
96                registry
97            )
98            .unwrap(),
99            last_pruned_effects_checkpoint: register_int_gauge_with_registry!(
100                "last_pruned_effects_checkpoint",
101                "Last pruned effects checkpoint",
102                registry
103            )
104            .unwrap(),
105            last_pruned_indexes_transaction: register_int_gauge_with_registry!(
106                "last_pruned_indexes_transaction",
107                "Last pruned indexes transaction",
108                registry
109            )
110            .unwrap(),
111            num_epochs_to_retain_for_objects: register_int_gauge_with_registry!(
112                "num_epochs_to_retain_for_objects",
113                "Number of epochs to retain for objects",
114                registry
115            )
116            .unwrap(),
117            num_epochs_to_retain_for_checkpoints: register_int_gauge_with_registry!(
118                "num_epochs_to_retain_for_checkpoints",
119                "Number of epochs to retain for checkpoints",
120                registry
121            )
122            .unwrap(),
123        };
124        Arc::new(this)
125    }
126
127    pub fn new_for_test() -> Arc<Self> {
128        Self::new(&Registry::new())
129    }
130}
131
/// What `AuthorityStorePruner::prune_for_eligible_epochs` removes for eligible checkpoints.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruningMode {
    /// Old object versions (plus RPC indexes), via `prune_objects_and_indexes`.
    Objects,
    /// Checkpoint data (contents, transactions, effects, events), via `prune_checkpoints`.
    Checkpoints,
}
137
138impl AuthorityStorePruner {
139    /// prunes old versions of objects based on transaction effects
140    #[cfg(not(tidehunter))]
141    async fn prune_objects_and_indexes(
142        transaction_effects: Vec<TransactionEffects>,
143        perpetual_db: &Arc<AuthorityPerpetualTables>,
144        checkpoint_number: CheckpointSequenceNumber,
145        metrics: Arc<AuthorityStorePruningMetrics>,
146        pruned_tx_seq_exclusive: u64,
147        rpc_index: Option<&RpcIndexStore>,
148        rpc_store: Option<&RpcStore>,
149        enable_pruning_tombstones: bool,
150    ) -> anyhow::Result<()> {
151        let _scope = monitored_scope("ObjectsLivePruner");
152        let mut wb = perpetual_db.objects.batch();
153
154        // Collect objects keys that need to be deleted from `transaction_effects`.
155        let mut live_object_keys_to_prune = vec![];
156        let mut object_tombstones_to_prune = vec![];
157        for effects in &transaction_effects {
158            for (object_id, seq_number) in effects.modified_at_versions() {
159                live_object_keys_to_prune.push(ObjectKey(object_id, seq_number));
160            }
161
162            if enable_pruning_tombstones {
163                for deleted_object_key in effects.all_tombstones() {
164                    object_tombstones_to_prune
165                        .push(ObjectKey(deleted_object_key.0, deleted_object_key.1));
166                }
167            }
168        }
169
170        metrics
171            .num_pruned_objects
172            .inc_by(live_object_keys_to_prune.len() as u64);
173        metrics
174            .num_pruned_tombstones
175            .inc_by(object_tombstones_to_prune.len() as u64);
176
177        let mut updates: HashMap<ObjectID, (VersionNumber, VersionNumber)> = HashMap::new();
178        for ObjectKey(object_id, seq_number) in live_object_keys_to_prune {
179            updates
180                .entry(object_id)
181                .and_modify(|range| *range = (min(range.0, seq_number), max(range.1, seq_number)))
182                .or_insert((seq_number, seq_number));
183        }
184
185        for (object_id, (min_version, max_version)) in updates {
186            debug!(
187                "Pruning object {:?} versions {:?} - {:?}",
188                object_id, min_version, max_version
189            );
190            let start_range = ObjectKey(object_id, min_version);
191            let end_range = ObjectKey(object_id, (max_version.value() + 1).into());
192            wb.schedule_delete_range(&perpetual_db.objects, &start_range, &end_range)?;
193        }
194
195        // When enable_pruning_tombstones is enabled, instead of using range deletes, we need to do a scan of all the keys
196        // for the deleted objects and then do point deletes to delete all the existing keys. This is because to improve read
197        // performance, we set `ignore_range_deletions` on all read options, and using range delete to delete tombstones
198        // may leak object (imagine a tombstone is compacted away, but earlier version is still not). Using point deletes
199        // guarantees that all earlier versions are deleted in the database.
200        if !object_tombstones_to_prune.is_empty() {
201            let mut object_keys_to_delete = vec![];
202            for ObjectKey(object_id, seq_number) in object_tombstones_to_prune {
203                for result in perpetual_db.objects.safe_iter_with_bounds(
204                    Some(ObjectKey(object_id, VersionNumber::MIN)),
205                    Some(ObjectKey(object_id, seq_number.next())),
206                ) {
207                    let (object_key, _) = result?;
208                    assert_eq!(object_key.0, object_id);
209                    object_keys_to_delete.push(object_key);
210                }
211            }
212
213            wb.delete_batch(&perpetual_db.objects, object_keys_to_delete)?;
214        }
215
216        perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
217        metrics.last_pruned_checkpoint.set(checkpoint_number as i64);
218
219        wb.write()?;
220
221        if let Some(rpc_index) = rpc_index {
222            rpc_index.prune(checkpoint_number, pruned_tx_seq_exclusive)?;
223        }
224        if let Some(rpc_store) = rpc_store {
225            // Prune the embedded rpc-store's history cohort on the same
226            // floor, in lockstep with the legacy index above.
227            sui_rpc_store::prune_history_cohort(
228                rpc_store.db(),
229                rpc_store.schema(),
230                checkpoint_number,
231                pruned_tx_seq_exclusive,
232            )?;
233        }
234
235        Ok(())
236    }
237
238    #[cfg(tidehunter)]
239    async fn prune_objects_and_indexes(
240        transaction_effects: Vec<TransactionEffects>,
241        perpetual_db: &Arc<AuthorityPerpetualTables>,
242        checkpoint_number: CheckpointSequenceNumber,
243        metrics: Arc<AuthorityStorePruningMetrics>,
244        pruned_tx_seq_exclusive: u64,
245        rpc_index: Option<&RpcIndexStore>,
246        rpc_store: Option<&RpcStore>,
247        _: bool,
248    ) -> anyhow::Result<()> {
249        let _scope = monitored_scope("ObjectsLivePruner");
250        let mut wb = perpetual_db.objects.batch();
251        let mut objects_to_prune = vec![];
252
253        for effects in &transaction_effects {
254            for (object_id, version) in effects
255                .modified_at_versions()
256                .into_iter()
257                .chain(effects.all_tombstones())
258            {
259                debug!("Pruning object {:?} version {:?}", object_id, version);
260                objects_to_prune.push(ObjectKey(object_id, version));
261            }
262        }
263        metrics
264            .num_pruned_objects
265            .inc_by(objects_to_prune.len() as u64);
266        wb.delete_batch(&perpetual_db.objects, &objects_to_prune)?;
267
268        perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
269        metrics.last_pruned_checkpoint.set(checkpoint_number as i64);
270        wb.write()?;
271
272        if let Some(rpc_index) = rpc_index {
273            rpc_index.prune(checkpoint_number, pruned_tx_seq_exclusive)?;
274        }
275        if let Some(rpc_store) = rpc_store {
276            // Prune the embedded rpc-store's history cohort on the same
277            // floor, in lockstep with the legacy index above.
278            sui_rpc_store::prune_history_cohort(
279                rpc_store.db(),
280                rpc_store.schema(),
281                checkpoint_number,
282                pruned_tx_seq_exclusive,
283            )?;
284        }
285
286        Ok(())
287    }
288
289    fn prune_checkpoints(
290        perpetual_db: &Arc<AuthorityPerpetualTables>,
291        checkpoint_db: &Arc<CheckpointStore>,
292        checkpoint_number: CheckpointSequenceNumber,
293        checkpoints_to_prune: Vec<CheckpointDigest>,
294        checkpoint_content_to_prune: Vec<CheckpointContents>,
295        effects_to_prune: &Vec<TransactionEffects>,
296        metrics: Arc<AuthorityStorePruningMetrics>,
297    ) -> anyhow::Result<()> {
298        let _scope = monitored_scope("EffectsLivePruner");
299
300        let mut perpetual_batch = perpetual_db.objects.batch();
301        let transactions: Vec<_> = checkpoint_content_to_prune
302            .iter()
303            .flat_map(|content| content.iter().map(|tx| tx.transaction))
304            .collect();
305
306        perpetual_batch.delete_batch(&perpetual_db.transactions, transactions.iter())?;
307        perpetual_batch.delete_batch(&perpetual_db.executed_effects, transactions.iter())?;
308        perpetual_batch.delete_batch(
309            &perpetual_db.executed_transactions_to_checkpoint,
310            transactions.iter(),
311        )?;
312
313        let mut effect_digests = vec![];
314        for effects in effects_to_prune {
315            let effects_digest = effects.digest();
316            debug!("Pruning effects {:?}", effects_digest);
317            effect_digests.push(effects_digest);
318
319            if effects.events_digest().is_some() {
320                perpetual_batch
321                    .delete_batch(&perpetual_db.events_2, [effects.transaction_digest()])?;
322            }
323        }
324        perpetual_batch.delete_batch(
325            &perpetual_db.unchanged_loaded_runtime_objects,
326            transactions.iter(),
327        )?;
328        perpetual_batch.delete_batch(&perpetual_db.effects, effect_digests)?;
329
330        let mut checkpoints_batch = checkpoint_db.tables.certified_checkpoints.batch();
331
332        let checkpoint_content_digests =
333            checkpoint_content_to_prune.iter().map(|ckpt| ckpt.digest());
334        checkpoints_batch.delete_batch(
335            &checkpoint_db.tables.checkpoint_content,
336            checkpoint_content_digests.clone(),
337        )?;
338        checkpoints_batch.delete_batch(
339            &checkpoint_db.tables.checkpoint_sequence_by_contents_digest,
340            checkpoint_content_digests,
341        )?;
342
343        checkpoints_batch.delete_batch(
344            &checkpoint_db.tables.checkpoint_by_digest,
345            checkpoints_to_prune,
346        )?;
347
348        checkpoints_batch.insert_batch(
349            &checkpoint_db.tables.watermarks,
350            [(
351                &CheckpointWatermark::HighestPruned,
352                &(checkpoint_number, CheckpointDigest::random()),
353            )],
354        )?;
355
356        perpetual_batch.write()?;
357        checkpoints_batch.write()?;
358        metrics
359            .last_pruned_effects_checkpoint
360            .set(checkpoint_number as i64);
361
362        Ok(())
363    }
364
365    /// Prunes old data based on effects from all checkpoints from epochs eligible for pruning
366    pub async fn prune_objects_for_eligible_epochs(
367        perpetual_db: &Arc<AuthorityPerpetualTables>,
368        checkpoint_store: &Arc<CheckpointStore>,
369        rpc_index: Option<&RpcIndexStore>,
370        rpc_store: Option<&RpcStore>,
371        config: AuthorityStorePruningConfig,
372        metrics: Arc<AuthorityStorePruningMetrics>,
373        epoch_duration_ms: u64,
374    ) -> anyhow::Result<()> {
375        let _scope = monitored_scope("PruneObjectsForEligibleEpochs");
376        let (mut max_eligible_checkpoint_number, epoch_id) = checkpoint_store
377            .get_highest_executed_checkpoint()?
378            .map(|c| (*c.sequence_number(), c.epoch))
379            .unwrap_or_default();
380        let pruned_checkpoint_number = perpetual_db
381            .get_highest_pruned_checkpoint()?
382            .unwrap_or_default();
383        if config.smooth && config.num_epochs_to_retain > 0 {
384            max_eligible_checkpoint_number = Self::smoothed_max_eligible_checkpoint_number(
385                checkpoint_store,
386                max_eligible_checkpoint_number,
387                pruned_checkpoint_number,
388                epoch_id,
389                epoch_duration_ms,
390                config.num_epochs_to_retain,
391            )?;
392        }
393        Self::prune_for_eligible_epochs(
394            perpetual_db,
395            checkpoint_store,
396            rpc_index,
397            rpc_store,
398            PruningMode::Objects,
399            config.num_epochs_to_retain,
400            pruned_checkpoint_number,
401            max_eligible_checkpoint_number,
402            config,
403            metrics.clone(),
404        )
405        .await
406    }
407
408    pub async fn prune_checkpoints_for_eligible_epochs(
409        perpetual_db: &Arc<AuthorityPerpetualTables>,
410        checkpoint_store: &Arc<CheckpointStore>,
411        rpc_index: Option<&RpcIndexStore>,
412        rpc_store: Option<&RpcStore>,
413        config: AuthorityStorePruningConfig,
414        metrics: Arc<AuthorityStorePruningMetrics>,
415        epoch_duration_ms: u64,
416        pruner_watermarks: &Arc<PrunerWatermarks>,
417    ) -> anyhow::Result<()> {
418        let _scope = monitored_scope("PruneCheckpointsForEligibleEpochs");
419        let pruned_checkpoint_number = checkpoint_store
420            .get_highest_pruned_checkpoint_seq_number()?
421            .unwrap_or(0);
422        let (mut max_eligible_checkpoint, epoch_id) = checkpoint_store
423            .get_highest_executed_checkpoint()?
424            .map(|c| (*c.sequence_number(), c.epoch))
425            .unwrap_or_default();
426        if config.num_epochs_to_retain != u64::MAX {
427            max_eligible_checkpoint = min(
428                max_eligible_checkpoint,
429                perpetual_db
430                    .get_highest_pruned_checkpoint()?
431                    .unwrap_or_default(),
432            );
433        }
434        if config.smooth
435            && let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints
436        {
437            max_eligible_checkpoint = Self::smoothed_max_eligible_checkpoint_number(
438                checkpoint_store,
439                max_eligible_checkpoint,
440                pruned_checkpoint_number,
441                epoch_id,
442                epoch_duration_ms,
443                num_epochs_to_retain,
444            )?;
445        }
446        debug!("Max eligible checkpoint {}", max_eligible_checkpoint);
447        Self::prune_for_eligible_epochs(
448            perpetual_db,
449            checkpoint_store,
450            rpc_index,
451            rpc_store,
452            PruningMode::Checkpoints,
453            config
454                .num_epochs_to_retain_for_checkpoints()
455                .ok_or_else(|| anyhow!("config value not set"))?,
456            pruned_checkpoint_number,
457            max_eligible_checkpoint,
458            config.clone(),
459            metrics.clone(),
460        )
461        .await?;
462
463        if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
464            Self::update_pruning_watermarks(
465                perpetual_db,
466                checkpoint_store,
467                num_epochs_to_retain,
468                pruner_watermarks,
469                false,
470            )?;
471        }
472        Ok(())
473    }
474
475    /// Prunes old object versions based on effects from all checkpoints from epochs eligible for pruning
476    pub async fn prune_for_eligible_epochs(
477        perpetual_db: &Arc<AuthorityPerpetualTables>,
478        checkpoint_store: &Arc<CheckpointStore>,
479        rpc_index: Option<&RpcIndexStore>,
480        rpc_store: Option<&RpcStore>,
481        mode: PruningMode,
482        num_epochs_to_retain: u64,
483        starting_checkpoint_number: CheckpointSequenceNumber,
484        max_eligible_checkpoint: CheckpointSequenceNumber,
485        config: AuthorityStorePruningConfig,
486        metrics: Arc<AuthorityStorePruningMetrics>,
487    ) -> anyhow::Result<()> {
488        let _scope = monitored_scope("PruneForEligibleEpochs");
489
490        let mut checkpoint_number = starting_checkpoint_number;
491        let current_epoch = checkpoint_store
492            .get_highest_executed_checkpoint()?
493            .map(|c| c.epoch())
494            .unwrap_or_default();
495
496        let mut checkpoints_to_prune = vec![];
497        let mut checkpoint_content_to_prune = vec![];
498        let mut effects_to_prune = vec![];
499        // Absolute tx-seq floor (exclusive) after pruning the current
500        // batch — the last-pruned checkpoint's `network_total_transactions`.
501        // `RpcIndexStore::prune` consumes this directly instead of summing
502        // checkpoint content sizes.
503        let mut pruned_tx_seq_exclusive = 0u64;
504
505        loop {
506            let Some(ckpt) = checkpoint_store
507                .tables
508                .certified_checkpoints
509                .get(&(checkpoint_number + 1))?
510            else {
511                break;
512            };
513            let checkpoint = ckpt.into_inner();
514            // Skipping because  checkpoint's epoch or checkpoint number is too new.
515            // We have to respect the highest executed checkpoint watermark (including the watermark itself)
516            // because there might be parts of the system that still require access to old object versions
517            // (i.e. state accumulator).
518            if (current_epoch < checkpoint.epoch() + num_epochs_to_retain)
519                || (*checkpoint.sequence_number() >= max_eligible_checkpoint)
520            {
521                break;
522            }
523            checkpoint_number = *checkpoint.sequence_number();
524            pruned_tx_seq_exclusive = checkpoint.network_total_transactions;
525
526            let content = checkpoint_store
527                .get_checkpoint_contents(&checkpoint.content_digest)?
528                .ok_or_else(|| {
529                    anyhow::anyhow!(
530                        "checkpoint content data is missing: {}",
531                        checkpoint.sequence_number
532                    )
533                })?;
534            let effects = perpetual_db
535                .effects
536                .multi_get(content.iter().map(|tx| tx.effects))?;
537
538            info!("scheduling pruning for checkpoint {:?}", checkpoint_number);
539            checkpoints_to_prune.push(*checkpoint.digest());
540            checkpoint_content_to_prune.push(content);
541            effects_to_prune.extend(effects.into_iter().flatten());
542
543            if effects_to_prune.len() >= config.max_transactions_in_batch
544                || checkpoints_to_prune.len() >= config.max_checkpoints_in_batch
545            {
546                match mode {
547                    PruningMode::Objects => {
548                        Self::prune_objects_and_indexes(
549                            effects_to_prune,
550                            perpetual_db,
551                            checkpoint_number,
552                            metrics.clone(),
553                            pruned_tx_seq_exclusive,
554                            rpc_index,
555                            rpc_store,
556                            !config.killswitch_tombstone_pruning,
557                        )
558                        .await?
559                    }
560                    PruningMode::Checkpoints => Self::prune_checkpoints(
561                        perpetual_db,
562                        checkpoint_store,
563                        checkpoint_number,
564                        checkpoints_to_prune,
565                        checkpoint_content_to_prune,
566                        &effects_to_prune,
567                        metrics.clone(),
568                    )?,
569                };
570                checkpoints_to_prune = vec![];
571                checkpoint_content_to_prune = vec![];
572                effects_to_prune = vec![];
573                // yield back to the tokio runtime. Prevent potential halt of other tasks
574                tokio::task::yield_now().await;
575            }
576        }
577
578        if !checkpoints_to_prune.is_empty() {
579            match mode {
580                PruningMode::Objects => {
581                    Self::prune_objects_and_indexes(
582                        effects_to_prune,
583                        perpetual_db,
584                        checkpoint_number,
585                        metrics.clone(),
586                        pruned_tx_seq_exclusive,
587                        rpc_index,
588                        rpc_store,
589                        !config.killswitch_tombstone_pruning,
590                    )
591                    .await?
592                }
593                PruningMode::Checkpoints => Self::prune_checkpoints(
594                    perpetual_db,
595                    checkpoint_store,
596                    checkpoint_number,
597                    checkpoints_to_prune,
598                    checkpoint_content_to_prune,
599                    &effects_to_prune,
600                    metrics.clone(),
601                )?,
602            };
603        }
604        Ok(())
605    }
606
607    fn prune_indexes(
608        indexes: Option<&IndexStore>,
609        config: &AuthorityStorePruningConfig,
610        epoch_duration_ms: u64,
611        metrics: &AuthorityStorePruningMetrics,
612    ) -> anyhow::Result<()> {
613        if let (Some(mut epochs_to_retain), Some(indexes)) =
614            (config.num_epochs_to_retain_for_indexes, indexes)
615        {
616            if epochs_to_retain < 7 {
617                warn!("num_epochs_to_retain_for_indexes is too low. Reseting it to 7");
618                epochs_to_retain = 7;
619            }
620            let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
621            if let Some(cut_time_ms) =
622                u64::try_from(now)?.checked_sub(epochs_to_retain * epoch_duration_ms)
623            {
624                let transaction_id = indexes.prune(cut_time_ms)?;
625                metrics
626                    .last_pruned_indexes_transaction
627                    .set(transaction_id as i64);
628            }
629        }
630        Ok(())
631    }
632
633    async fn prune_executed_tx_digests(
634        perpetual_db: &Arc<AuthorityPerpetualTables>,
635        checkpoint_store: &Arc<CheckpointStore>,
636    ) -> anyhow::Result<()> {
637        let current_epoch = checkpoint_store
638            .get_highest_executed_checkpoint()?
639            .map(|c| c.epoch)
640            .unwrap_or_default();
641
642        if current_epoch < 2 {
643            return Ok(());
644        }
645
646        let target_epoch = current_epoch - 1;
647
648        let start_key = (0u64, TransactionDigest::ZERO);
649        let end_key = (target_epoch, TransactionDigest::ZERO);
650
651        info!(
652            "Pruning executed_transaction_digests for epochs < {} (current epoch: {})",
653            target_epoch, current_epoch
654        );
655
656        let mut batch = perpetual_db.executed_transaction_digests.batch();
657        batch.schedule_delete_range(
658            &perpetual_db.executed_transaction_digests,
659            &start_key,
660            // `to` is non-inclusive so target_epoch and all later epochs are preserved
661            &end_key,
662        )?;
663        batch.write()?;
664        Ok(())
665    }
666
667    #[cfg(tidehunter)]
668    fn prune_executed_tx_digests_th(
669        perpetual_db: &Arc<AuthorityPerpetualTables>,
670        checkpoint_store: &Arc<CheckpointStore>,
671    ) -> anyhow::Result<()> {
672        let current_epoch = checkpoint_store
673            .get_highest_executed_checkpoint()?
674            .map(|c| c.epoch)
675            .unwrap_or_default();
676
677        if current_epoch < 2 {
678            return Ok(());
679        }
680
681        let last_epoch_to_delete = current_epoch - 2;
682        let from_key = (0u64, TransactionDigest::ZERO);
683        let to_key = (last_epoch_to_delete, TransactionDigest::new([0xff; 32]));
684        info!(
685            "Pruning executed_transaction_digests for epochs 0 to {} (current epoch: {})",
686            last_epoch_to_delete, current_epoch
687        );
688        perpetual_db
689            .executed_transaction_digests
690            .drop_cells_in_range(&from_key, &to_key)?;
691        Ok(())
692    }
693
694    fn update_pruning_watermarks(
695        perpetual_db: &Arc<AuthorityPerpetualTables>,
696        checkpoint_store: &Arc<CheckpointStore>,
697        num_epochs_to_retain: u64,
698        pruning_watermark: &Arc<PrunerWatermarks>,
699        objects_compactor_active: bool,
700    ) -> anyhow::Result<bool> {
701        use std::sync::atomic::Ordering;
702        let objects_pruning_checkpoint_id = perpetual_db
703            .get_highest_pruned_checkpoint()?
704            .unwrap_or_default();
705        let objects_pruning_epoch_id = checkpoint_store
706            .get_checkpoint_by_sequence_number(objects_pruning_checkpoint_id)?
707            .map(|chk| chk.epoch)
708            .unwrap_or_default();
709
710        let current_watermark = pruning_watermark.epoch_id.load(Ordering::Relaxed);
711        let current_epoch_id = checkpoint_store
712            .get_highest_executed_checkpoint()?
713            .map(|c| c.epoch)
714            .unwrap_or_default();
715        if current_epoch_id < num_epochs_to_retain {
716            return Ok(false);
717        }
718        let target_epoch_id = current_epoch_id - num_epochs_to_retain;
719        let checkpoint_id =
720            checkpoint_store.get_epoch_last_checkpoint_seq_number(target_epoch_id)?;
721
722        // The objects compactor handles object retention continuously without advancing
723        // `highest_pruned_checkpoint`, so capping on it would freeze the watermark at 0.
724        let new_watermark = if objects_compactor_active {
725            target_epoch_id + 1
726        } else {
727            min(target_epoch_id + 1, objects_pruning_epoch_id)
728        };
729        if current_watermark == new_watermark {
730            return Ok(false);
731        }
732        info!("relocation: setting epoch watermark to {}", new_watermark);
733        pruning_watermark
734            .epoch_id
735            .store(new_watermark, Ordering::Relaxed);
736        if let Some(checkpoint_id) = checkpoint_id {
737            let watermark = if objects_compactor_active {
738                checkpoint_id
739            } else {
740                min(checkpoint_id, objects_pruning_checkpoint_id)
741            };
742            info!("relocation: setting checkpoint watermark to {}", watermark);
743            pruning_watermark
744                .checkpoint_id
745                .store(watermark, Ordering::Relaxed);
746        }
747        Ok(true)
748    }
749
750    #[cfg(tidehunter)]
751    fn prune_th(
752        perpetual_db: &Arc<AuthorityPerpetualTables>,
753        checkpoint_store: &Arc<CheckpointStore>,
754        num_epochs_to_retain: u64,
755        pruning_watermark: Arc<PrunerWatermarks>,
756        objects_compactor_active: bool,
757    ) -> anyhow::Result<()> {
758        let watermark_updated = Self::update_pruning_watermarks(
759            perpetual_db,
760            checkpoint_store,
761            num_epochs_to_retain,
762            &pruning_watermark,
763            objects_compactor_active,
764        )?;
765        if !watermark_updated {
766            info!("skip relocation. Watermark hasn't changed");
767            return Ok(());
768        }
769        perpetual_db.objects.db.start_relocation()?;
770        checkpoint_store.tables.watermarks.db.start_relocation()?;
771        Self::prune_executed_tx_digests_th(perpetual_db, checkpoint_store)?;
772        Ok(())
773    }
774
775    fn compact_next_sst_file(
776        perpetual_db: Arc<AuthorityPerpetualTables>,
777        delay_days: usize,
778        last_processed: Arc<Mutex<HashMap<String, SystemTime>>>,
779    ) -> anyhow::Result<Option<LiveFile>> {
780        let db_path = perpetual_db.objects.db.path_for_pruning();
781        let mut state = last_processed
782            .lock()
783            .expect("failed to obtain a lock for last processed SST files");
784        let mut sst_file_for_compaction: Option<LiveFile> = None;
785        let time_threshold =
786            SystemTime::now() - Duration::from_secs(delay_days as u64 * 24 * 60 * 60);
787        for sst_file in perpetual_db.objects.db.live_files()? {
788            let file_path = db_path.join(sst_file.name.clone().trim_matches('/'));
789            let last_modified = std::fs::metadata(file_path)?.modified()?;
790            if !PERIODIC_PRUNING_TABLES.contains(&sst_file.column_family_name)
791                || sst_file.level < 1
792                || sst_file.start_key.is_none()
793                || sst_file.end_key.is_none()
794                || last_modified > time_threshold
795                || state.get(&sst_file.name).unwrap_or(&UNIX_EPOCH) > &time_threshold
796            {
797                continue;
798            }
799            if let Some(candidate) = &sst_file_for_compaction
800                && candidate.size > sst_file.size
801            {
802                continue;
803            }
804            sst_file_for_compaction = Some(sst_file);
805        }
806        let Some(sst_file) = sst_file_for_compaction else {
807            return Ok(None);
808        };
809        info!(
810            "Manual compaction of sst file {:?}. Size: {:?}, level: {:?}",
811            sst_file.name, sst_file.size, sst_file.level
812        );
813        perpetual_db.objects.compact_range_raw(
814            &sst_file.column_family_name,
815            sst_file.start_key.clone().unwrap(),
816            sst_file.end_key.clone().unwrap(),
817        )?;
818        state.insert(sst_file.name.clone(), SystemTime::now());
819        Ok(Some(sst_file))
820    }
821
822    fn pruning_tick_duration_ms(epoch_duration_ms: u64) -> u64 {
823        min(epoch_duration_ms / 2, MIN_PRUNING_TICK_DURATION_MS)
824    }
825
826    fn smoothed_max_eligible_checkpoint_number(
827        checkpoint_store: &Arc<CheckpointStore>,
828        mut max_eligible_checkpoint: CheckpointSequenceNumber,
829        pruned_checkpoint: CheckpointSequenceNumber,
830        epoch_id: EpochId,
831        epoch_duration_ms: u64,
832        num_epochs_to_retain: u64,
833    ) -> anyhow::Result<CheckpointSequenceNumber> {
834        if epoch_id < num_epochs_to_retain {
835            return Ok(0);
836        }
837        let last_checkpoint_in_epoch = checkpoint_store
838            .get_epoch_last_checkpoint(epoch_id - num_epochs_to_retain)?
839            .map(|checkpoint| checkpoint.sequence_number)
840            .unwrap_or_default();
841        max_eligible_checkpoint = max_eligible_checkpoint.min(last_checkpoint_in_epoch);
842        if max_eligible_checkpoint == 0 {
843            return Ok(max_eligible_checkpoint);
844        }
845        let num_intervals = epoch_duration_ms
846            .checked_div(Self::pruning_tick_duration_ms(epoch_duration_ms))
847            .unwrap_or(1);
848        let delta = max_eligible_checkpoint
849            .checked_sub(pruned_checkpoint)
850            .unwrap_or_default()
851            .checked_div(num_intervals)
852            .unwrap_or(1);
853        Ok(pruned_checkpoint + delta)
854    }
855
856    fn setup_pruning(
857        config: AuthorityStorePruningConfig,
858        epoch_duration_ms: u64,
859        perpetual_db: Arc<AuthorityPerpetualTables>,
860        checkpoint_store: Arc<CheckpointStore>,
861        rpc_index: Option<Arc<RpcIndexStore>>,
862        rpc_store: Option<RpcStore>,
863        jsonrpc_index: Option<Arc<IndexStore>>,
864        metrics: Arc<AuthorityStorePruningMetrics>,
865        pruner_watermarks: Arc<PrunerWatermarks>,
866    ) -> Sender<()> {
867        let (sender, mut recv) = tokio::sync::oneshot::channel();
868        debug!(
869            "Starting object pruning service with num_epochs_to_retain={}",
870            config.num_epochs_to_retain
871        );
872
873        let tick_duration =
874            Duration::from_millis(Self::pruning_tick_duration_ms(epoch_duration_ms));
875        let pruning_initial_delay = if cfg!(msim) {
876            Duration::from_millis(1)
877        } else {
878            Duration::from_secs(config.pruning_run_delay_seconds.unwrap_or(60 * 60))
879        };
880        let mut objects_prune_interval =
881            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
882        let mut checkpoints_prune_interval =
883            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
884
885        metrics
886            .num_epochs_to_retain_for_objects
887            .set(config.num_epochs_to_retain as i64);
888        metrics.num_epochs_to_retain_for_checkpoints.set(
889            config
890                .num_epochs_to_retain_for_checkpoints
891                .unwrap_or_default() as i64,
892        );
893
894        #[cfg(tidehunter)]
895        {
896            if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
897                let prune_objects = config.num_epochs_to_retain != u64::MAX;
898                tokio::task::spawn(async move {
899                    loop {
900                        tokio::select! {
901                            _ = objects_prune_interval.tick(), if prune_objects => {
902                                if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), rpc_store.as_ref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
903                                    error!("Failed to prune objects: {:?}", err);
904                                }
905                            },
906                            _ = checkpoints_prune_interval.tick() => {
907                                if let Err(err) = Self::prune_th(&perpetual_db, &checkpoint_store, num_epochs_to_retain, pruner_watermarks.clone(), !prune_objects) {
908                                    error!("Failed to prune checkpoints: {:?}", err);
909                                }
910                            },
911                            _ = &mut recv => break,
912                        }
913                    }
914                });
915            }
916        }
917        #[cfg(not(tidehunter))]
918        {
919            let mut indexes_prune_interval =
920                tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
921
922            let perpetual_db_for_compaction = perpetual_db.clone();
923            if let Some(delay_days) = config.periodic_compaction_threshold_days {
924                spawn_monitored_task!(async move {
925                    let last_processed = Arc::new(Mutex::new(HashMap::new()));
926                    loop {
927                        let db = perpetual_db_for_compaction.clone();
928                        let state = Arc::clone(&last_processed);
929                        let result = tokio::task::spawn_blocking(move || {
930                            Self::compact_next_sst_file(db, delay_days, state)
931                        })
932                        .await;
933                        let mut sleep_interval_secs = 1;
934                        match result {
935                            Err(err) => error!("Failed to compact sst file: {:?}", err),
936                            Ok(Err(err)) => error!("Failed to compact sst file: {:?}", err),
937                            Ok(Ok(None)) => {
938                                sleep_interval_secs = 3600;
939                            }
940                            _ => {}
941                        }
942                        tokio::time::sleep(Duration::from_secs(sleep_interval_secs)).await;
943                    }
944                });
945            }
946            tokio::task::spawn(async move {
947                loop {
948                    tokio::select! {
949                        _ = objects_prune_interval.tick(), if config.num_epochs_to_retain != u64::MAX => {
950                            if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), rpc_store.as_ref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
951                                error!("Failed to prune objects: {:?}", err);
952                            }
953                            if let Err(err) = Self::prune_executed_tx_digests(&perpetual_db, &checkpoint_store).await {
954                                error!("Failed to prune executed_tx_digests: {:?}", err);
955                            }
956                        },
957                        _ = checkpoints_prune_interval.tick(), if !matches!(config.num_epochs_to_retain_for_checkpoints(), None | Some(u64::MAX) | Some(0)) => {
958                            if let Err(err) = Self::prune_checkpoints_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), rpc_store.as_ref(), config.clone(), metrics.clone(), epoch_duration_ms, &pruner_watermarks).await {
959                                error!("Failed to prune checkpoints: {:?}", err);
960                            }
961                        },
962                        _ = indexes_prune_interval.tick(), if config.num_epochs_to_retain_for_indexes.is_some() => {
963                            if let Err(err) = Self::prune_indexes(jsonrpc_index.as_deref(), &config, epoch_duration_ms, &metrics) {
964                                error!("Failed to prune indexes: {:?}", err);
965                            }
966                        }
967                        _ = &mut recv => break,
968                    }
969                }
970            });
971        }
972        sender
973    }
974
975    pub fn new(
976        perpetual_db: Arc<AuthorityPerpetualTables>,
977        checkpoint_store: Arc<CheckpointStore>,
978        rpc_index: Option<Arc<RpcIndexStore>>,
979        rpc_store: Option<RpcStore>,
980        jsonrpc_index: Option<Arc<IndexStore>>,
981        mut pruning_config: AuthorityStorePruningConfig,
982        is_validator: bool,
983        epoch_duration_ms: u64,
984        registry: &Registry,
985        pruner_watermarks: Arc<PrunerWatermarks>, // used by tidehunter relocation filters
986    ) -> Self {
987        // On tidehunter, the per-keyspace `objects_compactor`
988        // (see `AuthorityPerpetualTables::open`) already retains only the latest
989        // version per ObjectID during compaction, so running the object pruner
990        // would duplicate that work. The compactor is enabled for validators
991        // and for any node configured with `num_epochs_to_retain = 0` — keep
992        // this predicate in sync with the one in `sui-node/src/lib.rs`.
993        // Force-disable the pruner whenever the compactor is active.
994        #[cfg(tidehunter)]
995        {
996            let objects_compactor_enabled =
997                is_validator || pruning_config.num_epochs_to_retain == 0;
998            if objects_compactor_enabled && pruning_config.num_epochs_to_retain != u64::MAX {
999                info!(
1000                    "Tidehunter: disabling object pruner (was num_epochs_to_retain={}). The objects compactor performs equivalent compaction.",
1001                    pruning_config.num_epochs_to_retain
1002                );
1003                pruning_config.num_epochs_to_retain = u64::MAX;
1004            }
1005        }
1006
1007        if pruning_config.num_epochs_to_retain > 0 && pruning_config.num_epochs_to_retain < u64::MAX
1008        {
1009            warn!(
1010                "Using objects pruner with num_epochs_to_retain = {} can lead to performance issues",
1011                pruning_config.num_epochs_to_retain
1012            );
1013            if is_validator {
1014                warn!("Resetting to aggressive pruner.");
1015                pruning_config.num_epochs_to_retain = 0;
1016            } else {
1017                warn!("Consider using an aggressive pruner (num_epochs_to_retain = 0)");
1018            }
1019        }
1020        AuthorityStorePruner {
1021            _objects_pruner_cancel_handle: Self::setup_pruning(
1022                pruning_config,
1023                epoch_duration_ms,
1024                perpetual_db,
1025                checkpoint_store,
1026                rpc_index,
1027                rpc_store,
1028                jsonrpc_index,
1029                AuthorityStorePruningMetrics::new(registry),
1030                pruner_watermarks,
1031            ),
1032        }
1033    }
1034
1035    pub fn compact(perpetual_db: &Arc<AuthorityPerpetualTables>) -> Result<(), TypedStoreError> {
1036        perpetual_db.objects.compact_range(
1037            &ObjectKey(ObjectID::ZERO, SequenceNumber::MIN),
1038            &ObjectKey(ObjectID::MAX, SequenceNumber::MAX),
1039        )
1040    }
1041}
1042
/// Installs a relocation filter on `config` that drops entries below the pruning watermark.
///
/// Each entry is decoded as `T`:
/// - when `by_key` is true, from the key, using big-endian fixint bincode;
/// - otherwise from the value, using BCS.
///
/// The decoded entry is mapped to a `u64` position by `extractor`. Entries whose position
/// is strictly below the current value of `pruner_watermark` get `Decision::Remove`; all
/// others get `Decision::StopRelocation`.
///
/// # Panics
/// The filter panics if an entry cannot be deserialized as `T`.
#[cfg(tidehunter)]
pub(crate) fn apply_relocation_filter<T: DeserializeOwned>(
    config: typed_store::tidehunter_util::KeySpaceConfig,
    pruner_watermark: Arc<AtomicU64>,
    extractor: impl Fn(T) -> u64 + Send + Sync + 'static,
    by_key: bool,
) -> typed_store::tidehunter_util::KeySpaceConfig {
    use bincode::Options;
    use std::sync::atomic::Ordering;
    use typed_store::tidehunter_util::Decision;

    const DECODE_ERROR: &str = "relocation filter deserialization error";

    config.with_relocation_filter(move |key, value| {
        let decoded: T = if by_key {
            bincode::DefaultOptions::new()
                .with_big_endian()
                .with_fixint_encoding()
                .deserialize(&key)
                .expect(DECODE_ERROR)
        } else {
            bcs::from_bytes(&value).expect(DECODE_ERROR)
        };
        let position = extractor(decoded);
        if position >= pruner_watermark.load(Ordering::Relaxed) {
            Decision::StopRelocation
        } else {
            Decision::Remove
        }
    })
}
1070
#[cfg(test)]
mod tests {
    use more_asserts as ma;
    use std::path::Path;
    use std::time::Duration;
    use std::{collections::HashSet, sync::Arc};
    use tracing::log::info;

    use crate::authority::authority_store_pruner::AuthorityStorePruningMetrics;
    use crate::authority::authority_store_tables::AuthorityPerpetualTables;
    use crate::authority::authority_store_types::{
        StoreObject, StoreObjectWrapper, get_store_object,
    };
    use prometheus::Registry;
    use sui_types::base_types::ObjectDigest;
    use sui_types::effects::TransactionEffects;
    use sui_types::effects::TransactionEffectsAPI;
    use sui_types::{
        base_types::{ObjectID, SequenceNumber},
        object::Object,
        storage::ObjectKey,
    };
    use typed_store::Map;
    use typed_store::rocks::{DBMap, MetricConf, ReadWriteOptions, default_db_options};

    use super::AuthorityStorePruner;

    /// Reopens the perpetual DB under `path/perpetual` and returns every key still present
    /// in its `objects` column family.
    fn get_keys_after_pruning(path: &Path) -> anyhow::Result<HashSet<ObjectKey>> {
        let perpetual_db_path = path.join(Path::new("perpetual"));
        let cf_names = AuthorityPerpetualTables::describe_tables();
        let cfs: Vec<_> = cf_names
            .keys()
            .map(|x| (x.as_str(), default_db_options().options))
            .collect();
        let perpetual_db = typed_store::rocks::open_cf_opts(
            perpetual_db_path,
            None,
            MetricConf::new("perpetual_pruning"),
            &cfs,
        );

        let mut after_pruning = HashSet::new();
        let objects = DBMap::<ObjectKey, StoreObjectWrapper>::reopen(
            &perpetual_db?,
            Some("objects"),
            // open the db to bypass default db options which ignores range tombstones
            // so we can read the accurate number of retained versions
            &ReadWriteOptions::default(),
            false,
        )?;
        let iter = objects.safe_iter();
        for item in iter {
            after_pruning.insert(item?.0);
        }
        Ok(after_pruning)
    }

    /// `(keys to keep, keys to delete, tombstone keys)` produced by `generate_test_data`.
    type GenerateTestDataResult = (Vec<ObjectKey>, Vec<ObjectKey>, Vec<ObjectKey>);

    /// Writes versions `0..num_versions_per_object` for each of `total_unique_object_ids`
    /// object IDs into `db.objects`. When `num_object_versions_to_retain == 0`, it also
    /// writes a `Deleted` tombstone at version `num_versions_per_object`.
    ///
    /// The newest `num_object_versions_to_retain` versions of each object go into the
    /// "keep" list, and all older versions into the "delete" list.
    fn generate_test_data(
        db: Arc<AuthorityPerpetualTables>,
        num_versions_per_object: u64,
        num_object_versions_to_retain: u64,
        total_unique_object_ids: u32,
    ) -> Result<GenerateTestDataResult, anyhow::Error> {
        assert!(num_versions_per_object >= num_object_versions_to_retain);

        let (mut to_keep, mut to_delete, mut tombstones) = (vec![], vec![], vec![]);
        let mut batch = db.objects.batch();

        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids.into())?;
        for id in ids {
            // Iterate newest-first so `counter` is the distance from the latest version.
            for (counter, seq) in (0..num_versions_per_object).rev().enumerate() {
                let object_key = ObjectKey(id, SequenceNumber::from_u64(seq));
                if counter < num_object_versions_to_retain.try_into().unwrap() {
                    // latest `num_object_versions_to_retain` should not have been pruned
                    to_keep.push(object_key);
                } else {
                    to_delete.push(object_key);
                }
                let obj = get_store_object(Object::immutable_with_id_for_testing(id));
                batch.insert_batch(
                    &db.objects,
                    [(ObjectKey(id, SequenceNumber::from(seq)), obj)],
                )?;
            }

            // Adding a tombstone for deleted object.
            if num_object_versions_to_retain == 0 {
                let tombstone_key = ObjectKey(id, SequenceNumber::from(num_versions_per_object));
                println!("Adding tombstone object {:?}", tombstone_key);
                batch.insert_batch(
                    &db.objects,
                    [(tombstone_key, StoreObjectWrapper::V1(StoreObject::Deleted))],
                )?;
                tombstones.push(tombstone_key);
            }
        }
        batch.write().unwrap();
        assert_eq!(
            to_keep.len() as u64,
            std::cmp::min(num_object_versions_to_retain, num_versions_per_object)
                * total_unique_object_ids as u64
        );
        assert_eq!(
            tombstones.len() as u64,
            if num_object_versions_to_retain == 0 {
                total_unique_object_ids as u64
            } else {
                0
            }
        );
        Ok((to_keep, to_delete, tombstones))
    }

    /// Populates a fresh perpetual DB at `path` and prunes the generated stale versions and
    /// tombstones through `prune_objects_and_indexes`. Returns the keys expected to survive.
    async fn run_pruner(
        path: &Path,
        num_versions_per_object: u64,
        num_object_versions_to_retain: u64,
        total_unique_object_ids: u32,
    ) -> Vec<ObjectKey> {
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let to_keep = {
            let db = Arc::new(AuthorityPerpetualTables::open(path, None, None));
            let (to_keep, to_delete, tombstones) = generate_test_data(
                db.clone(),
                num_versions_per_object,
                num_object_versions_to_retain,
                total_unique_object_ids,
            )
            .unwrap();
            let mut effects = TransactionEffects::default();
            for object in to_delete {
                effects.unsafe_add_deleted_live_object_for_testing((
                    object.0,
                    object.1,
                    ObjectDigest::MIN,
                ));
            }
            for object in tombstones {
                effects.unsafe_add_object_tombstone_for_testing((
                    object.0,
                    object.1,
                    ObjectDigest::MIN,
                ));
            }
            AuthorityStorePruner::prune_objects_and_indexes(
                vec![effects],
                &db,
                0,
                metrics,
                0,
                None,
                None,
                true,
            )
            .await
            .unwrap();
            to_keep
        };
        // NOTE(review): presumably gives the dropped DB time to shut down before callers
        // reopen the same path — confirm.
        tokio::time::sleep(Duration::from_secs(3)).await;
        to_keep
    }

    // Tests pruning old version of live objects.
    #[cfg(not(tidehunter))]
    #[tokio::test]
    async fn test_pruning_objects() {
        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 3, 2, 1000).await;
        assert_eq!(
            HashSet::from_iter(to_keep),
            get_keys_after_pruning(&path).unwrap()
        );
        run_pruner(&tempfile::tempdir().unwrap().keep(), 3, 2, 1000).await;
    }

    // Tests pruning deleted objects (object tombstones).
    #[cfg(not(tidehunter))]
    #[tokio::test]
    async fn test_pruning_tombstones() {
        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 0, 0, 1000).await;
        assert_eq!(to_keep.len(), 0);
        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);

        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 3, 0, 1000).await;
        assert_eq!(to_keep.len(), 0);
        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);
    }

    // Checks that pruning all but the two newest versions of every object does not grow
    // the on-disk SST size after a full compaction.
    #[cfg(not(target_env = "msvc"))]
    #[tokio::test]
    async fn test_db_size_after_compaction() -> Result<(), anyhow::Error> {
        let primary_path = tempfile::tempdir()?.keep();
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None, None));
        let total_unique_object_ids = 10_000;
        let num_versions_per_object = 10;
        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids)?;
        let mut to_delete = vec![];
        for id in ids {
            for i in (0..num_versions_per_object).rev() {
                if i < num_versions_per_object - 2 {
                    to_delete.push((id, SequenceNumber::from(i)));
                }
                let obj = get_store_object(Object::immutable_with_id_for_testing(id));
                perpetual_db
                    .objects
                    .insert(&ObjectKey(id, SequenceNumber::from(i)), &obj)?;
            }
        }

        /// Sums the sizes of all `.sst` files directly inside `path`.
        fn get_sst_size(path: &Path) -> u64 {
            let mut size = 0;
            for entry in std::fs::read_dir(path).unwrap() {
                let entry = entry.unwrap();
                let path = entry.path();
                if let Some(ext) = path.extension() {
                    if ext != "sst" {
                        continue;
                    }
                    size += std::fs::metadata(path).unwrap().len();
                }
            }
            size
        }

        let db_path = primary_path.clone().join("perpetual");
        let start = ObjectKey(ObjectID::ZERO, SequenceNumber::MIN);
        let end = ObjectKey(ObjectID::MAX, SequenceNumber::MAX);

        perpetual_db.objects.compact_range(&start, &end)?;
        let before_compaction_size = get_sst_size(&db_path);

        let mut effects = TransactionEffects::default();
        for object in to_delete {
            effects.unsafe_add_deleted_live_object_for_testing((
                object.0,
                object.1,
                ObjectDigest::MIN,
            ));
        }
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let total_pruned = AuthorityStorePruner::prune_objects_and_indexes(
            vec![effects],
            &perpetual_db,
            0,
            metrics,
            0,
            None,
            None,
            true,
        )
        .await;
        info!("Total pruned keys = {:?}", total_pruned);
        // A failed prune leaves the data untouched and could still satisfy the `<=` size
        // check below, so fail explicitly rather than passing vacuously.
        assert!(total_pruned.is_ok(), "pruning failed: {:?}", total_pruned);

        perpetual_db.objects.compact_range(&start, &end)?;
        let after_compaction_size = get_sst_size(&db_path);

        info!(
            "Before compaction disk size = {:?}, after compaction disk size = {:?}",
            before_compaction_size, after_compaction_size
        );
        ma::assert_le!(after_compaction_size, before_compaction_size);
        Ok(())
    }
}