sui_rpc_store/indexer/
pruner.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Background pruner for the historical column families.
5//!
6//! Pruning is a standalone [`Service`] rather than a framework
7//! pipeline: it does not consume checkpoints from ingestion, it reads
8//! the already-committed state and deletes data below a retention
9//! floor. The shape mirrors the validator's perpetual-store pruner
10//! (a periodic background task) more than the indexer framework's
11//! per-pipeline `prune` hook — the deletions are data-driven (we walk
12//! transaction effects to retract superseded object versions) and the
13//! floor is a single value shared across every historical CF.
14//!
15//! # What gets pruned
16//!
17//! - **Per-transaction CFs** (`transactions`, `effects`, `events`,
18//!   `tx_metadata_by_seq`) — range-deleted over the pruned `tx_seq`
19//!   range; the keys are contiguous big-endian `tx_seq`, so one range
20//!   tombstone per CF clears the chunk.
21//! - **Per-checkpoint CFs** (`checkpoint_summary`,
22//!   `checkpoint_contents`) — range-deleted over the pruned
23//!   checkpoint range.
24//! - **Digest reverse indexes** (`tx_seq_by_digest`,
25//!   `checkpoint_seq_by_digest`) — point-deleted; their keys are
26//!   digests, so we collect them from the data being pruned (tx
27//!   digests from each effects row, checkpoint digests from each
28//!   summary) before deleting.
29//! - **`objects` history** — point-deleted, effects-driven: each
30//!   pruned transaction's `modified_at_versions` (superseded input
31//!   versions) and `all_tombstones` (deleted / wrapped markers) are
32//!   the exact `(ObjectID, version)` rows that are now dead. The
33//!   latest live version is never an input to a pruned transaction,
34//!   so it — and the greatest `object_version_by_checkpoint` entry
35//!   that resolves to it — is preserved.
36//! - **`object_version_by_checkpoint`** — retracted in lockstep with
37//!   `objects` history: the same effects-driven walk issues a
38//!   per-object range delete clearing every checkpoint-pinned entry
39//!   below the superseding transaction's checkpoint, plus a point
40//!   delete of the tombstone entry when the object was removed. The
41//!   retained set mirrors the `objects` versions kept, so the index
42//!   never points at a pruned version.
43//! - **Ledger-history bitmaps** (`transaction_bitmap`,
44//!   `event_bitmap`) — not deleted directly; advancing the shared
45//!   [`tx_seq_floor`](crate::schema::pruning_watermark::tx_seq_floor)
//!   lets their compaction filters drop fully-pruned buckets. We
//!   force a compaction once the floor has caught up to its retention
//!   target (intermediate ticks of a draining backlog skip it) so the
//!   eviction is prompt rather than waiting for a natural sweep.
49//!
50//! The live-set-bounded indexes (`object_by_owner`, `object_by_type`,
51//! `balance`, `package_versions`) and the tiny `epochs` CF are never
52//! pruned.
53//!
54//! # Floor, retention, and safety
55//!
56//! Retention is epoch-based: the `retention_epochs` most-recent
57//! epochs are retained, and the target floor is the start checkpoint
58//! of the oldest retained epoch. The floor is then clamped so it
59//! never advances past the oldest in-memory snapshot's checkpoint:
60//! point and range deletes are already invisible to a snapshot
61//! (RocksDB pins the data a live snapshot references), but the bitmap
62//! compaction filter physically removes buckets irrespective of
63//! snapshots, so the clamp keeps every live snapshot's advertised
64//! available range valid even under an aggressively small retention.
65//!
66//! Each tick advances the floor toward that target by at most
67//! `max_checkpoints_per_tick` checkpoints (in `max_chunk_checkpoints`
68//! atomic chunks), so a large backlog — for example when pruning is
69//! first enabled on an old database — drains across many ticks rather
70//! than one long blocking pass. The floor converges to the target
71//! over subsequent ticks.
72//!
73//! # Ordering and crash-safety
74//!
75//! Each chunk stages all of its deletes plus the new
76//! `PruningWatermarks` row into one atomic batch, commits, and only
77//! then advances the in-memory bitmap floor. Because the watermark
78//! row lives in the same batch as the deletes, a crash either loses
79//! the whole chunk (re-pruned next run) or commits it wholesale;
80//! there is no partial-delete-without-watermark state. Range and
81//! point deletes are idempotent, so a re-run is harmless.
82
83use std::sync::Arc;
84
85use anyhow::Context as _;
86use prometheus::IntCounter;
87use prometheus::IntGauge;
88use prometheus::Registry;
89use prometheus::register_int_counter_with_registry;
90use prometheus::register_int_gauge_with_registry;
91use sui_consistent_store::Batch;
92use sui_consistent_store::Db;
93use sui_consistent_store::FrameworkSchema;
94use sui_consistent_store::Schema;
95use sui_indexer_alt_framework::service::Service;
96use sui_types::base_types::ObjectID;
97use sui_types::effects::TransactionEffectsAPI;
98use sui_types::message_envelope::Message;
99use tokio::time::MissedTickBehavior;
100use tracing::debug;
101use tracing::info;
102use tracing::warn;
103
104use crate::RpcStoreSchema;
105use crate::config::PrunerConfig;
106use crate::schema::checkpoint_seq_by_digest;
107use crate::schema::event_bitmap;
108use crate::schema::object_version_by_checkpoint;
109use crate::schema::objects;
110use crate::schema::primitives::U64Be;
111use crate::schema::pruning_watermark;
112use crate::schema::pruning_watermark::Watermarks;
113use crate::schema::transaction_bitmap;
114use crate::schema::tx_seq_by_digest;
115
/// Prometheus metrics for the pruner.
///
/// The gauges are refreshed by `prune_once` after every committed
/// chunk; the counters only ever increase. Floor values are `u64` in
/// the store and are cast to `i64` when written to the gauges.
pub struct PrunerMetrics {
    /// Lowest still-available checkpoint sequence number — the
    /// persisted checkpoint floor.
    pub checkpoint_lo: IntGauge,
    /// Lowest still-available transaction sequence number — the
    /// persisted `tx_seq` floor.
    pub tx_seq_lo: IntGauge,
    /// Total pruning chunks committed (one increment per atomic
    /// chunk batch).
    pub chunks_committed: IntCounter,
    /// Total superseded object versions and tombstones deleted
    /// (incremented once per chunk, after its batch commits).
    pub objects_deleted: IntCounter,
}
129
130impl PrunerMetrics {
131    pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
132        let prefix = prefix.unwrap_or("rpc_store_pruner");
133        let name = |n| format!("{prefix}_{n}");
134
135        Arc::new(Self {
136            checkpoint_lo: register_int_gauge_with_registry!(
137                name("checkpoint_lo"),
138                "Lowest still-available checkpoint sequence number (pruning floor)",
139                registry,
140            )
141            .unwrap(),
142            tx_seq_lo: register_int_gauge_with_registry!(
143                name("tx_seq_lo"),
144                "Lowest still-available transaction sequence number (pruning floor)",
145                registry,
146            )
147            .unwrap(),
148            chunks_committed: register_int_counter_with_registry!(
149                name("chunks_committed"),
150                "Total pruning chunks committed",
151                registry,
152            )
153            .unwrap(),
154            objects_deleted: register_int_counter_with_registry!(
155                name("objects_deleted"),
156                "Total superseded object versions and tombstones deleted by the pruner",
157                registry,
158            )
159            .unwrap(),
160        })
161    }
162}
163
164/// Start the background pruner as a [`Service`].
165///
166/// Errors if `config.retention_epochs` is `0` (which would prune the
167/// current epoch). The returned service runs an infinite tick loop;
168/// it is aborted on graceful shutdown (each chunk is atomic, so an
169/// abort leaves the database consistent).
170pub fn start_pruner(
171    db: Db,
172    config: PrunerConfig,
173    metrics: Arc<PrunerMetrics>,
174) -> anyhow::Result<Service> {
175    anyhow::ensure!(
176        config.retention_epochs >= 1,
177        "PrunerConfig::retention_epochs must be >= 1; 0 would prune the current epoch",
178    );
179    anyhow::ensure!(
180        config.max_checkpoints_per_tick >= 1,
181        "PrunerConfig::max_checkpoints_per_tick must be >= 1; 0 would never make progress",
182    );
183
184    // Reconstruct typed handles once; they are cheap views over the
185    // shared `Db` and are reused across every tick.
186    let schema = Arc::new(RpcStoreSchema::open(&db).context("Opening schema for pruner")?);
187
188    let service = Service::new().spawn_aborting(async move {
189        let mut ticker = tokio::time::interval(config.interval());
190        ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
191
192        loop {
193            ticker.tick().await;
194
195            let db = db.clone();
196            let schema = schema.clone();
197            let config = config.clone();
198            let metrics = metrics.clone();
199
200            // The pruner does blocking RocksDB iteration and writes;
201            // keep it off the async runtime threads.
202            let res =
203                tokio::task::spawn_blocking(move || prune_once(&db, &schema, &config, &metrics))
204                    .await;
205
206            match res {
207                Ok(Ok(())) => {}
208                Ok(Err(e)) => {
209                    warn!("rpc-store pruner pass failed (will retry next interval): {e:#}")
210                }
211                Err(e) => warn!("rpc-store pruner task join error: {e}"),
212            }
213        }
214    });
215
216    Ok(service)
217}
218
219/// Run a single pruning pass: recompute the target floor and advance
220/// the persisted floor toward it one chunk at a time.
221fn prune_once(
222    db: &Db,
223    schema: &RpcStoreSchema,
224    config: &PrunerConfig,
225    metrics: &PrunerMetrics,
226) -> anyhow::Result<()> {
227    let Some(current_epoch) = current_committed_epoch(db)? else {
228        debug!("rpc-store pruner: no committed watermark yet; nothing to prune");
229        return Ok(());
230    };
231
232    let Some(retention_lo) =
233        retention_checkpoint_floor(schema, current_epoch, config.retention_epochs)?
234    else {
235        debug!(
236            current_epoch,
237            "rpc-store pruner: retention floor not yet reached; nothing to prune"
238        );
239        return Ok(());
240    };
241
242    // Never advance the floor past the oldest live snapshot.
243    let target_lo = clamp_to_snapshot(retention_lo, db.snapshot_range().map(|r| *r.start()));
244
245    let mut cursor = schema.get_pruning_watermarks()?.unwrap_or_default();
246    if target_lo <= cursor.checkpoint_lo {
247        debug!(
248            target_lo,
249            current_lo = cursor.checkpoint_lo,
250            "rpc-store pruner: floor already at or beyond target"
251        );
252        return Ok(());
253    }
254
255    // Bound the work done this tick: advance the floor by at most
256    // `max_checkpoints_per_tick` checkpoints so a large backlog drains
257    // across many ticks instead of one long blocking pass. The floor
258    // converges to `target_lo` over subsequent ticks.
259    let tick_target = target_lo.min(cursor.checkpoint_lo + config.max_checkpoints_per_tick);
260
261    info!(
262        from = cursor.checkpoint_lo,
263        to = tick_target,
264        target = target_lo,
265        current_epoch,
266        "rpc-store pruner: advancing floor"
267    );
268
269    while cursor.checkpoint_lo < tick_target {
270        let chunk_ckpt_hi = (cursor.checkpoint_lo + config.max_chunk_checkpoints).min(tick_target);
271        cursor = prune_chunk(db, schema, cursor, chunk_ckpt_hi, metrics)?;
272        metrics.checkpoint_lo.set(cursor.checkpoint_lo as i64);
273        metrics.tx_seq_lo.set(cursor.tx_seq_lo as i64);
274        metrics.chunks_committed.inc();
275    }
276
277    // The bitmap CFs' compaction filters only drop fully-pruned
278    // buckets on a compaction sweep; force one once the floor has
279    // reached its retention target so the eviction is prompt. While a
280    // backlog is still draining over multiple ticks we skip the
281    // whole-CF compaction so it does not become the per-tick long
282    // pole; natural background compaction still applies the same
283    // filter opportunistically in the meantime, and the final
284    // catch-up tick forces a prompt sweep.
285    if cursor.checkpoint_lo >= target_lo {
286        db.compact_range_cf(transaction_bitmap::NAME, None, None)
287            .context("Compacting transaction_bitmap after prune")?;
288        db.compact_range_cf(event_bitmap::NAME, None, None)
289            .context("Compacting event_bitmap after prune")?;
290    }
291
292    Ok(())
293}
294
295/// Prune one chunk of checkpoints `[cursor.checkpoint_lo,
296/// chunk_ckpt_hi)` and their transactions, returning the new floor.
297fn prune_chunk(
298    db: &Db,
299    schema: &RpcStoreSchema,
300    cursor: Watermarks,
301    chunk_ckpt_hi: u64,
302    metrics: &PrunerMetrics,
303) -> anyhow::Result<Watermarks> {
304    let ckpt_lo = cursor.checkpoint_lo;
305    let tx_lo = cursor.tx_seq_lo;
306
307    // The exclusive `tx_seq` upper bound for the chunk is the
308    // cumulative network tx count after the chunk's highest
309    // checkpoint, which is the first `tx_seq` of `chunk_ckpt_hi`.
310    // `chunk_ckpt_hi >= 1` by the caller's loop invariant, and
311    // `chunk_ckpt_hi - 1 >= ckpt_lo` is still retained (not yet
312    // pruned), so its summary is present.
313    let last_ckpt = chunk_ckpt_hi - 1;
314    let tx_hi = schema
315        .get_checkpoint_summary(last_ckpt)?
316        .with_context(|| format!("checkpoint_summary missing for checkpoint {last_ckpt}"))?
317        .data()
318        .network_total_transactions;
319
320    let mut batch = db.batch();
321    let mut objects_deleted: u64 = 0;
322
323    // Walk each pruned checkpoint and the transactions it contains.
324    // Consecutive summaries' `network_total_transactions` partition
325    // `[tx_lo, tx_hi)` into per-checkpoint tx ranges, so the containing
326    // checkpoint of every transaction is known here -- it is exactly
327    // the `seq` being walked -- without a per-transaction metadata
328    // lookup. Each effects row yields the object versions to retract
329    // and the transaction digest to unindex; a missing effects row
330    // means that transaction was already pruned (idempotent re-run).
331    let mut tx_cursor = tx_lo;
332    for seq in ckpt_lo..chunk_ckpt_hi {
333        // Every in-range summary is still present: the chunk has not
334        // deleted any yet, and prior chunks committed atomically. A
335        // miss is therefore corruption, not an expected re-run state,
336        // so fail loudly rather than mis-partition the tx range.
337        let summary = schema
338            .get_checkpoint_summary(seq)?
339            .with_context(|| format!("checkpoint_summary missing for checkpoint {seq}"))?;
340        let ckpt_tx_hi = summary.data().network_total_transactions;
341
342        for tx_seq in tx_cursor..ckpt_tx_hi {
343            let Some((effects, _unchanged)) = schema.get_effects(tx_seq)? else {
344                continue;
345            };
346            for (id, version) in effects.modified_at_versions() {
347                batch.delete(&schema.objects, &objects::Key { id, version })?;
348                // Retract checkpoint-pinned entries older than this
349                // supersession; the entry at `seq` (the object's final
350                // version in this checkpoint) is kept.
351                retract_object_version_by_checkpoint(&mut batch, schema, id, seq, false)?;
352                objects_deleted += 1;
353            }
354            for (id, version) in effects.all_tombstones() {
355                batch.delete(&schema.objects, &objects::Key { id, version })?;
356                // The object was removed in `seq`: drop its tombstone
357                // entry at `seq` too.
358                retract_object_version_by_checkpoint(&mut batch, schema, id, seq, true)?;
359                objects_deleted += 1;
360            }
361            batch.delete(
362                &schema.tx_seq_by_digest,
363                &tx_seq_by_digest::Key(*effects.transaction_digest()),
364            )?;
365        }
366        tx_cursor = ckpt_tx_hi;
367
368        // Unindex this checkpoint's digest reverse map.
369        batch.delete(
370            &schema.checkpoint_seq_by_digest,
371            &checkpoint_seq_by_digest::Key(summary.data().digest()),
372        )?;
373    }
374
375    // The `tx_seq`- and checkpoint-keyed CFs are contiguous, so one
376    // range delete each clears the whole chunk regardless of how many
377    // rows it spans.
378    batch.delete_range(&schema.transactions, &U64Be(tx_lo), &U64Be(tx_hi))?;
379    batch.delete_range(&schema.effects, &U64Be(tx_lo), &U64Be(tx_hi))?;
380    batch.delete_range(&schema.events, &U64Be(tx_lo), &U64Be(tx_hi))?;
381    batch.delete_range(&schema.tx_metadata_by_seq, &U64Be(tx_lo), &U64Be(tx_hi))?;
382    batch.delete_range(
383        &schema.checkpoint_summary,
384        &U64Be(ckpt_lo),
385        &U64Be(chunk_ckpt_hi),
386    )?;
387    batch.delete_range(
388        &schema.checkpoint_contents,
389        &U64Be(ckpt_lo),
390        &U64Be(chunk_ckpt_hi),
391    )?;
392
393    // Advance the persisted floor atomically with the deletes.
394    let new = Watermarks {
395        tx_seq_lo: tx_hi,
396        checkpoint_lo: chunk_ckpt_hi,
397    };
398    let (k, v) = pruning_watermark::store(&new);
399    batch.put(&schema.pruning_watermark, &k, &v)?;
400
401    batch.commit()?;
402
403    // The commit is durable; advance the in-memory bitmap floor so
404    // the compaction filters drop buckets below `tx_hi`.
405    schema.set_pruning_floor(new.tx_seq_lo);
406    metrics.objects_deleted.inc_by(objects_deleted);
407
408    Ok(new)
409}
410
411/// Retract `object_version_by_checkpoint` rows for one object, given a
412/// transaction at checkpoint `cp` that superseded or removed it, in
413/// lockstep with the `objects` CF.
414///
415/// Deletes every checkpoint-pinned entry for `id` strictly older than
416/// `cp` with a single per-object range delete. Once the floor advances
417/// past `cp`, the entry at `cp` (or a newer one) is the floor a
418/// checkpoint-pinned read resolves to, so the older entries can never
419/// be the answer again. Because the chunk only prunes checkpoints below
420/// the new floor, `cp` is itself below the floor, so the kept entry is
421/// never the answer to an in-range read either; it survives only until
422/// its own superseding transaction is pruned in a later chunk.
423///
424/// The entry *at* `cp` is kept for a supersession (it is the object's
425/// final live version in `cp`). When `removed` is set, the object was
426/// deleted or wrapped in `cp`: its tombstone entry at `cp` is dropped
427/// too, since nothing at or after the floor can reference a removed
428/// object.
429fn retract_object_version_by_checkpoint(
430    batch: &mut Batch,
431    schema: &RpcStoreSchema,
432    id: ObjectID,
433    cp: u64,
434    removed: bool,
435) -> anyhow::Result<()> {
436    let lo = object_version_by_checkpoint::Key { id, checkpoint: 0 };
437    let hi = object_version_by_checkpoint::Key { id, checkpoint: cp };
438    batch.delete_range(&schema.object_version_by_checkpoint, &lo, &hi)?;
439    if removed {
440        batch.delete(&schema.object_version_by_checkpoint, &hi)?;
441    }
442    Ok(())
443}
444
445/// Prune the embedded fullnode's history cohort up to a floor supplied
446/// by the validator's perpetual-store pruner.
447///
448/// Unlike [`start_pruner`], this is not epoch-driven and not a
449/// `Service`. The embedded deployment deactivates the raw chain-data
450/// CFs (`transactions`, `effects`, `events`, `objects`,
451/// `checkpoint_*`), so it can neither derive a retention floor nor walk
452/// effects to find the rows to delete. Instead the perpetual pruner —
453/// which owns the raw data — supplies the floor directly, and this
454/// prunes exactly the history-cohort CFs that grow without bound:
455///
456/// - `tx_metadata_by_seq` — range-deleted over
457///   `[old_tx_lo, pruned_tx_seq_exclusive)`.
458/// - `tx_seq_by_digest` — point-deleted; the digests are read from
459///   `tx_metadata_by_seq` (the only history CF that still carries them)
460///   over the pruned range, before that range is deleted.
461/// - `transaction_bitmap` / `event_bitmap` — evicted by advancing the
462///   shared `tx_seq` floor so their compaction filters drop
463///   fully-pruned buckets, then forcing a compaction.
464///
465/// The live cohort and the tiny `epochs` CF are never pruned.
466///
467/// `pruned_checkpoint_watermark` is the highest checkpoint the
468/// perpetual store has pruned (inclusive); `pruned_tx_seq_exclusive` is
469/// the first still-retained `tx_seq`. These mirror
470/// `sui_core::rpc_index::RpcIndexStore::prune`'s parameters so the
471/// embedded rpc-store and the legacy index prune in lockstep on the
472/// same floor. Idempotent: a re-run with the same or a lower floor is a
473/// no-op.
474pub fn prune_history_cohort(
475    db: &Db,
476    schema: &RpcStoreSchema,
477    pruned_checkpoint_watermark: u64,
478    pruned_tx_seq_exclusive: u64,
479) -> anyhow::Result<()> {
480    let cursor = schema.get_pruning_watermarks()?.unwrap_or_default();
481    let tx_lo = cursor.tx_seq_lo;
482    let tx_hi = pruned_tx_seq_exclusive;
483    // Lowest still-available checkpoint after this prune: the perpetual
484    // store has pruned through `pruned_checkpoint_watermark` inclusive.
485    let checkpoint_lo = pruned_checkpoint_watermark.saturating_add(1);
486
487    // No-op if the floor would not advance on either axis (idempotent
488    // re-run, or the perpetual floor is behind ours).
489    if tx_hi <= tx_lo && checkpoint_lo <= cursor.checkpoint_lo {
490        return Ok(());
491    }
492
493    let mut batch = db.batch();
494
495    // Unindex the digest reverse map for the pruned `tx_seq` range. The
496    // digests live in `tx_metadata_by_seq`; iterate it (seeking to the
497    // first present row) rather than point-getting each `tx_seq`, so a
498    // sparse range or an unknown (zero) floor costs work proportional to
499    // the rows present, not to the width of the interval.
500    for entry in schema.iter_tx_seq_digests(tx_lo, tx_hi)? {
501        let (_tx_seq, digest) = entry?;
502        batch.delete(&schema.tx_seq_by_digest, &tx_seq_by_digest::Key(digest))?;
503    }
504    batch.delete_range(&schema.tx_metadata_by_seq, &U64Be(tx_lo), &U64Be(tx_hi))?;
505
506    // Advance the persisted floor atomically with the deletes, taking
507    // the monotonic max on each axis so a stale lower floor never
508    // regresses an axis the other call already advanced.
509    let new = Watermarks {
510        tx_seq_lo: tx_hi.max(tx_lo),
511        checkpoint_lo: checkpoint_lo.max(cursor.checkpoint_lo),
512    };
513    let (k, v) = pruning_watermark::store(&new);
514    batch.put(&schema.pruning_watermark, &k, &v)?;
515    batch.commit()?;
516
517    // Durable now: advance the in-memory bitmap floor and force a
518    // compaction so the bitmap filters drop fully-pruned buckets
519    // promptly rather than waiting for a natural sweep.
520    schema.set_pruning_floor(new.tx_seq_lo);
521    db.compact_range_cf(transaction_bitmap::NAME, None, None)
522        .context("Compacting transaction_bitmap after prune")?;
523    db.compact_range_cf(event_bitmap::NAME, None, None)
524        .context("Compacting event_bitmap after prune")?;
525
526    Ok(())
527}
528
529/// The lowest epoch fully committed across every registered pipeline,
530/// or `None` if no pipeline has committed a watermark yet.
531///
532/// Taking the minimum is deliberately conservative: it lags the true
533/// tip epoch by at most one epoch while a pipeline catches up across
534/// a boundary, which only ever causes the pruner to retain slightly
535/// more.
536fn current_committed_epoch(db: &Db) -> anyhow::Result<Option<u64>> {
537    let framework = FrameworkSchema::new(db.clone());
538    let mut min_epoch: Option<u64> = None;
539    for entry in framework.watermarks.iter(..)? {
540        let (_, watermark) = entry?;
541        let epoch = watermark.epoch_hi_inclusive;
542        min_epoch = Some(min_epoch.map_or(epoch, |m| m.min(epoch)));
543    }
544    Ok(min_epoch)
545}
546
547/// The target checkpoint floor implied by epoch-based retention: the
548/// start checkpoint of the oldest epoch that is still retained.
549///
550/// Returns `None` when nothing is eligible yet — either the chain is
551/// younger than the retention window, or the oldest retained epoch's
552/// row (or its `start_checkpoint`) has not been observed.
553fn retention_checkpoint_floor(
554    schema: &RpcStoreSchema,
555    current_epoch: u64,
556    retention_epochs: u64,
557) -> anyhow::Result<Option<u64>> {
558    debug_assert!(retention_epochs >= 1, "validated in start_pruner");
559
560    // Retain epochs `[oldest_retained, current_epoch]`.
561    let oldest_retained = current_epoch.saturating_sub(retention_epochs - 1);
562    if oldest_retained == 0 {
563        // Epoch 0 is still retained, so no epoch has fully aged out.
564        return Ok(None);
565    }
566
567    let Some(info) = schema.get_epoch(oldest_retained)? else {
568        return Ok(None);
569    };
570    Ok(info.start_checkpoint)
571}
572
/// Cap the retention-derived floor at the oldest in-memory snapshot's
/// checkpoint.
///
/// When `oldest_snapshot` is `None` the retention floor is returned
/// unchanged. Otherwise the result is the smaller of the two, so the
/// floor never moves past the oldest snapshot: that snapshot's
/// advertised available range stays valid, and the bitmap compaction
/// filter (which ignores snapshots) never drops a bucket the snapshot
/// still serves.
fn clamp_to_snapshot(retention_lo: u64, oldest_snapshot: Option<u64>) -> u64 {
    oldest_snapshot.map_or(retention_lo, |snapshot_ckpt| retention_lo.min(snapshot_ckpt))
}
585
586#[cfg(test)]
587mod tests {
588    use std::sync::Arc;
589
590    use prometheus::Registry;
591    use sui_consistent_store::Db;
592    use sui_consistent_store::DbOptions;
593    use sui_consistent_store::PipelineTaskKey;
594    use sui_consistent_store::Watermark;
595    use sui_indexer_alt_framework::pipeline::Processor;
596    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
597
598    use super::*;
599    use crate::schema::epochs;
600    use crate::schema::primitives::U64Varint;
601
602    fn fresh_db() -> (tempfile::TempDir, Db, RpcStoreSchema) {
603        let dir = tempfile::tempdir().unwrap();
604        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
605        (dir, db, schema)
606    }
607
    /// Populate the CFs the pruner reads and deletes by running the
    /// real pipelines' `process` over `checkpoint` and staging their
    /// rows — `objects`, `effects`, `checkpoint_summary`, and the two
    /// digest reverse indexes. These cover both deletion mechanisms
    /// (range delete and point delete) plus the effects-driven object
    /// retraction.
    ///
    /// All rows are staged into a single batch and committed once at
    /// the end; any pipeline or store failure panics the test.
    async fn seed(
        db: &Db,
        schema: &RpcStoreSchema,
        checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
    ) {
        let mut batch = db.batch();
        // `objects`: one row per `(id, version)`.
        for row in crate::indexer::objects::Objects
            .process(checkpoint)
            .await
            .unwrap()
        {
            batch
                .put(
                    &schema.objects,
                    &objects::Key {
                        id: row.id,
                        version: row.version,
                    },
                    &row.value,
                )
                .unwrap();
        }
        // `effects`: keyed by big-endian `tx_seq`.
        for row in crate::indexer::effects::Effects
            .process(checkpoint)
            .await
            .unwrap()
        {
            batch
                .put(&schema.effects, &U64Be(row.tx_seq), &row.value)
                .unwrap();
        }
        // `checkpoint_summary`: keyed by big-endian checkpoint sequence.
        for row in crate::indexer::checkpoint_summary::CheckpointSummary
            .process(checkpoint)
            .await
            .unwrap()
        {
            batch
                .put(&schema.checkpoint_summary, &U64Be(row.seq), &row.value)
                .unwrap();
        }
        // `tx_seq_by_digest`: transaction digest -> `tx_seq`.
        for row in crate::indexer::tx_seq_by_digest::TxSeqByDigest
            .process(checkpoint)
            .await
            .unwrap()
        {
            batch
                .put(
                    &schema.tx_seq_by_digest,
                    &tx_seq_by_digest::Key(row.digest),
                    &U64Varint(row.tx_seq),
                )
                .unwrap();
        }
        // `checkpoint_seq_by_digest`: checkpoint digest -> sequence.
        for row in crate::indexer::checkpoint_seq_by_digest::CheckpointSeqByDigest
            .process(checkpoint)
            .await
            .unwrap()
        {
            batch
                .put(
                    &schema.checkpoint_seq_by_digest,
                    &checkpoint_seq_by_digest::Key(row.digest),
                    &U64Varint(row.seq),
                )
                .unwrap();
        }
        batch.commit().unwrap();
    }
682
683    #[test]
684    fn clamp_to_snapshot_holds_floor_at_or_below_oldest_snapshot() {
685        // No snapshots: retention floor stands.
686        assert_eq!(clamp_to_snapshot(100, None), 100);
687        // Retention is well below the oldest snapshot: retention binds.
688        assert_eq!(clamp_to_snapshot(100, Some(250)), 100);
689        // Retention would overrun the oldest snapshot: clamp holds.
690        assert_eq!(clamp_to_snapshot(300, Some(250)), 250);
691        // Exactly at the oldest snapshot is allowed.
692        assert_eq!(clamp_to_snapshot(250, Some(250)), 250);
693    }
694
695    #[test]
696    fn retention_floor_none_when_chain_younger_than_window() {
697        let (_dir, _db, schema) = fresh_db();
698        // current_epoch=2, retention=5 => oldest_retained saturates to
699        // 0, so epoch 0 is still retained and nothing has aged out.
700        assert!(retention_checkpoint_floor(&schema, 2, 5).unwrap().is_none());
701    }
702
703    #[test]
704    fn retention_floor_is_start_checkpoint_of_oldest_retained_epoch() {
705        let (_dir, db, schema) = fresh_db();
706        // Seed epoch 3's start record at checkpoint 300.
707        let mut batch = db.batch();
708        batch
709            .merge(
710                &schema.epochs,
711                &U64Be(3),
712                &epochs::start(1, 1, 0, Some(300), None),
713            )
714            .unwrap();
715        batch.commit().unwrap();
716        // current_epoch=5, retention=3 => retain [3, 5], oldest
717        // retained is epoch 3, whose start checkpoint is the floor.
718        assert_eq!(
719            retention_checkpoint_floor(&schema, 5, 3).unwrap(),
720            Some(300)
721        );
722    }
723
724    #[test]
725    fn retention_floor_none_when_oldest_epoch_row_missing() {
726        let (_dir, _db, schema) = fresh_db();
727        // Oldest retained epoch is 9, but no row has been observed.
728        assert!(
729            retention_checkpoint_floor(&schema, 10, 2)
730                .unwrap()
731                .is_none()
732        );
733    }
734
735    #[test]
736    fn current_committed_epoch_takes_min_across_watermarks() {
737        let (_dir, db, _schema) = fresh_db();
738        let framework = FrameworkSchema::new(db.clone());
739        let mut batch = db.batch();
740        batch
741            .put(
742                &framework.watermarks,
743                &PipelineTaskKey::new("a"),
744                &Watermark {
745                    epoch_hi_inclusive: 7,
746                    ..Default::default()
747                },
748            )
749            .unwrap();
750        batch
751            .put(
752                &framework.watermarks,
753                &PipelineTaskKey::new("b"),
754                &Watermark {
755                    epoch_hi_inclusive: 5,
756                    ..Default::default()
757                },
758            )
759            .unwrap();
760        batch.commit().unwrap();
761        assert_eq!(current_committed_epoch(&db).unwrap(), Some(5));
762    }
763
764    #[test]
765    fn current_committed_epoch_none_when_no_watermarks() {
766        let (_dir, db, _schema) = fresh_db();
767        assert!(current_committed_epoch(&db).unwrap().is_none());
768    }
769
770    /// The bitmap eviction path end to end: with the floor advanced
771    /// past a bucket, a forced compaction runs the bucket's
772    /// compaction filter and drops it, while a bucket above the floor
773    /// survives. This is what `prune_once` relies on when it compacts
774    /// the bitmap CFs after a floor advance.
775    #[test]
776    fn bitmap_buckets_below_floor_are_evicted_by_compaction() {
777        use std::sync::atomic::Ordering;
778
779        use crate::schema::pruning_watermark::tx_seq_floor;
780        use crate::schema::transaction_bitmap;
781
782        // The floor is process-wide; snapshot and restore it so this
783        // test doesn't perturb others sharing the same atomic.
784        let baseline = tx_seq_floor().load(Ordering::Relaxed);
785        let (_dir, db, schema) = fresh_db();
786        let dim = b"sender:alice".to_vec();
787
788        // Materialize one bucket fully below the floor (bucket 0) and
789        // one above it (bucket 1). The compaction filter keys off the
790        // bucket id in the key, so the stored bitmap contents are
791        // immaterial here.
792        let mut bitmap0 = roaring::RoaringBitmap::new();
793        bitmap0.insert(transaction_bitmap::bit_of(5));
794        let mut bitmap1 = roaring::RoaringBitmap::new();
795        bitmap1.insert(transaction_bitmap::bit_of(
796            transaction_bitmap::TX_BUCKET_SIZE + 5,
797        ));
798        let (k0, v0) = transaction_bitmap::store_bitmap(dim.clone(), 0, bitmap0);
799        let (k1, v1) = transaction_bitmap::store_bitmap(dim.clone(), 1, bitmap1);
800
801        let mut batch = db.batch();
802        batch.put(&schema.transaction_bitmap, &k0, &v0).unwrap();
803        batch.put(&schema.transaction_bitmap, &k1, &v1).unwrap();
804        batch.commit().unwrap();
805        db.flush().unwrap();
806
807        // Advance the floor to the top of bucket 0, then force a
808        // compaction. Bucket 0's whole range is below the floor, so
809        // its filter returns Remove; bucket 1 straddles above it.
810        schema.set_pruning_floor(transaction_bitmap::TX_BUCKET_SIZE);
811        db.compact_range_cf(transaction_bitmap::NAME, None, None)
812            .unwrap();
813
814        assert!(
815            schema
816                .get_transaction_bitmap(dim.clone(), 0)
817                .unwrap()
818                .is_none(),
819            "fully-pruned bucket 0 should be evicted by compaction",
820        );
821        assert!(
822            schema.get_transaction_bitmap(dim, 1).unwrap().is_some(),
823            "bucket 1 above the floor must remain",
824        );
825
826        tx_seq_floor().store(baseline, Ordering::Relaxed);
827    }
828
829    /// A committed chunk advances the process-wide bitmap floor (the
830    /// value the bitmap CFs' compaction filters read) to the chunk's
831    /// new `tx_seq_lo`. The filter's own removal logic is covered by
832    /// `transaction_bitmap::should_remove_bucket`.
833    #[tokio::test]
834    async fn prune_chunk_advances_the_bitmap_floor_atomic() {
835        use std::sync::atomic::Ordering;
836
837        use crate::schema::pruning_watermark::tx_seq_floor;
838
839        // The floor is process-wide; snapshot and restore it so this
840        // test doesn't perturb others sharing the same atomic.
841        let baseline = tx_seq_floor().load(Ordering::Relaxed);
842
843        let (_dir, db, schema) = fresh_db();
844        let checkpoint = Arc::new(
845            TestCheckpointBuilder::new(0)
846                .start_transaction(0)
847                .create_owned_object(0)
848                .finish_transaction()
849                .start_transaction(0)
850                .transfer_object(0, 1)
851                .finish_transaction()
852                .build_checkpoint(),
853        );
854        seed(&db, &schema, &checkpoint).await;
855
856        let metrics = PrunerMetrics::new(None, &Registry::new());
857        let new = prune_chunk(&db, &schema, Watermarks::default(), 1, &metrics).unwrap();
858
859        assert_eq!(
860            tx_seq_floor().load(Ordering::Relaxed),
861            new.tx_seq_lo,
862            "the chunk must publish its new tx_seq floor to the bitmap atomic",
863        );
864
865        tx_seq_floor().store(baseline, Ordering::Relaxed);
866    }
867
868    #[test]
869    fn start_pruner_rejects_zero_retention() {
870        let (_dir, db, _schema) = fresh_db();
871        let config = PrunerConfig {
872            retention_epochs: 0,
873            ..PrunerConfig::default()
874        };
875        let err = start_pruner(db, config, PrunerMetrics::new(None, &Registry::new())).unwrap_err();
876        assert!(
877            format!("{err:#}").contains("retention_epochs"),
878            "expected a retention_epochs validation error, got: {err:#}",
879        );
880    }
881
882    #[test]
883    fn start_pruner_rejects_zero_checkpoints_per_tick() {
884        let (_dir, db, _schema) = fresh_db();
885        let config = PrunerConfig {
886            max_checkpoints_per_tick: 0,
887            ..PrunerConfig::default()
888        };
889        let err = start_pruner(db, config, PrunerMetrics::new(None, &Registry::new())).unwrap_err();
890        assert!(
891            format!("{err:#}").contains("max_checkpoints_per_tick"),
892            "expected a max_checkpoints_per_tick validation error, got: {err:#}",
893        );
894    }
895
896    /// A single `prune_once` pass advances the floor by at most
897    /// `max_checkpoints_per_tick` checkpoints, and successive passes
898    /// converge to the retention target. Five single-transaction
899    /// checkpoints are eligible (retention floor at checkpoint 5); a
900    /// per-tick budget of 2 must take three passes to drain them
901    /// (2, 4, 5), after which the floor sits at the target and further
902    /// passes are no-ops.
903    #[tokio::test]
904    async fn prune_once_advances_at_most_the_per_tick_budget() {
905        use std::sync::atomic::Ordering;
906
907        use crate::schema::pruning_watermark::tx_seq_floor;
908
909        // The floor is process-wide; snapshot and restore it so this
910        // test doesn't perturb others sharing the same atomic.
911        let baseline = tx_seq_floor().load(Ordering::Relaxed);
912
913        let (_dir, db, schema) = fresh_db();
914
915        // Five single-transaction checkpoints (seq 0..=4) from one
916        // accumulating builder, so `network_total_transactions` grows
917        // by one per checkpoint and the pruned tx range is contiguous.
918        let mut builder = TestCheckpointBuilder::new(0);
919        let mut checkpoints = Vec::new();
920        for i in 0..5u64 {
921            builder = builder
922                .start_transaction(0)
923                .create_owned_object(i)
924                .finish_transaction();
925            checkpoints.push(Arc::new(builder.build_checkpoint()));
926        }
927        for cp in &checkpoints {
928            seed(&db, &schema, cp).await;
929        }
930
931        // Drive the target floor: the committed epoch is 2, and with
932        // `retention_epochs = 1` the oldest retained epoch is 2, whose
933        // start checkpoint (5) is the target floor — so checkpoints
934        // [0, 5) are eligible.
935        let framework = FrameworkSchema::new(db.clone());
936        let mut batch = db.batch();
937        batch
938            .put(
939                &framework.watermarks,
940                &PipelineTaskKey::new("p"),
941                &Watermark {
942                    epoch_hi_inclusive: 2,
943                    ..Default::default()
944                },
945            )
946            .unwrap();
947        batch
948            .merge(
949                &schema.epochs,
950                &U64Be(2),
951                &epochs::start(1, 1, 0, Some(5), None),
952            )
953            .unwrap();
954        batch.commit().unwrap();
955
956        let config = PrunerConfig {
957            retention_epochs: 1,
958            interval_ms: 1,
959            max_chunk_checkpoints: 2,
960            max_checkpoints_per_tick: 2,
961        };
962        let metrics = PrunerMetrics::new(None, &Registry::new());
963
964        let floor = |schema: &RpcStoreSchema| {
965            schema
966                .get_pruning_watermarks()
967                .unwrap()
968                .unwrap_or_default()
969                .checkpoint_lo
970        };
971
972        // Each pass advances by at most the per-tick budget of 2.
973        prune_once(&db, &schema, &config, &metrics).unwrap();
974        assert_eq!(floor(&schema), 2, "first tick advances by the budget");
975        prune_once(&db, &schema, &config, &metrics).unwrap();
976        assert_eq!(floor(&schema), 4, "second tick advances by the budget");
977        prune_once(&db, &schema, &config, &metrics).unwrap();
978        assert_eq!(floor(&schema), 5, "third tick reaches the target");
979
980        // Caught up: history below the floor is gone, the live target
981        // boundary is retained, and another pass is a no-op.
982        assert!(schema.get_effects(4).unwrap().is_none());
983        assert!(schema.get_checkpoint_summary(4).unwrap().is_none());
984        prune_once(&db, &schema, &config, &metrics).unwrap();
985        assert_eq!(floor(&schema), 5, "a pass at the target is a no-op");
986
987        tx_seq_floor().store(baseline, Ordering::Relaxed);
988    }
989
990    /// End-to-end chunk prune: one checkpoint where tx0 creates an
991    /// object and tx1 transfers it (superseding the first version).
992    /// Pruning the chunk must range-delete the per-tx / per-checkpoint
993    /// CFs, point-delete the digest reverse indexes, retract the
994    /// superseded object version, preserve the live version, and
995    /// advance the persisted floor.
996    #[tokio::test]
997    async fn prune_chunk_deletes_history_and_preserves_live_object() {
998        let (_dir, db, schema) = fresh_db();
999
1000        let checkpoint = Arc::new(
1001            TestCheckpointBuilder::new(0)
1002                .start_transaction(0)
1003                .create_owned_object(0)
1004                .finish_transaction()
1005                .start_transaction(0)
1006                .transfer_object(0, 1)
1007                .finish_transaction()
1008                .build_checkpoint(),
1009        );
1010
1011        let obj0 = TestCheckpointBuilder::derive_object_id(0);
1012        let v_a = checkpoint.transactions[0].effects.lamport_version();
1013        let v_b = checkpoint.transactions[1].effects.lamport_version();
1014        assert_ne!(v_a, v_b, "the transfer must bump the object's version");
1015        let digest0 = *checkpoint.transactions[0].effects.transaction_digest();
1016        let digest1 = *checkpoint.transactions[1].effects.transaction_digest();
1017        let ckpt_digest = checkpoint.summary.data().digest();
1018
1019        seed(&db, &schema, &checkpoint).await;
1020
1021        // Preconditions: both versions present, history present.
1022        assert!(schema.get_object_by_key(obj0, v_a).unwrap().is_some());
1023        assert!(schema.get_object_by_key(obj0, v_b).unwrap().is_some());
1024        assert!(schema.get_effects(0).unwrap().is_some());
1025        assert!(schema.get_effects(1).unwrap().is_some());
1026        assert!(schema.get_checkpoint_summary(0).unwrap().is_some());
1027
1028        // Prune the whole checkpoint: checkpoints [0, 1), tx [0, 2).
1029        let metrics = PrunerMetrics::new(None, &Registry::new());
1030        let new = prune_chunk(&db, &schema, Watermarks::default(), 1, &metrics).unwrap();
1031        assert_eq!(
1032            new,
1033            Watermarks {
1034                tx_seq_lo: 2,
1035                checkpoint_lo: 1,
1036            },
1037        );
1038
1039        // Superseded version retracted; live version preserved.
1040        assert!(
1041            schema.get_object_by_key(obj0, v_a).unwrap().is_none(),
1042            "superseded version v_a should be pruned",
1043        );
1044        assert!(
1045            schema.get_object_by_key(obj0, v_b).unwrap().is_some(),
1046            "live version v_b must be preserved",
1047        );
1048
1049        // Range-deleted CFs are emptied over the pruned range.
1050        assert!(schema.get_effects(0).unwrap().is_none());
1051        assert!(schema.get_effects(1).unwrap().is_none());
1052        assert!(schema.get_checkpoint_summary(0).unwrap().is_none());
1053
1054        // Point-deleted digest reverse indexes are gone.
1055        assert!(
1056            schema
1057                .tx_seq_by_digest
1058                .get(&tx_seq_by_digest::Key(digest0))
1059                .unwrap()
1060                .is_none()
1061        );
1062        assert!(
1063            schema
1064                .tx_seq_by_digest
1065                .get(&tx_seq_by_digest::Key(digest1))
1066                .unwrap()
1067                .is_none()
1068        );
1069        assert!(
1070            schema
1071                .checkpoint_seq_by_digest
1072                .get(&checkpoint_seq_by_digest::Key(ckpt_digest))
1073                .unwrap()
1074                .is_none()
1075        );
1076
1077        // The persisted floor advanced.
1078        assert_eq!(
1079            schema.get_pruning_watermarks().unwrap().unwrap(),
1080            Watermarks {
1081                tx_seq_lo: 2,
1082                checkpoint_lo: 1,
1083            },
1084        );
1085    }
1086
1087    /// Advance the floor across two single-checkpoint chunks and
1088    /// confirm a superseded object version is retracted only once the
1089    /// chunk containing its *superseding* transaction is pruned.
1090    ///
1091    /// Checkpoint 0 creates `obj0@v_a`; checkpoint 1 transfers it to
1092    /// `obj0@v_b`. Pruning checkpoint 0 alone must keep `v_a` (its
1093    /// superseding transaction is still live); pruning checkpoint 1
1094    /// then retracts `v_a` while preserving the live `v_b`.
1095    #[tokio::test]
1096    async fn prune_chunk_retracts_version_only_when_superseding_tx_is_pruned() {
1097        let (_dir, db, schema) = fresh_db();
1098
1099        // One builder across two checkpoints so `network_total_transactions`
1100        // accumulates and the shared live-object set carries obj0 forward.
1101        let mut builder = TestCheckpointBuilder::new(0)
1102            .start_transaction(0)
1103            .create_owned_object(0)
1104            .finish_transaction();
1105        let cp0 = Arc::new(builder.build_checkpoint());
1106        builder = builder
1107            .start_transaction(0)
1108            .transfer_object(0, 1)
1109            .finish_transaction();
1110        let cp1 = Arc::new(builder.build_checkpoint());
1111
1112        let obj0 = TestCheckpointBuilder::derive_object_id(0);
1113        let v_a = cp0.transactions[0].effects.lamport_version();
1114        let v_b = cp1.transactions[0].effects.lamport_version();
1115        assert_ne!(v_a, v_b);
1116
1117        seed(&db, &schema, &cp0).await;
1118        seed(&db, &schema, &cp1).await;
1119        let metrics = PrunerMetrics::new(None, &Registry::new());
1120
1121        // Chunk 1: prune checkpoint 0 only (tx [0, 1)). obj0's
1122        // superseding transaction is in checkpoint 1, so v_a stays.
1123        let after_first = prune_chunk(&db, &schema, Watermarks::default(), 1, &metrics).unwrap();
1124        assert_eq!(
1125            after_first,
1126            Watermarks {
1127                tx_seq_lo: 1,
1128                checkpoint_lo: 1,
1129            },
1130        );
1131        assert!(schema.get_effects(0).unwrap().is_none());
1132        assert!(schema.get_effects(1).unwrap().is_some());
1133        assert!(
1134            schema.get_object_by_key(obj0, v_a).unwrap().is_some(),
1135            "v_a must survive while its superseding tx is still retained",
1136        );
1137
1138        // Chunk 2: prune checkpoint 1 (tx [1, 2)). Now the superseding
1139        // transaction is pruned, retracting v_a; v_b remains live.
1140        let after_second = prune_chunk(&db, &schema, after_first, 2, &metrics).unwrap();
1141        assert_eq!(
1142            after_second,
1143            Watermarks {
1144                tx_seq_lo: 2,
1145                checkpoint_lo: 2,
1146            },
1147        );
1148        assert!(schema.get_effects(1).unwrap().is_none());
1149        assert!(
1150            schema.get_object_by_key(obj0, v_a).unwrap().is_none(),
1151            "v_a must be retracted once its superseding tx is pruned",
1152        );
1153        assert!(
1154            schema.get_object_by_key(obj0, v_b).unwrap().is_some(),
1155            "live v_b must be preserved",
1156        );
1157    }
1158
1159    /// The checkpoint-pinned `object_version_by_checkpoint` index is
1160    /// retracted in lockstep with the `objects` history: a
1161    /// checkpoint-pinned entry survives until the transaction that
1162    /// supersedes its object is pruned, and is dropped once that
1163    /// transaction's checkpoint ages out.
1164    ///
1165    /// Checkpoint 0 creates `obj0@v_a`; checkpoint 1 transfers it to
1166    /// `obj0@v_b`. Pruning checkpoint 0 keeps the cp0-pinned entry (its
1167    /// superseding transaction is still retained); pruning checkpoint 1
1168    /// retracts it while preserving the cp1-pinned floor entry.
1169    #[tokio::test]
1170    async fn prune_chunk_retracts_object_version_by_checkpoint() {
1171        use crate::indexer::object_version_by_checkpoint::ObjectVersionByCheckpoint;
1172
1173        let (_dir, db, schema) = fresh_db();
1174
1175        let mut builder = TestCheckpointBuilder::new(0)
1176            .start_transaction(0)
1177            .create_owned_object(0)
1178            .finish_transaction();
1179        let cp0 = Arc::new(builder.build_checkpoint());
1180        builder = builder
1181            .start_transaction(0)
1182            .transfer_object(0, 1)
1183            .finish_transaction();
1184        let cp1 = Arc::new(builder.build_checkpoint());
1185
1186        let obj0 = TestCheckpointBuilder::derive_object_id(0);
1187        let v_a = cp0.transactions[0].effects.lamport_version();
1188        let v_b = cp1.transactions[0].effects.lamport_version();
1189        assert_ne!(v_a, v_b);
1190
1191        // Seed the base CFs the pruner reads (`seed` populates
1192        // `checkpoint_summary`, from which the pruner derives each
1193        // transaction's checkpoint) plus the checkpoint-pinned index
1194        // under test.
1195        for cp in [&cp0, &cp1] {
1196            seed(&db, &schema, cp).await;
1197            let mut batch = db.batch();
1198            for row in ObjectVersionByCheckpoint::default()
1199                .process(cp)
1200                .await
1201                .unwrap()
1202            {
1203                // Seed only the change rows; the floor candidates are
1204                // exercised in the pipeline's own tests.
1205                let crate::indexer::object_version_by_checkpoint::Row::Change {
1206                    id,
1207                    checkpoint,
1208                    version,
1209                } = row
1210                else {
1211                    continue;
1212                };
1213                let (k, v) = object_version_by_checkpoint::store(id, checkpoint, version);
1214                batch
1215                    .put(&schema.object_version_by_checkpoint, &k, &v)
1216                    .unwrap();
1217            }
1218            batch.commit().unwrap();
1219        }
1220
1221        // Precondition: obj0 resolves at both checkpoints.
1222        assert_eq!(
1223            schema.get_object_version_at_checkpoint(obj0, 0).unwrap(),
1224            Some(v_a),
1225        );
1226        assert_eq!(
1227            schema.get_object_version_at_checkpoint(obj0, 1).unwrap(),
1228            Some(v_b),
1229        );
1230
1231        let metrics = PrunerMetrics::new(None, &Registry::new());
1232
1233        // Prune checkpoint 0 only: tx0 creates obj0 and supersedes
1234        // nothing, so the cp0-pinned entry survives.
1235        let after_first = prune_chunk(&db, &schema, Watermarks::default(), 1, &metrics).unwrap();
1236        assert_eq!(
1237            schema.get_object_version_at_checkpoint(obj0, 0).unwrap(),
1238            Some(v_a),
1239            "cp0-pinned entry must survive while its superseding tx is retained",
1240        );
1241
1242        // Prune checkpoint 1: tx1 supersedes obj0@v_a, retracting the
1243        // cp0-pinned entry; the cp1-pinned floor entry remains.
1244        prune_chunk(&db, &schema, after_first, 2, &metrics).unwrap();
1245        assert_eq!(
1246            schema.get_object_version_at_checkpoint(obj0, 0).unwrap(),
1247            None,
1248            "cp0-pinned entry must be retracted once its superseding tx is pruned",
1249        );
1250        assert_eq!(
1251            schema.get_object_version_at_checkpoint(obj0, 1).unwrap(),
1252            Some(v_b),
1253            "cp1-pinned floor entry must be preserved",
1254        );
1255    }
1256
1257    /// `prune_history_cohort` (the embedded entry point) range-deletes
1258    /// `tx_metadata_by_seq`, point-deletes `tx_seq_by_digest` for the
1259    /// pruned digests, and advances the persisted floor — all from the
1260    /// floor the perpetual pruner supplies, without touching any raw
1261    /// chain-data CF.
1262    #[test]
1263    fn prune_history_cohort_deletes_tx_metadata_and_advances_floor() {
1264        use sui_types::digests::TransactionDigest;
1265
1266        use crate::schema::tx_metadata_by_seq;
1267
1268        let (_dir, db, schema) = fresh_db();
1269
1270        // Six transactions, tx_seq 0..6, each with a metadata row and a
1271        // digest -> tx_seq reverse-index entry.
1272        let digests: Vec<TransactionDigest> =
1273            (0u8..6).map(|i| TransactionDigest::new([i; 32])).collect();
1274        let mut batch = db.batch();
1275        for (tx_seq, digest) in digests.iter().enumerate() {
1276            let tx_seq = tx_seq as u64;
1277            batch
1278                .put(
1279                    &schema.tx_metadata_by_seq,
1280                    &U64Be(tx_seq),
1281                    &tx_metadata_by_seq::store(&tx_metadata_by_seq::Metadata {
1282                        digest: *digest,
1283                        checkpoint_seq: tx_seq,
1284                        ckpt_position: 0,
1285                        event_count: 0,
1286                        timestamp_ms: 0,
1287                    }),
1288                )
1289                .unwrap();
1290            batch
1291                .put(
1292                    &schema.tx_seq_by_digest,
1293                    &tx_seq_by_digest::Key(*digest),
1294                    &U64Varint(tx_seq),
1295                )
1296                .unwrap();
1297        }
1298        batch.commit().unwrap();
1299
1300        // Perpetual store has pruned through checkpoint 2; tx_seq 3 is
1301        // the first still-retained transaction.
1302        prune_history_cohort(&db, &schema, 2, 3).unwrap();
1303
1304        // tx_metadata 0..3 pruned, 3..6 retained.
1305        for tx_seq in 0..3 {
1306            assert!(
1307                schema.get_tx_metadata_by_seq(tx_seq).unwrap().is_none(),
1308                "tx_metadata {tx_seq} should be pruned",
1309            );
1310        }
1311        for tx_seq in 3..6 {
1312            assert!(
1313                schema.get_tx_metadata_by_seq(tx_seq).unwrap().is_some(),
1314                "tx_metadata {tx_seq} should be retained",
1315            );
1316        }
1317
1318        // Digest reverse index unindexed for the pruned range only.
1319        for digest in &digests[0..3] {
1320            assert!(schema.get_tx_seq_by_digest(digest).unwrap().is_none());
1321        }
1322        for digest in &digests[3..6] {
1323            assert!(schema.get_tx_seq_by_digest(digest).unwrap().is_some());
1324        }
1325
1326        // Floor advanced: tx_seq 3 and checkpoint 3 (= pruned 2 + 1).
1327        assert_eq!(
1328            schema.get_pruning_watermarks().unwrap(),
1329            Some(Watermarks {
1330                tx_seq_lo: 3,
1331                checkpoint_lo: 3,
1332            }),
1333        );
1334
1335        // Idempotent: a re-run at the same floor is a no-op.
1336        prune_history_cohort(&db, &schema, 2, 3).unwrap();
1337        assert_eq!(
1338            schema.get_pruning_watermarks().unwrap(),
1339            Some(Watermarks {
1340                tx_seq_lo: 3,
1341                checkpoint_lo: 3,
1342            }),
1343        );
1344    }
1345
1346    /// `prune_history_cohort` visits only the rows that exist when the
1347    /// floor is unknown (no prior watermark, so `tx_lo == 0`) and the
1348    /// `tx_seq` range is sparse with large gaps — it must not walk every
1349    /// integer in the interval.
1350    #[test]
1351    fn prune_history_cohort_handles_sparse_tx_seqs() {
1352        use sui_types::digests::TransactionDigest;
1353
1354        use crate::schema::tx_metadata_by_seq;
1355
1356        let (_dir, db, schema) = fresh_db();
1357
1358        // Three rows spread across a wide interval.
1359        let entries = [
1360            (0u64, [10u8; 32]),
1361            (500_000u64, [11u8; 32]),
1362            (999_999u64, [12u8; 32]),
1363        ];
1364        let mut batch = db.batch();
1365        for (tx_seq, digest_bytes) in entries {
1366            let digest = TransactionDigest::new(digest_bytes);
1367            batch
1368                .put(
1369                    &schema.tx_metadata_by_seq,
1370                    &U64Be(tx_seq),
1371                    &tx_metadata_by_seq::store(&tx_metadata_by_seq::Metadata {
1372                        digest,
1373                        checkpoint_seq: tx_seq,
1374                        ckpt_position: 0,
1375                        event_count: 0,
1376                        timestamp_ms: 0,
1377                    }),
1378                )
1379                .unwrap();
1380            batch
1381                .put(
1382                    &schema.tx_seq_by_digest,
1383                    &tx_seq_by_digest::Key(digest),
1384                    &U64Varint(tx_seq),
1385                )
1386                .unwrap();
1387        }
1388        batch.commit().unwrap();
1389
1390        // No prior pruning watermark (floor unknown -> 0); prune through
1391        // checkpoint 0 / tx_seq 600_000 exclusive. Only the two rows
1392        // below 600_000 are unindexed; the one at 999_999 survives.
1393        prune_history_cohort(&db, &schema, 0, 600_000).unwrap();
1394
1395        assert!(schema.get_tx_metadata_by_seq(0).unwrap().is_none());
1396        assert!(schema.get_tx_metadata_by_seq(500_000).unwrap().is_none());
1397        assert!(schema.get_tx_metadata_by_seq(999_999).unwrap().is_some());
1398        assert!(
1399            schema
1400                .get_tx_seq_by_digest(&TransactionDigest::new([10u8; 32]))
1401                .unwrap()
1402                .is_none()
1403        );
1404        assert!(
1405            schema
1406                .get_tx_seq_by_digest(&TransactionDigest::new([11u8; 32]))
1407                .unwrap()
1408                .is_none()
1409        );
1410        assert!(
1411            schema
1412                .get_tx_seq_by_digest(&TransactionDigest::new([12u8; 32]))
1413                .unwrap()
1414                .is_some()
1415        );
1416        assert_eq!(
1417            schema.get_pruning_watermarks().unwrap(),
1418            Some(Watermarks {
1419                tx_seq_lo: 600_000,
1420                checkpoint_lo: 1,
1421            }),
1422        );
1423    }
1424}