sui_core/authority/
authority_store_pruner.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::authority_store_tables::{AuthorityPerpetualTables, AuthorityPrunerTables};
5use crate::authority::authority_store_types::{StoreObject, StoreObjectWrapper};
6use crate::checkpoints::{CheckpointStore, CheckpointWatermark};
7use crate::jsonrpc_index::IndexStore;
8use crate::rpc_index::RpcIndexStore;
9use anyhow::anyhow;
10use bincode::Options;
11use mysten_metrics::{monitored_scope, spawn_monitored_task};
12use once_cell::sync::Lazy;
13use prometheus::{
14    IntCounter, IntGauge, Registry, register_int_counter_with_registry,
15    register_int_gauge_with_registry,
16};
17#[cfg(tidehunter)]
18use serde::de::DeserializeOwned;
19use std::cmp::{max, min};
20use std::collections::{BTreeSet, HashMap};
21use std::sync::atomic::AtomicU64;
22use std::sync::{Mutex, Weak};
23use std::time::{SystemTime, UNIX_EPOCH};
24use std::{sync::Arc, time::Duration};
25use sui_config::node::AuthorityStorePruningConfig;
26use sui_types::committee::EpochId;
27use sui_types::effects::TransactionEffects;
28use sui_types::effects::TransactionEffectsAPI;
29use sui_types::message_envelope::Message;
30use sui_types::messages_checkpoint::{
31    CheckpointContents, CheckpointDigest, CheckpointSequenceNumber,
32};
33use sui_types::{
34    base_types::{ObjectID, SequenceNumber, TransactionDigest, VersionNumber},
35    storage::ObjectKey,
36};
37use tokio::sync::oneshot::{self, Sender};
38use tokio::time::Instant;
39use tracing::{debug, error, info, warn};
40use typed_store::rocksdb::LiveFile;
41use typed_store::rocksdb::compaction_filter::Decision;
42use typed_store::{Map, TypedStoreError};
43
44static PERIODIC_PRUNING_TABLES: Lazy<BTreeSet<String>> = Lazy::new(|| {
45    [
46        "objects",
47        "effects",
48        "transactions",
49        "events",
50        "executed_effects",
51        "executed_transactions_to_checkpoint",
52    ]
53    .into_iter()
54    .map(|cf| cf.to_string())
55    .collect()
56});
57pub const EPOCH_DURATION_MS_FOR_TESTING: u64 = 24 * 60 * 60 * 1000;
58pub struct AuthorityStorePruner {
59    _objects_pruner_cancel_handle: oneshot::Sender<()>,
60}
61
62#[derive(Default)]
63pub struct PrunerWatermarks {
64    pub epoch_id: Arc<AtomicU64>,
65    pub checkpoint_id: Arc<AtomicU64>,
66}
67
68static MIN_PRUNING_TICK_DURATION_MS: u64 = 10 * 1000;
69
70pub struct AuthorityStorePruningMetrics {
71    pub last_pruned_checkpoint: IntGauge,
72    pub num_pruned_objects: IntCounter,
73    pub num_pruned_tombstones: IntCounter,
74    pub last_pruned_effects_checkpoint: IntGauge,
75    pub last_pruned_indexes_transaction: IntGauge,
76    pub num_epochs_to_retain_for_objects: IntGauge,
77    pub num_epochs_to_retain_for_checkpoints: IntGauge,
78}
79
80impl AuthorityStorePruningMetrics {
81    pub fn new(registry: &Registry) -> Arc<Self> {
82        let this = Self {
83            last_pruned_checkpoint: register_int_gauge_with_registry!(
84                "last_pruned_checkpoint",
85                "Last pruned checkpoint",
86                registry
87            )
88            .unwrap(),
89            num_pruned_objects: register_int_counter_with_registry!(
90                "num_pruned_objects",
91                "Number of pruned objects",
92                registry
93            )
94            .unwrap(),
95            num_pruned_tombstones: register_int_counter_with_registry!(
96                "num_pruned_tombstones",
97                "Number of pruned tombstones",
98                registry
99            )
100            .unwrap(),
101            last_pruned_effects_checkpoint: register_int_gauge_with_registry!(
102                "last_pruned_effects_checkpoint",
103                "Last pruned effects checkpoint",
104                registry
105            )
106            .unwrap(),
107            last_pruned_indexes_transaction: register_int_gauge_with_registry!(
108                "last_pruned_indexes_transaction",
109                "Last pruned indexes transaction",
110                registry
111            )
112            .unwrap(),
113            num_epochs_to_retain_for_objects: register_int_gauge_with_registry!(
114                "num_epochs_to_retain_for_objects",
115                "Number of epochs to retain for objects",
116                registry
117            )
118            .unwrap(),
119            num_epochs_to_retain_for_checkpoints: register_int_gauge_with_registry!(
120                "num_epochs_to_retain_for_checkpoints",
121                "Number of epochs to retain for checkpoints",
122                registry
123            )
124            .unwrap(),
125        };
126        Arc::new(this)
127    }
128
129    pub fn new_for_test() -> Arc<Self> {
130        Self::new(&Registry::new())
131    }
132}
133
134#[derive(Debug, Clone, Copy, PartialEq)]
135pub enum PruningMode {
136    Objects,
137    Checkpoints,
138}
139
140impl AuthorityStorePruner {
141    /// prunes old versions of objects based on transaction effects
142    async fn prune_objects(
143        transaction_effects: Vec<TransactionEffects>,
144        perpetual_db: &Arc<AuthorityPerpetualTables>,
145        pruner_db: Option<&Arc<AuthorityPrunerTables>>,
146        checkpoint_number: CheckpointSequenceNumber,
147        metrics: Arc<AuthorityStorePruningMetrics>,
148        enable_pruning_tombstones: bool,
149    ) -> anyhow::Result<()> {
150        let _scope = monitored_scope("ObjectsLivePruner");
151        let mut wb = perpetual_db.objects.batch();
152        let mut pruner_db_wb = pruner_db.map(|db| db.object_tombstones.batch());
153
154        // Collect objects keys that need to be deleted from `transaction_effects`.
155        let mut live_object_keys_to_prune = vec![];
156        let mut object_tombstones_to_prune = vec![];
157        for effects in &transaction_effects {
158            for (object_id, seq_number) in effects.modified_at_versions() {
159                live_object_keys_to_prune.push(ObjectKey(object_id, seq_number));
160            }
161
162            if enable_pruning_tombstones {
163                for deleted_object_key in effects.all_tombstones() {
164                    object_tombstones_to_prune
165                        .push(ObjectKey(deleted_object_key.0, deleted_object_key.1));
166                }
167            }
168        }
169
170        metrics
171            .num_pruned_objects
172            .inc_by(live_object_keys_to_prune.len() as u64);
173        metrics
174            .num_pruned_tombstones
175            .inc_by(object_tombstones_to_prune.len() as u64);
176
177        let mut updates: HashMap<ObjectID, (VersionNumber, VersionNumber)> = HashMap::new();
178        for ObjectKey(object_id, seq_number) in live_object_keys_to_prune {
179            updates
180                .entry(object_id)
181                .and_modify(|range| *range = (min(range.0, seq_number), max(range.1, seq_number)))
182                .or_insert((seq_number, seq_number));
183        }
184
185        for (object_id, (min_version, max_version)) in updates {
186            debug!(
187                "Pruning object {:?} versions {:?} - {:?}",
188                object_id, min_version, max_version
189            );
190            match pruner_db_wb {
191                Some(ref mut batch) => {
192                    batch.insert_batch(
193                        &pruner_db.expect("invariant checked").object_tombstones,
194                        std::iter::once((object_id, max_version)),
195                    )?;
196                }
197                None => {
198                    let start_range = ObjectKey(object_id, min_version);
199                    let end_range = ObjectKey(object_id, (max_version.value() + 1).into());
200                    wb.schedule_delete_range(&perpetual_db.objects, &start_range, &end_range)?;
201                }
202            }
203        }
204
205        // When enable_pruning_tombstones is enabled, instead of using range deletes, we need to do a scan of all the keys
206        // for the deleted objects and then do point deletes to delete all the existing keys. This is because to improve read
207        // performance, we set `ignore_range_deletions` on all read options, and using range delete to delete tombstones
208        // may leak object (imagine a tombstone is compacted away, but earlier version is still not). Using point deletes
209        // guarantees that all earlier versions are deleted in the database.
210        if !object_tombstones_to_prune.is_empty() {
211            let mut object_keys_to_delete = vec![];
212            for ObjectKey(object_id, seq_number) in object_tombstones_to_prune {
213                for result in perpetual_db.objects.safe_iter_with_bounds(
214                    Some(ObjectKey(object_id, VersionNumber::MIN)),
215                    Some(ObjectKey(object_id, seq_number.next())),
216                ) {
217                    let (object_key, _) = result?;
218                    assert_eq!(object_key.0, object_id);
219                    object_keys_to_delete.push(object_key);
220                }
221            }
222
223            wb.delete_batch(&perpetual_db.objects, object_keys_to_delete)?;
224        }
225
226        perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
227        metrics.last_pruned_checkpoint.set(checkpoint_number as i64);
228
229        if let Some(batch) = pruner_db_wb {
230            batch.write()?;
231        }
232        wb.write()?;
233        Ok(())
234    }
235
236    fn prune_checkpoints(
237        perpetual_db: &Arc<AuthorityPerpetualTables>,
238        checkpoint_db: &Arc<CheckpointStore>,
239        rpc_index: Option<&RpcIndexStore>,
240        checkpoint_number: CheckpointSequenceNumber,
241        checkpoints_to_prune: Vec<CheckpointDigest>,
242        checkpoint_content_to_prune: Vec<CheckpointContents>,
243        effects_to_prune: &Vec<TransactionEffects>,
244        metrics: Arc<AuthorityStorePruningMetrics>,
245    ) -> anyhow::Result<()> {
246        let _scope = monitored_scope("EffectsLivePruner");
247
248        let mut perpetual_batch = perpetual_db.objects.batch();
249        let transactions: Vec<_> = checkpoint_content_to_prune
250            .iter()
251            .flat_map(|content| content.iter().map(|tx| tx.transaction))
252            .collect();
253
254        perpetual_batch.delete_batch(&perpetual_db.transactions, transactions.iter())?;
255        perpetual_batch.delete_batch(&perpetual_db.executed_effects, transactions.iter())?;
256        perpetual_batch.delete_batch(
257            &perpetual_db.executed_transactions_to_checkpoint,
258            transactions.iter(),
259        )?;
260
261        let mut effect_digests = vec![];
262        for effects in effects_to_prune {
263            let effects_digest = effects.digest();
264            debug!("Pruning effects {:?}", effects_digest);
265            effect_digests.push(effects_digest);
266
267            if effects.events_digest().is_some() {
268                perpetual_batch
269                    .delete_batch(&perpetual_db.events_2, [effects.transaction_digest()])?;
270            }
271        }
272        perpetual_batch.delete_batch(
273            &perpetual_db.unchanged_loaded_runtime_objects,
274            transactions.iter(),
275        )?;
276        perpetual_batch.delete_batch(&perpetual_db.effects, effect_digests)?;
277
278        let mut checkpoints_batch = checkpoint_db.tables.certified_checkpoints.batch();
279
280        let checkpoint_content_digests =
281            checkpoint_content_to_prune.iter().map(|ckpt| ckpt.digest());
282        checkpoints_batch.delete_batch(
283            &checkpoint_db.tables.checkpoint_content,
284            checkpoint_content_digests.clone(),
285        )?;
286        checkpoints_batch.delete_batch(
287            &checkpoint_db.tables.checkpoint_sequence_by_contents_digest,
288            checkpoint_content_digests,
289        )?;
290
291        checkpoints_batch.delete_batch(
292            &checkpoint_db.tables.checkpoint_by_digest,
293            checkpoints_to_prune,
294        )?;
295
296        checkpoints_batch.insert_batch(
297            &checkpoint_db.tables.watermarks,
298            [(
299                &CheckpointWatermark::HighestPruned,
300                &(checkpoint_number, CheckpointDigest::random()),
301            )],
302        )?;
303
304        if let Some(rpc_index) = rpc_index {
305            rpc_index.prune(checkpoint_number, &checkpoint_content_to_prune)?;
306        }
307        perpetual_batch.write()?;
308        checkpoints_batch.write()?;
309        metrics
310            .last_pruned_effects_checkpoint
311            .set(checkpoint_number as i64);
312
313        Ok(())
314    }
315
316    /// Prunes old data based on effects from all checkpoints from epochs eligible for pruning
317    pub async fn prune_objects_for_eligible_epochs(
318        perpetual_db: &Arc<AuthorityPerpetualTables>,
319        checkpoint_store: &Arc<CheckpointStore>,
320        rpc_index: Option<&RpcIndexStore>,
321        pruner_db: Option<&Arc<AuthorityPrunerTables>>,
322        config: AuthorityStorePruningConfig,
323        metrics: Arc<AuthorityStorePruningMetrics>,
324        epoch_duration_ms: u64,
325    ) -> anyhow::Result<()> {
326        let _scope = monitored_scope("PruneObjectsForEligibleEpochs");
327        let (mut max_eligible_checkpoint_number, epoch_id) = checkpoint_store
328            .get_highest_executed_checkpoint()?
329            .map(|c| (*c.sequence_number(), c.epoch))
330            .unwrap_or_default();
331        let pruned_checkpoint_number = perpetual_db
332            .get_highest_pruned_checkpoint()?
333            .unwrap_or_default();
334        if config.smooth && config.num_epochs_to_retain > 0 {
335            max_eligible_checkpoint_number = Self::smoothed_max_eligible_checkpoint_number(
336                checkpoint_store,
337                max_eligible_checkpoint_number,
338                pruned_checkpoint_number,
339                epoch_id,
340                epoch_duration_ms,
341                config.num_epochs_to_retain,
342            )?;
343        }
344        Self::prune_for_eligible_epochs(
345            perpetual_db,
346            checkpoint_store,
347            rpc_index,
348            pruner_db,
349            PruningMode::Objects,
350            config.num_epochs_to_retain,
351            pruned_checkpoint_number,
352            max_eligible_checkpoint_number,
353            config,
354            metrics.clone(),
355        )
356        .await
357    }
358
359    pub async fn prune_checkpoints_for_eligible_epochs(
360        perpetual_db: &Arc<AuthorityPerpetualTables>,
361        checkpoint_store: &Arc<CheckpointStore>,
362        rpc_index: Option<&RpcIndexStore>,
363        pruner_db: Option<&Arc<AuthorityPrunerTables>>,
364        config: AuthorityStorePruningConfig,
365        metrics: Arc<AuthorityStorePruningMetrics>,
366        epoch_duration_ms: u64,
367        pruner_watermarks: &Arc<PrunerWatermarks>,
368    ) -> anyhow::Result<()> {
369        let _scope = monitored_scope("PruneCheckpointsForEligibleEpochs");
370        let pruned_checkpoint_number = checkpoint_store
371            .get_highest_pruned_checkpoint_seq_number()?
372            .unwrap_or(0);
373        let (mut max_eligible_checkpoint, epoch_id) = checkpoint_store
374            .get_highest_executed_checkpoint()?
375            .map(|c| (*c.sequence_number(), c.epoch))
376            .unwrap_or_default();
377        if config.num_epochs_to_retain != u64::MAX {
378            max_eligible_checkpoint = min(
379                max_eligible_checkpoint,
380                perpetual_db
381                    .get_highest_pruned_checkpoint()?
382                    .unwrap_or_default(),
383            );
384        }
385        if config.smooth
386            && let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints
387        {
388            max_eligible_checkpoint = Self::smoothed_max_eligible_checkpoint_number(
389                checkpoint_store,
390                max_eligible_checkpoint,
391                pruned_checkpoint_number,
392                epoch_id,
393                epoch_duration_ms,
394                num_epochs_to_retain,
395            )?;
396        }
397        debug!("Max eligible checkpoint {}", max_eligible_checkpoint);
398        Self::prune_for_eligible_epochs(
399            perpetual_db,
400            checkpoint_store,
401            rpc_index,
402            pruner_db,
403            PruningMode::Checkpoints,
404            config
405                .num_epochs_to_retain_for_checkpoints()
406                .ok_or_else(|| anyhow!("config value not set"))?,
407            pruned_checkpoint_number,
408            max_eligible_checkpoint,
409            config.clone(),
410            metrics.clone(),
411        )
412        .await?;
413
414        if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
415            Self::update_pruning_watermarks(
416                checkpoint_store,
417                num_epochs_to_retain,
418                pruner_watermarks,
419            )?;
420        }
421        Ok(())
422    }
423
424    /// Prunes old object versions based on effects from all checkpoints from epochs eligible for pruning
425    pub async fn prune_for_eligible_epochs(
426        perpetual_db: &Arc<AuthorityPerpetualTables>,
427        checkpoint_store: &Arc<CheckpointStore>,
428        rpc_index: Option<&RpcIndexStore>,
429        pruner_db: Option<&Arc<AuthorityPrunerTables>>,
430        mode: PruningMode,
431        num_epochs_to_retain: u64,
432        starting_checkpoint_number: CheckpointSequenceNumber,
433        max_eligible_checkpoint: CheckpointSequenceNumber,
434        config: AuthorityStorePruningConfig,
435        metrics: Arc<AuthorityStorePruningMetrics>,
436    ) -> anyhow::Result<()> {
437        let _scope = monitored_scope("PruneForEligibleEpochs");
438
439        let mut checkpoint_number = starting_checkpoint_number;
440        let current_epoch = checkpoint_store
441            .get_highest_executed_checkpoint()?
442            .map(|c| c.epoch())
443            .unwrap_or_default();
444
445        let mut checkpoints_to_prune = vec![];
446        let mut checkpoint_content_to_prune = vec![];
447        let mut effects_to_prune = vec![];
448
449        loop {
450            let Some(ckpt) = checkpoint_store
451                .tables
452                .certified_checkpoints
453                .get(&(checkpoint_number + 1))?
454            else {
455                break;
456            };
457            let checkpoint = ckpt.into_inner();
458            // Skipping because  checkpoint's epoch or checkpoint number is too new.
459            // We have to respect the highest executed checkpoint watermark (including the watermark itself)
460            // because there might be parts of the system that still require access to old object versions
461            // (i.e. state accumulator).
462            if (current_epoch < checkpoint.epoch() + num_epochs_to_retain)
463                || (*checkpoint.sequence_number() >= max_eligible_checkpoint)
464            {
465                break;
466            }
467            checkpoint_number = *checkpoint.sequence_number();
468
469            let content = checkpoint_store
470                .get_checkpoint_contents(&checkpoint.content_digest)?
471                .ok_or_else(|| {
472                    anyhow::anyhow!(
473                        "checkpoint content data is missing: {}",
474                        checkpoint.sequence_number
475                    )
476                })?;
477            let effects = perpetual_db
478                .effects
479                .multi_get(content.iter().map(|tx| tx.effects))?;
480
481            info!("scheduling pruning for checkpoint {:?}", checkpoint_number);
482            checkpoints_to_prune.push(*checkpoint.digest());
483            checkpoint_content_to_prune.push(content);
484            effects_to_prune.extend(effects.into_iter().flatten());
485
486            if effects_to_prune.len() >= config.max_transactions_in_batch
487                || checkpoints_to_prune.len() >= config.max_checkpoints_in_batch
488            {
489                match mode {
490                    PruningMode::Objects => {
491                        Self::prune_objects(
492                            effects_to_prune,
493                            perpetual_db,
494                            pruner_db,
495                            checkpoint_number,
496                            metrics.clone(),
497                            !config.killswitch_tombstone_pruning,
498                        )
499                        .await?
500                    }
501                    PruningMode::Checkpoints => Self::prune_checkpoints(
502                        perpetual_db,
503                        checkpoint_store,
504                        rpc_index,
505                        checkpoint_number,
506                        checkpoints_to_prune,
507                        checkpoint_content_to_prune,
508                        &effects_to_prune,
509                        metrics.clone(),
510                    )?,
511                };
512                checkpoints_to_prune = vec![];
513                checkpoint_content_to_prune = vec![];
514                effects_to_prune = vec![];
515                // yield back to the tokio runtime. Prevent potential halt of other tasks
516                tokio::task::yield_now().await;
517            }
518        }
519
520        if !checkpoints_to_prune.is_empty() {
521            match mode {
522                PruningMode::Objects => {
523                    Self::prune_objects(
524                        effects_to_prune,
525                        perpetual_db,
526                        pruner_db,
527                        checkpoint_number,
528                        metrics.clone(),
529                        !config.killswitch_tombstone_pruning,
530                    )
531                    .await?
532                }
533                PruningMode::Checkpoints => Self::prune_checkpoints(
534                    perpetual_db,
535                    checkpoint_store,
536                    rpc_index,
537                    checkpoint_number,
538                    checkpoints_to_prune,
539                    checkpoint_content_to_prune,
540                    &effects_to_prune,
541                    metrics.clone(),
542                )?,
543            };
544        }
545        Ok(())
546    }
547
548    fn prune_indexes(
549        indexes: Option<&IndexStore>,
550        config: &AuthorityStorePruningConfig,
551        epoch_duration_ms: u64,
552        metrics: &AuthorityStorePruningMetrics,
553    ) -> anyhow::Result<()> {
554        if let (Some(mut epochs_to_retain), Some(indexes)) =
555            (config.num_epochs_to_retain_for_indexes, indexes)
556        {
557            if epochs_to_retain < 7 {
558                warn!("num_epochs_to_retain_for_indexes is too low. Reseting it to 7");
559                epochs_to_retain = 7;
560            }
561            let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
562            if let Some(cut_time_ms) =
563                u64::try_from(now)?.checked_sub(epochs_to_retain * epoch_duration_ms)
564            {
565                let transaction_id = indexes.prune(cut_time_ms)?;
566                metrics
567                    .last_pruned_indexes_transaction
568                    .set(transaction_id as i64);
569            }
570        }
571        Ok(())
572    }
573
574    async fn prune_executed_tx_digests(
575        perpetual_db: &Arc<AuthorityPerpetualTables>,
576        checkpoint_store: &Arc<CheckpointStore>,
577    ) -> anyhow::Result<()> {
578        let current_epoch = checkpoint_store
579            .get_highest_executed_checkpoint()?
580            .map(|c| c.epoch)
581            .unwrap_or_default();
582
583        if current_epoch < 2 {
584            return Ok(());
585        }
586
587        let target_epoch = current_epoch - 1;
588
589        let start_key = (0u64, TransactionDigest::ZERO);
590        let end_key = (target_epoch, TransactionDigest::ZERO);
591
592        info!(
593            "Pruning executed_transaction_digests for epochs < {} (current epoch: {})",
594            target_epoch, current_epoch
595        );
596
597        let mut batch = perpetual_db.executed_transaction_digests.batch();
598        batch.schedule_delete_range(
599            &perpetual_db.executed_transaction_digests,
600            &start_key,
601            // `to` is non-inclusive so target_epoch and all later epochs are preserved
602            &end_key,
603        )?;
604        batch.write()?;
605        Ok(())
606    }
607
608    #[cfg(tidehunter)]
609    fn prune_executed_tx_digests_th(
610        perpetual_db: &Arc<AuthorityPerpetualTables>,
611        checkpoint_store: &Arc<CheckpointStore>,
612    ) -> anyhow::Result<()> {
613        let current_epoch = checkpoint_store
614            .get_highest_executed_checkpoint()?
615            .map(|c| c.epoch)
616            .unwrap_or_default();
617
618        if current_epoch < 2 {
619            return Ok(());
620        }
621
622        let last_epoch_to_delete = current_epoch - 2;
623        let from_key = (0u64, TransactionDigest::ZERO);
624        let to_key = (last_epoch_to_delete, TransactionDigest::new([0xff; 32]));
625        info!(
626            "Pruning executed_transaction_digests for epochs 0 to {} (current epoch: {})",
627            last_epoch_to_delete, current_epoch
628        );
629        perpetual_db
630            .executed_transaction_digests
631            .drop_cells_in_range(&from_key, &to_key)?;
632        Ok(())
633    }
634
635    fn update_pruning_watermarks(
636        checkpoint_store: &Arc<CheckpointStore>,
637        num_epochs_to_retain: u64,
638        pruning_watermark: &Arc<PrunerWatermarks>,
639    ) -> anyhow::Result<bool> {
640        use std::sync::atomic::Ordering;
641        let current_watermark = pruning_watermark.epoch_id.load(Ordering::Relaxed);
642        let current_epoch_id = checkpoint_store
643            .get_highest_executed_checkpoint()?
644            .map(|c| c.epoch)
645            .unwrap_or_default();
646        if current_epoch_id < num_epochs_to_retain {
647            return Ok(false);
648        }
649        let target_epoch_id = current_epoch_id - num_epochs_to_retain;
650        let checkpoint_id =
651            checkpoint_store.get_epoch_last_checkpoint_seq_number(target_epoch_id)?;
652
653        let new_watermark = target_epoch_id + 1;
654        if current_watermark == new_watermark {
655            return Ok(false);
656        }
657        info!("relocation: setting epoch watermark to {}", new_watermark);
658        pruning_watermark
659            .epoch_id
660            .store(new_watermark, Ordering::Relaxed);
661        if let Some(checkpoint_id) = checkpoint_id {
662            info!(
663                "relocation: setting checkpoint watermark to {}",
664                checkpoint_id
665            );
666            pruning_watermark
667                .checkpoint_id
668                .store(checkpoint_id, Ordering::Relaxed);
669        }
670        Ok(true)
671    }
672
673    #[cfg(tidehunter)]
674    fn prune_th(
675        perpetual_db: &Arc<AuthorityPerpetualTables>,
676        checkpoint_store: &Arc<CheckpointStore>,
677        num_epochs_to_retain: u64,
678        pruning_watermark: Arc<PrunerWatermarks>,
679    ) -> anyhow::Result<()> {
680        let watermark_updated = Self::update_pruning_watermarks(
681            checkpoint_store,
682            num_epochs_to_retain,
683            &pruning_watermark,
684        )?;
685        if !watermark_updated {
686            info!("skip relocation. Watermark hasn't changed");
687            return Ok(());
688        }
689        perpetual_db.objects.db.start_relocation()?;
690        checkpoint_store.tables.watermarks.db.start_relocation()?;
691        Self::prune_executed_tx_digests_th(perpetual_db, checkpoint_store)?;
692        Ok(())
693    }
694
695    fn compact_next_sst_file(
696        perpetual_db: Arc<AuthorityPerpetualTables>,
697        delay_days: usize,
698        last_processed: Arc<Mutex<HashMap<String, SystemTime>>>,
699    ) -> anyhow::Result<Option<LiveFile>> {
700        let db_path = perpetual_db.objects.db.path_for_pruning();
701        let mut state = last_processed
702            .lock()
703            .expect("failed to obtain a lock for last processed SST files");
704        let mut sst_file_for_compaction: Option<LiveFile> = None;
705        let time_threshold =
706            SystemTime::now() - Duration::from_secs(delay_days as u64 * 24 * 60 * 60);
707        for sst_file in perpetual_db.objects.db.live_files()? {
708            let file_path = db_path.join(sst_file.name.clone().trim_matches('/'));
709            let last_modified = std::fs::metadata(file_path)?.modified()?;
710            if !PERIODIC_PRUNING_TABLES.contains(&sst_file.column_family_name)
711                || sst_file.level < 1
712                || sst_file.start_key.is_none()
713                || sst_file.end_key.is_none()
714                || last_modified > time_threshold
715                || state.get(&sst_file.name).unwrap_or(&UNIX_EPOCH) > &time_threshold
716            {
717                continue;
718            }
719            if let Some(candidate) = &sst_file_for_compaction
720                && candidate.size > sst_file.size
721            {
722                continue;
723            }
724            sst_file_for_compaction = Some(sst_file);
725        }
726        let Some(sst_file) = sst_file_for_compaction else {
727            return Ok(None);
728        };
729        info!(
730            "Manual compaction of sst file {:?}. Size: {:?}, level: {:?}",
731            sst_file.name, sst_file.size, sst_file.level
732        );
733        perpetual_db.objects.compact_range_raw(
734            &sst_file.column_family_name,
735            sst_file.start_key.clone().unwrap(),
736            sst_file.end_key.clone().unwrap(),
737        )?;
738        state.insert(sst_file.name.clone(), SystemTime::now());
739        Ok(Some(sst_file))
740    }
741
742    fn pruning_tick_duration_ms(epoch_duration_ms: u64) -> u64 {
743        min(epoch_duration_ms / 2, MIN_PRUNING_TICK_DURATION_MS)
744    }
745
746    fn smoothed_max_eligible_checkpoint_number(
747        checkpoint_store: &Arc<CheckpointStore>,
748        mut max_eligible_checkpoint: CheckpointSequenceNumber,
749        pruned_checkpoint: CheckpointSequenceNumber,
750        epoch_id: EpochId,
751        epoch_duration_ms: u64,
752        num_epochs_to_retain: u64,
753    ) -> anyhow::Result<CheckpointSequenceNumber> {
754        if epoch_id < num_epochs_to_retain {
755            return Ok(0);
756        }
757        let last_checkpoint_in_epoch = checkpoint_store
758            .get_epoch_last_checkpoint(epoch_id - num_epochs_to_retain)?
759            .map(|checkpoint| checkpoint.sequence_number)
760            .unwrap_or_default();
761        max_eligible_checkpoint = max_eligible_checkpoint.min(last_checkpoint_in_epoch);
762        if max_eligible_checkpoint == 0 {
763            return Ok(max_eligible_checkpoint);
764        }
765        let num_intervals = epoch_duration_ms
766            .checked_div(Self::pruning_tick_duration_ms(epoch_duration_ms))
767            .unwrap_or(1);
768        let delta = max_eligible_checkpoint
769            .checked_sub(pruned_checkpoint)
770            .unwrap_or_default()
771            .checked_div(num_intervals)
772            .unwrap_or(1);
773        Ok(pruned_checkpoint + delta)
774    }
775
776    fn setup_pruning(
777        config: AuthorityStorePruningConfig,
778        epoch_duration_ms: u64,
779        perpetual_db: Arc<AuthorityPerpetualTables>,
780        checkpoint_store: Arc<CheckpointStore>,
781        rpc_index: Option<Arc<RpcIndexStore>>,
782        jsonrpc_index: Option<Arc<IndexStore>>,
783        pruner_db: Option<Arc<AuthorityPrunerTables>>,
784        metrics: Arc<AuthorityStorePruningMetrics>,
785        pruner_watermarks: Arc<PrunerWatermarks>,
786    ) -> Sender<()> {
787        let (sender, mut recv) = tokio::sync::oneshot::channel();
788        debug!(
789            "Starting object pruning service with num_epochs_to_retain={}",
790            config.num_epochs_to_retain
791        );
792
793        let tick_duration =
794            Duration::from_millis(Self::pruning_tick_duration_ms(epoch_duration_ms));
795        let pruning_initial_delay = if cfg!(msim) {
796            Duration::from_millis(1)
797        } else {
798            Duration::from_secs(config.pruning_run_delay_seconds.unwrap_or(60 * 60))
799        };
800        let mut objects_prune_interval =
801            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
802
803        metrics
804            .num_epochs_to_retain_for_objects
805            .set(config.num_epochs_to_retain as i64);
806        metrics.num_epochs_to_retain_for_checkpoints.set(
807            config
808                .num_epochs_to_retain_for_checkpoints
809                .unwrap_or_default() as i64,
810        );
811
812        #[cfg(tidehunter)]
813        {
814            if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
815                tokio::task::spawn(async move {
816                    loop {
817                        objects_prune_interval.tick().await;
818                        if let Err(err) = Self::prune_th(
819                            &perpetual_db,
820                            &checkpoint_store,
821                            num_epochs_to_retain,
822                            pruner_watermarks.clone(),
823                        ) {
824                            error!("Failed to prune tidehunter: {:?}", err);
825                        }
826                    }
827                });
828            }
829        }
830        #[cfg(not(tidehunter))]
831        {
832            let mut checkpoints_prune_interval =
833                tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
834            let mut indexes_prune_interval =
835                tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
836
837            let perpetual_db_for_compaction = perpetual_db.clone();
838            if let Some(delay_days) = config.periodic_compaction_threshold_days {
839                spawn_monitored_task!(async move {
840                    let last_processed = Arc::new(Mutex::new(HashMap::new()));
841                    loop {
842                        let db = perpetual_db_for_compaction.clone();
843                        let state = Arc::clone(&last_processed);
844                        let result = tokio::task::spawn_blocking(move || {
845                            Self::compact_next_sst_file(db, delay_days, state)
846                        })
847                        .await;
848                        let mut sleep_interval_secs = 1;
849                        match result {
850                            Err(err) => error!("Failed to compact sst file: {:?}", err),
851                            Ok(Err(err)) => error!("Failed to compact sst file: {:?}", err),
852                            Ok(Ok(None)) => {
853                                sleep_interval_secs = 3600;
854                            }
855                            _ => {}
856                        }
857                        tokio::time::sleep(Duration::from_secs(sleep_interval_secs)).await;
858                    }
859                });
860            }
861            tokio::task::spawn(async move {
862                loop {
863                    tokio::select! {
864                        _ = objects_prune_interval.tick(), if config.num_epochs_to_retain != u64::MAX => {
865                            if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), pruner_db.as_ref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
866                                error!("Failed to prune objects: {:?}", err);
867                            }
868                            if let Err(err) = Self::prune_executed_tx_digests(&perpetual_db, &checkpoint_store).await {
869                                error!("Failed to prune executed_tx_digests: {:?}", err);
870                            }
871                        },
872                        _ = checkpoints_prune_interval.tick(), if !matches!(config.num_epochs_to_retain_for_checkpoints(), None | Some(u64::MAX) | Some(0)) => {
873                            if let Err(err) = Self::prune_checkpoints_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), pruner_db.as_ref(), config.clone(), metrics.clone(), epoch_duration_ms, &pruner_watermarks).await {
874                                error!("Failed to prune checkpoints: {:?}", err);
875                            }
876                        },
877                        _ = indexes_prune_interval.tick(), if config.num_epochs_to_retain_for_indexes.is_some() => {
878                            if let Err(err) = Self::prune_indexes(jsonrpc_index.as_deref(), &config, epoch_duration_ms, &metrics) {
879                                error!("Failed to prune indexes: {:?}", err);
880                            }
881                        }
882                        _ = &mut recv => break,
883                    }
884                }
885            });
886        }
887        sender
888    }
889
890    pub fn new(
891        perpetual_db: Arc<AuthorityPerpetualTables>,
892        checkpoint_store: Arc<CheckpointStore>,
893        rpc_index: Option<Arc<RpcIndexStore>>,
894        jsonrpc_index: Option<Arc<IndexStore>>,
895        mut pruning_config: AuthorityStorePruningConfig,
896        is_validator: bool,
897        epoch_duration_ms: u64,
898        registry: &Registry,
899        pruner_db: Option<Arc<AuthorityPrunerTables>>,
900        pruner_watermarks: Arc<PrunerWatermarks>, // used by tidehunter relocation filters
901    ) -> Self {
902        if pruning_config.num_epochs_to_retain > 0 && pruning_config.num_epochs_to_retain < u64::MAX
903        {
904            warn!(
905                "Using objects pruner with num_epochs_to_retain = {} can lead to performance issues",
906                pruning_config.num_epochs_to_retain
907            );
908            if is_validator {
909                warn!("Resetting to aggressive pruner.");
910                pruning_config.num_epochs_to_retain = 0;
911            } else {
912                warn!("Consider using an aggressive pruner (num_epochs_to_retain = 0)");
913            }
914        }
915        AuthorityStorePruner {
916            _objects_pruner_cancel_handle: Self::setup_pruning(
917                pruning_config,
918                epoch_duration_ms,
919                perpetual_db,
920                checkpoint_store,
921                rpc_index,
922                jsonrpc_index,
923                pruner_db,
924                AuthorityStorePruningMetrics::new(registry),
925                pruner_watermarks,
926            ),
927        }
928    }
929
930    pub fn compact(perpetual_db: &Arc<AuthorityPerpetualTables>) -> Result<(), TypedStoreError> {
931        perpetual_db.objects.compact_range(
932            &ObjectKey(ObjectID::ZERO, SequenceNumber::MIN),
933            &ObjectKey(ObjectID::MAX, SequenceNumber::MAX),
934        )
935    }
936}
937
938#[cfg(tidehunter)]
939pub(crate) fn apply_relocation_filter<T: DeserializeOwned>(
940    config: typed_store::tidehunter_util::KeySpaceConfig,
941    pruner_watermark: Arc<AtomicU64>,
942    extractor: impl Fn(T) -> u64 + Send + Sync + 'static,
943    by_key: bool,
944) -> typed_store::tidehunter_util::KeySpaceConfig {
945    use bincode::Options;
946    use std::sync::atomic::Ordering;
947    use typed_store::tidehunter_util::Decision;
948    config.with_relocation_filter(move |key, value| {
949        let data = if by_key {
950            bincode::DefaultOptions::new()
951                .with_big_endian()
952                .with_fixint_encoding()
953                .deserialize(&key)
954                .expect("relocation filter deserialization error")
955        } else {
956            bcs::from_bytes(&value).expect("relocation filter deserialization error")
957        };
958        if extractor(data) < pruner_watermark.load(Ordering::Relaxed) {
959            Decision::Remove
960        } else {
961            Decision::StopRelocation
962        }
963    })
964}
965
966#[derive(Clone)]
967pub struct ObjectsCompactionFilter {
968    db: Weak<AuthorityPrunerTables>,
969    metrics: Arc<ObjectCompactionMetrics>,
970}
971
972impl ObjectsCompactionFilter {
973    pub fn new(db: Arc<AuthorityPrunerTables>, registry: &Registry) -> Self {
974        Self {
975            db: Arc::downgrade(&db),
976            metrics: ObjectCompactionMetrics::new(registry),
977        }
978    }
979    pub fn filter(&mut self, key: &[u8], value: &[u8]) -> anyhow::Result<Decision> {
980        let ObjectKey(object_id, version) = bincode::DefaultOptions::new()
981            .with_big_endian()
982            .with_fixint_encoding()
983            .deserialize(key)?;
984        let object: StoreObjectWrapper = bcs::from_bytes(value)?;
985        if matches!(object.into_inner(), StoreObject::Value(_))
986            && let Some(db) = self.db.upgrade()
987        {
988            match db.object_tombstones.get(&object_id)? {
989                Some(gc_version) => {
990                    if version <= gc_version {
991                        self.metrics.key_removed.inc();
992                        return Ok(Decision::Remove);
993                    }
994                    self.metrics.key_kept.inc();
995                }
996                None => self.metrics.key_not_found.inc(),
997            }
998        }
999        Ok(Decision::Keep)
1000    }
1001}
1002
1003struct ObjectCompactionMetrics {
1004    key_removed: IntCounter,
1005    key_kept: IntCounter,
1006    key_not_found: IntCounter,
1007}
1008
1009impl ObjectCompactionMetrics {
1010    pub fn new(registry: &Registry) -> Arc<Self> {
1011        Arc::new(Self {
1012            key_removed: register_int_counter_with_registry!(
1013                "objects_compaction_filter_key_removed",
1014                "Compaction key removed",
1015                registry
1016            )
1017            .unwrap(),
1018            key_kept: register_int_counter_with_registry!(
1019                "objects_compaction_filter_key_kept",
1020                "Compaction key kept",
1021                registry
1022            )
1023            .unwrap(),
1024            key_not_found: register_int_counter_with_registry!(
1025                "objects_compaction_filter_key_not_found",
1026                "Compaction key not found",
1027                registry
1028            )
1029            .unwrap(),
1030        })
1031    }
1032}
1033
1034#[cfg(test)]
1035mod tests {
1036    use more_asserts as ma;
1037    use std::path::Path;
1038    use std::time::Duration;
1039    use std::{collections::HashSet, sync::Arc};
1040    use tracing::log::info;
1041
1042    use crate::authority::authority_store_pruner::AuthorityStorePruningMetrics;
1043    use crate::authority::authority_store_tables::AuthorityPerpetualTables;
1044    use crate::authority::authority_store_types::{
1045        StoreObject, StoreObjectWrapper, get_store_object,
1046    };
1047    use prometheus::Registry;
1048    use sui_types::base_types::ObjectDigest;
1049    use sui_types::effects::TransactionEffects;
1050    use sui_types::effects::TransactionEffectsAPI;
1051    use sui_types::{
1052        base_types::{ObjectID, SequenceNumber},
1053        object::Object,
1054        storage::ObjectKey,
1055    };
1056    use typed_store::Map;
1057    use typed_store::rocks::{DBMap, MetricConf, ReadWriteOptions, default_db_options};
1058
1059    use super::AuthorityStorePruner;
1060
1061    fn get_keys_after_pruning(path: &Path) -> anyhow::Result<HashSet<ObjectKey>> {
1062        let perpetual_db_path = path.join(Path::new("perpetual"));
1063        let cf_names = AuthorityPerpetualTables::describe_tables();
1064        let cfs: Vec<_> = cf_names
1065            .keys()
1066            .map(|x| (x.as_str(), default_db_options().options))
1067            .collect();
1068        let perpetual_db = typed_store::rocks::open_cf_opts(
1069            perpetual_db_path,
1070            None,
1071            MetricConf::new("perpetual_pruning"),
1072            &cfs,
1073        );
1074
1075        let mut after_pruning = HashSet::new();
1076        let objects = DBMap::<ObjectKey, StoreObjectWrapper>::reopen(
1077            &perpetual_db?,
1078            Some("objects"),
1079            // open the db to bypass default db options which ignores range tombstones
1080            // so we can read the accurate number of retained versions
1081            &ReadWriteOptions::default(),
1082            false,
1083        )?;
1084        let iter = objects.safe_iter();
1085        for item in iter {
1086            after_pruning.insert(item?.0);
1087        }
1088        Ok(after_pruning)
1089    }
1090
1091    type GenerateTestDataResult = (Vec<ObjectKey>, Vec<ObjectKey>, Vec<ObjectKey>);
1092
1093    fn generate_test_data(
1094        db: Arc<AuthorityPerpetualTables>,
1095        num_versions_per_object: u64,
1096        num_object_versions_to_retain: u64,
1097        total_unique_object_ids: u32,
1098    ) -> Result<GenerateTestDataResult, anyhow::Error> {
1099        assert!(num_versions_per_object >= num_object_versions_to_retain);
1100
1101        let (mut to_keep, mut to_delete, mut tombstones) = (vec![], vec![], vec![]);
1102        let mut batch = db.objects.batch();
1103
1104        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids.into())?;
1105        for id in ids {
1106            for (counter, seq) in (0..num_versions_per_object).rev().enumerate() {
1107                let object_key = ObjectKey(id, SequenceNumber::from_u64(seq));
1108                if counter < num_object_versions_to_retain.try_into().unwrap() {
1109                    // latest `num_object_versions_to_retain` should not have been pruned
1110                    to_keep.push(object_key);
1111                } else {
1112                    to_delete.push(object_key);
1113                }
1114                let obj = get_store_object(Object::immutable_with_id_for_testing(id));
1115                batch.insert_batch(
1116                    &db.objects,
1117                    [(ObjectKey(id, SequenceNumber::from(seq)), obj.clone())],
1118                )?;
1119            }
1120
1121            // Adding a tombstone for deleted object.
1122            if num_object_versions_to_retain == 0 {
1123                let tombstone_key = ObjectKey(id, SequenceNumber::from(num_versions_per_object));
1124                println!("Adding tombstone object {:?}", tombstone_key);
1125                batch.insert_batch(
1126                    &db.objects,
1127                    [(tombstone_key, StoreObjectWrapper::V1(StoreObject::Deleted))],
1128                )?;
1129                tombstones.push(tombstone_key);
1130            }
1131        }
1132        batch.write().unwrap();
1133        assert_eq!(
1134            to_keep.len() as u64,
1135            std::cmp::min(num_object_versions_to_retain, num_versions_per_object)
1136                * total_unique_object_ids as u64
1137        );
1138        assert_eq!(
1139            tombstones.len() as u64,
1140            if num_object_versions_to_retain == 0 {
1141                total_unique_object_ids as u64
1142            } else {
1143                0
1144            }
1145        );
1146        Ok((to_keep, to_delete, tombstones))
1147    }
1148
1149    async fn run_pruner(
1150        path: &Path,
1151        num_versions_per_object: u64,
1152        num_object_versions_to_retain: u64,
1153        total_unique_object_ids: u32,
1154    ) -> Vec<ObjectKey> {
1155        let registry = Registry::default();
1156        let metrics = AuthorityStorePruningMetrics::new(&registry);
1157        let to_keep = {
1158            let db = Arc::new(AuthorityPerpetualTables::open(path, None, None));
1159            let (to_keep, to_delete, tombstones) = generate_test_data(
1160                db.clone(),
1161                num_versions_per_object,
1162                num_object_versions_to_retain,
1163                total_unique_object_ids,
1164            )
1165            .unwrap();
1166            let mut effects = TransactionEffects::default();
1167            for object in to_delete {
1168                effects.unsafe_add_deleted_live_object_for_testing((
1169                    object.0,
1170                    object.1,
1171                    ObjectDigest::MIN,
1172                ));
1173            }
1174            for object in tombstones {
1175                effects.unsafe_add_object_tombstone_for_testing((
1176                    object.0,
1177                    object.1,
1178                    ObjectDigest::MIN,
1179                ));
1180            }
1181            AuthorityStorePruner::prune_objects(vec![effects], &db, None, 0, metrics, true)
1182                .await
1183                .unwrap();
1184            to_keep
1185        };
1186        tokio::time::sleep(Duration::from_secs(3)).await;
1187        to_keep
1188    }
1189
1190    // Tests pruning old version of live objects.
1191    #[tokio::test]
1192    async fn test_pruning_objects() {
1193        let path = tempfile::tempdir().unwrap().keep();
1194        let to_keep = run_pruner(&path, 3, 2, 1000).await;
1195        assert_eq!(
1196            HashSet::from_iter(to_keep),
1197            get_keys_after_pruning(&path).unwrap()
1198        );
1199        run_pruner(&tempfile::tempdir().unwrap().keep(), 3, 2, 1000).await;
1200    }
1201
1202    // Tests pruning deleted objects (object tombstones).
1203    #[tokio::test]
1204    async fn test_pruning_tombstones() {
1205        let path = tempfile::tempdir().unwrap().keep();
1206        let to_keep = run_pruner(&path, 0, 0, 1000).await;
1207        assert_eq!(to_keep.len(), 0);
1208        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);
1209
1210        let path = tempfile::tempdir().unwrap().keep();
1211        let to_keep = run_pruner(&path, 3, 0, 1000).await;
1212        assert_eq!(to_keep.len(), 0);
1213        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);
1214    }
1215
1216    #[cfg(not(target_env = "msvc"))]
1217    #[tokio::test]
1218    async fn test_db_size_after_compaction() -> Result<(), anyhow::Error> {
1219        let primary_path = tempfile::tempdir()?.keep();
1220        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None, None));
1221        let total_unique_object_ids = 10_000;
1222        let num_versions_per_object = 10;
1223        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids)?;
1224        let mut to_delete = vec![];
1225        for id in ids {
1226            for i in (0..num_versions_per_object).rev() {
1227                if i < num_versions_per_object - 2 {
1228                    to_delete.push((id, SequenceNumber::from(i)));
1229                }
1230                let obj = get_store_object(Object::immutable_with_id_for_testing(id));
1231                perpetual_db
1232                    .objects
1233                    .insert(&ObjectKey(id, SequenceNumber::from(i)), &obj)?;
1234            }
1235        }
1236
1237        fn get_sst_size(path: &Path) -> u64 {
1238            let mut size = 0;
1239            for entry in std::fs::read_dir(path).unwrap() {
1240                let entry = entry.unwrap();
1241                let path = entry.path();
1242                if let Some(ext) = path.extension() {
1243                    if ext != "sst" {
1244                        continue;
1245                    }
1246                    size += std::fs::metadata(path).unwrap().len();
1247                }
1248            }
1249            size
1250        }
1251
1252        let db_path = primary_path.clone().join("perpetual");
1253        let start = ObjectKey(ObjectID::ZERO, SequenceNumber::MIN);
1254        let end = ObjectKey(ObjectID::MAX, SequenceNumber::MAX);
1255
1256        perpetual_db.objects.compact_range(&start, &end)?;
1257        let before_compaction_size = get_sst_size(&db_path);
1258
1259        let mut effects = TransactionEffects::default();
1260        for object in to_delete {
1261            effects.unsafe_add_deleted_live_object_for_testing((
1262                object.0,
1263                object.1,
1264                ObjectDigest::MIN,
1265            ));
1266        }
1267        let registry = Registry::default();
1268        let metrics = AuthorityStorePruningMetrics::new(&registry);
1269        let total_pruned = AuthorityStorePruner::prune_objects(
1270            vec![effects],
1271            &perpetual_db,
1272            None,
1273            0,
1274            metrics,
1275            true,
1276        )
1277        .await;
1278        info!("Total pruned keys = {:?}", total_pruned);
1279
1280        perpetual_db.objects.compact_range(&start, &end)?;
1281        let after_compaction_size = get_sst_size(&db_path);
1282
1283        info!(
1284            "Before compaction disk size = {:?}, after compaction disk size = {:?}",
1285            before_compaction_size, after_compaction_size
1286        );
1287        ma::assert_le!(after_compaction_size, before_compaction_size);
1288        Ok(())
1289    }
1290}