1use std::sync::Arc;
84
85use anyhow::Context as _;
86use prometheus::IntCounter;
87use prometheus::IntGauge;
88use prometheus::Registry;
89use prometheus::register_int_counter_with_registry;
90use prometheus::register_int_gauge_with_registry;
91use sui_consistent_store::Batch;
92use sui_consistent_store::Db;
93use sui_consistent_store::FrameworkSchema;
94use sui_consistent_store::Schema;
95use sui_indexer_alt_framework::service::Service;
96use sui_types::base_types::ObjectID;
97use sui_types::effects::TransactionEffectsAPI;
98use sui_types::message_envelope::Message;
99use tokio::time::MissedTickBehavior;
100use tracing::debug;
101use tracing::info;
102use tracing::warn;
103
104use crate::RpcStoreSchema;
105use crate::config::PrunerConfig;
106use crate::schema::checkpoint_seq_by_digest;
107use crate::schema::event_bitmap;
108use crate::schema::object_version_by_checkpoint;
109use crate::schema::objects;
110use crate::schema::primitives::U64Be;
111use crate::schema::pruning_watermark;
112use crate::schema::pruning_watermark::Watermarks;
113use crate::schema::transaction_bitmap;
114use crate::schema::tx_seq_by_digest;
115
116pub struct PrunerMetrics {
118 pub checkpoint_lo: IntGauge,
121 pub tx_seq_lo: IntGauge,
124 pub chunks_committed: IntCounter,
126 pub objects_deleted: IntCounter,
128}
129
130impl PrunerMetrics {
131 pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
132 let prefix = prefix.unwrap_or("rpc_store_pruner");
133 let name = |n| format!("{prefix}_{n}");
134
135 Arc::new(Self {
136 checkpoint_lo: register_int_gauge_with_registry!(
137 name("checkpoint_lo"),
138 "Lowest still-available checkpoint sequence number (pruning floor)",
139 registry,
140 )
141 .unwrap(),
142 tx_seq_lo: register_int_gauge_with_registry!(
143 name("tx_seq_lo"),
144 "Lowest still-available transaction sequence number (pruning floor)",
145 registry,
146 )
147 .unwrap(),
148 chunks_committed: register_int_counter_with_registry!(
149 name("chunks_committed"),
150 "Total pruning chunks committed",
151 registry,
152 )
153 .unwrap(),
154 objects_deleted: register_int_counter_with_registry!(
155 name("objects_deleted"),
156 "Total superseded object versions and tombstones deleted by the pruner",
157 registry,
158 )
159 .unwrap(),
160 })
161 }
162}
163
164pub fn start_pruner(
171 db: Db,
172 config: PrunerConfig,
173 metrics: Arc<PrunerMetrics>,
174) -> anyhow::Result<Service> {
175 anyhow::ensure!(
176 config.retention_epochs >= 1,
177 "PrunerConfig::retention_epochs must be >= 1; 0 would prune the current epoch",
178 );
179 anyhow::ensure!(
180 config.max_checkpoints_per_tick >= 1,
181 "PrunerConfig::max_checkpoints_per_tick must be >= 1; 0 would never make progress",
182 );
183
184 let schema = Arc::new(RpcStoreSchema::open(&db).context("Opening schema for pruner")?);
187
188 let service = Service::new().spawn_aborting(async move {
189 let mut ticker = tokio::time::interval(config.interval());
190 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
191
192 loop {
193 ticker.tick().await;
194
195 let db = db.clone();
196 let schema = schema.clone();
197 let config = config.clone();
198 let metrics = metrics.clone();
199
200 let res =
203 tokio::task::spawn_blocking(move || prune_once(&db, &schema, &config, &metrics))
204 .await;
205
206 match res {
207 Ok(Ok(())) => {}
208 Ok(Err(e)) => {
209 warn!("rpc-store pruner pass failed (will retry next interval): {e:#}")
210 }
211 Err(e) => warn!("rpc-store pruner task join error: {e}"),
212 }
213 }
214 });
215
216 Ok(service)
217}
218
219fn prune_once(
222 db: &Db,
223 schema: &RpcStoreSchema,
224 config: &PrunerConfig,
225 metrics: &PrunerMetrics,
226) -> anyhow::Result<()> {
227 let Some(current_epoch) = current_committed_epoch(db)? else {
228 debug!("rpc-store pruner: no committed watermark yet; nothing to prune");
229 return Ok(());
230 };
231
232 let Some(retention_lo) =
233 retention_checkpoint_floor(schema, current_epoch, config.retention_epochs)?
234 else {
235 debug!(
236 current_epoch,
237 "rpc-store pruner: retention floor not yet reached; nothing to prune"
238 );
239 return Ok(());
240 };
241
242 let target_lo = clamp_to_snapshot(retention_lo, db.snapshot_range().map(|r| *r.start()));
244
245 let mut cursor = schema.get_pruning_watermarks()?.unwrap_or_default();
246 if target_lo <= cursor.checkpoint_lo {
247 debug!(
248 target_lo,
249 current_lo = cursor.checkpoint_lo,
250 "rpc-store pruner: floor already at or beyond target"
251 );
252 return Ok(());
253 }
254
255 let tick_target = target_lo.min(cursor.checkpoint_lo + config.max_checkpoints_per_tick);
260
261 info!(
262 from = cursor.checkpoint_lo,
263 to = tick_target,
264 target = target_lo,
265 current_epoch,
266 "rpc-store pruner: advancing floor"
267 );
268
269 while cursor.checkpoint_lo < tick_target {
270 let chunk_ckpt_hi = (cursor.checkpoint_lo + config.max_chunk_checkpoints).min(tick_target);
271 cursor = prune_chunk(db, schema, cursor, chunk_ckpt_hi, metrics)?;
272 metrics.checkpoint_lo.set(cursor.checkpoint_lo as i64);
273 metrics.tx_seq_lo.set(cursor.tx_seq_lo as i64);
274 metrics.chunks_committed.inc();
275 }
276
277 if cursor.checkpoint_lo >= target_lo {
286 db.compact_range_cf(transaction_bitmap::NAME, None, None)
287 .context("Compacting transaction_bitmap after prune")?;
288 db.compact_range_cf(event_bitmap::NAME, None, None)
289 .context("Compacting event_bitmap after prune")?;
290 }
291
292 Ok(())
293}
294
295fn prune_chunk(
298 db: &Db,
299 schema: &RpcStoreSchema,
300 cursor: Watermarks,
301 chunk_ckpt_hi: u64,
302 metrics: &PrunerMetrics,
303) -> anyhow::Result<Watermarks> {
304 let ckpt_lo = cursor.checkpoint_lo;
305 let tx_lo = cursor.tx_seq_lo;
306
307 let last_ckpt = chunk_ckpt_hi - 1;
314 let tx_hi = schema
315 .get_checkpoint_summary(last_ckpt)?
316 .with_context(|| format!("checkpoint_summary missing for checkpoint {last_ckpt}"))?
317 .data()
318 .network_total_transactions;
319
320 let mut batch = db.batch();
321 let mut objects_deleted: u64 = 0;
322
323 let mut tx_cursor = tx_lo;
332 for seq in ckpt_lo..chunk_ckpt_hi {
333 let summary = schema
338 .get_checkpoint_summary(seq)?
339 .with_context(|| format!("checkpoint_summary missing for checkpoint {seq}"))?;
340 let ckpt_tx_hi = summary.data().network_total_transactions;
341
342 for tx_seq in tx_cursor..ckpt_tx_hi {
343 let Some((effects, _unchanged)) = schema.get_effects(tx_seq)? else {
344 continue;
345 };
346 for (id, version) in effects.modified_at_versions() {
347 batch.delete(&schema.objects, &objects::Key { id, version })?;
348 retract_object_version_by_checkpoint(&mut batch, schema, id, seq, false)?;
352 objects_deleted += 1;
353 }
354 for (id, version) in effects.all_tombstones() {
355 batch.delete(&schema.objects, &objects::Key { id, version })?;
356 retract_object_version_by_checkpoint(&mut batch, schema, id, seq, true)?;
359 objects_deleted += 1;
360 }
361 batch.delete(
362 &schema.tx_seq_by_digest,
363 &tx_seq_by_digest::Key(*effects.transaction_digest()),
364 )?;
365 }
366 tx_cursor = ckpt_tx_hi;
367
368 batch.delete(
370 &schema.checkpoint_seq_by_digest,
371 &checkpoint_seq_by_digest::Key(summary.data().digest()),
372 )?;
373 }
374
375 batch.delete_range(&schema.transactions, &U64Be(tx_lo), &U64Be(tx_hi))?;
379 batch.delete_range(&schema.effects, &U64Be(tx_lo), &U64Be(tx_hi))?;
380 batch.delete_range(&schema.events, &U64Be(tx_lo), &U64Be(tx_hi))?;
381 batch.delete_range(&schema.tx_metadata_by_seq, &U64Be(tx_lo), &U64Be(tx_hi))?;
382 batch.delete_range(
383 &schema.checkpoint_summary,
384 &U64Be(ckpt_lo),
385 &U64Be(chunk_ckpt_hi),
386 )?;
387 batch.delete_range(
388 &schema.checkpoint_contents,
389 &U64Be(ckpt_lo),
390 &U64Be(chunk_ckpt_hi),
391 )?;
392
393 let new = Watermarks {
395 tx_seq_lo: tx_hi,
396 checkpoint_lo: chunk_ckpt_hi,
397 };
398 let (k, v) = pruning_watermark::store(&new);
399 batch.put(&schema.pruning_watermark, &k, &v)?;
400
401 batch.commit()?;
402
403 schema.set_pruning_floor(new.tx_seq_lo);
406 metrics.objects_deleted.inc_by(objects_deleted);
407
408 Ok(new)
409}
410
411fn retract_object_version_by_checkpoint(
430 batch: &mut Batch,
431 schema: &RpcStoreSchema,
432 id: ObjectID,
433 cp: u64,
434 removed: bool,
435) -> anyhow::Result<()> {
436 let lo = object_version_by_checkpoint::Key { id, checkpoint: 0 };
437 let hi = object_version_by_checkpoint::Key { id, checkpoint: cp };
438 batch.delete_range(&schema.object_version_by_checkpoint, &lo, &hi)?;
439 if removed {
440 batch.delete(&schema.object_version_by_checkpoint, &hi)?;
441 }
442 Ok(())
443}
444
445pub fn prune_history_cohort(
475 db: &Db,
476 schema: &RpcStoreSchema,
477 pruned_checkpoint_watermark: u64,
478 pruned_tx_seq_exclusive: u64,
479) -> anyhow::Result<()> {
480 let cursor = schema.get_pruning_watermarks()?.unwrap_or_default();
481 let tx_lo = cursor.tx_seq_lo;
482 let tx_hi = pruned_tx_seq_exclusive;
483 let checkpoint_lo = pruned_checkpoint_watermark.saturating_add(1);
486
487 if tx_hi <= tx_lo && checkpoint_lo <= cursor.checkpoint_lo {
490 return Ok(());
491 }
492
493 let mut batch = db.batch();
494
495 for entry in schema.iter_tx_seq_digests(tx_lo, tx_hi)? {
501 let (_tx_seq, digest) = entry?;
502 batch.delete(&schema.tx_seq_by_digest, &tx_seq_by_digest::Key(digest))?;
503 }
504 batch.delete_range(&schema.tx_metadata_by_seq, &U64Be(tx_lo), &U64Be(tx_hi))?;
505
506 let new = Watermarks {
510 tx_seq_lo: tx_hi.max(tx_lo),
511 checkpoint_lo: checkpoint_lo.max(cursor.checkpoint_lo),
512 };
513 let (k, v) = pruning_watermark::store(&new);
514 batch.put(&schema.pruning_watermark, &k, &v)?;
515 batch.commit()?;
516
517 schema.set_pruning_floor(new.tx_seq_lo);
521 db.compact_range_cf(transaction_bitmap::NAME, None, None)
522 .context("Compacting transaction_bitmap after prune")?;
523 db.compact_range_cf(event_bitmap::NAME, None, None)
524 .context("Compacting event_bitmap after prune")?;
525
526 Ok(())
527}
528
529fn current_committed_epoch(db: &Db) -> anyhow::Result<Option<u64>> {
537 let framework = FrameworkSchema::new(db.clone());
538 let mut min_epoch: Option<u64> = None;
539 for entry in framework.watermarks.iter(..)? {
540 let (_, watermark) = entry?;
541 let epoch = watermark.epoch_hi_inclusive;
542 min_epoch = Some(min_epoch.map_or(epoch, |m| m.min(epoch)));
543 }
544 Ok(min_epoch)
545}
546
547fn retention_checkpoint_floor(
554 schema: &RpcStoreSchema,
555 current_epoch: u64,
556 retention_epochs: u64,
557) -> anyhow::Result<Option<u64>> {
558 debug_assert!(retention_epochs >= 1, "validated in start_pruner");
559
560 let oldest_retained = current_epoch.saturating_sub(retention_epochs - 1);
562 if oldest_retained == 0 {
563 return Ok(None);
565 }
566
567 let Some(info) = schema.get_epoch(oldest_retained)? else {
568 return Ok(None);
569 };
570 Ok(info.start_checkpoint)
571}
572
/// Clamp the retention floor so it never exceeds the oldest live DB snapshot.
///
/// Returns `retention_lo` unchanged when `oldest_snapshot` is `None`. Otherwise it returns
/// the smaller of the two, presumably so rows a retained snapshot can still read are not
/// pruned.
fn clamp_to_snapshot(retention_lo: u64, oldest_snapshot: Option<u64>) -> u64 {
    oldest_snapshot.map_or(retention_lo, |snap| snap.min(retention_lo))
}
585
// Unit and integration tests for the rpc-store pruner. The integration tests open a real
// store in a temp directory and seed it using the indexer's own processors.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use prometheus::Registry;
    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;
    use sui_consistent_store::PipelineTaskKey;
    use sui_consistent_store::Watermark;
    use sui_indexer_alt_framework::pipeline::Processor;
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;
    use crate::schema::epochs;
    use crate::schema::primitives::U64Varint;

    /// Open an empty rpc-store in a fresh temp directory.
    ///
    /// The `TempDir` is returned so the caller keeps it alive for the whole test.
    fn fresh_db() -> (tempfile::TempDir, Db, RpcStoreSchema) {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        (dir, db, schema)
    }

    /// Write the rows produced for `checkpoint` by these processors into their tables:
    /// `Objects`, `Effects`, `CheckpointSummary`, `TxSeqByDigest` and
    /// `CheckpointSeqByDigest`.
    ///
    /// All rows go into a single committed batch. Other tables (e.g. `transactions`,
    /// `events`, `object_version_by_checkpoint`) are not seeded here.
    async fn seed(
        db: &Db,
        schema: &RpcStoreSchema,
        checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
    ) {
        let mut batch = db.batch();
        for row in crate::indexer::objects::Objects
            .process(checkpoint)
            .await
            .unwrap()
        {
            batch
                .put(
                    &schema.objects,
                    &objects::Key {
                        id: row.id,
                        version: row.version,
                    },
                    &row.value,
                )
                .unwrap();
        }
        for row in crate::indexer::effects::Effects
            .process(checkpoint)
            .await
            .unwrap()
        {
            batch
                .put(&schema.effects, &U64Be(row.tx_seq), &row.value)
                .unwrap();
        }
        for row in crate::indexer::checkpoint_summary::CheckpointSummary
            .process(checkpoint)
            .await
            .unwrap()
        {
            batch
                .put(&schema.checkpoint_summary, &U64Be(row.seq), &row.value)
                .unwrap();
        }
        for row in crate::indexer::tx_seq_by_digest::TxSeqByDigest
            .process(checkpoint)
            .await
            .unwrap()
        {
            batch
                .put(
                    &schema.tx_seq_by_digest,
                    &tx_seq_by_digest::Key(row.digest),
                    &U64Varint(row.tx_seq),
                )
                .unwrap();
        }
        for row in crate::indexer::checkpoint_seq_by_digest::CheckpointSeqByDigest
            .process(checkpoint)
            .await
            .unwrap()
        {
            batch
                .put(
                    &schema.checkpoint_seq_by_digest,
                    &checkpoint_seq_by_digest::Key(row.digest),
                    &U64Varint(row.seq),
                )
                .unwrap();
        }
        batch.commit().unwrap();
    }

    // The clamp is a plain `min` against the oldest snapshot, and a no-op without one.
    #[test]
    fn clamp_to_snapshot_holds_floor_at_or_below_oldest_snapshot() {
        assert_eq!(clamp_to_snapshot(100, None), 100);
        assert_eq!(clamp_to_snapshot(100, Some(250)), 100);
        assert_eq!(clamp_to_snapshot(300, Some(250)), 250);
        assert_eq!(clamp_to_snapshot(250, Some(250)), 250);
    }

    // Epoch 2 with a 5-epoch window saturates to oldest-retained epoch 0, so there is no floor.
    #[test]
    fn retention_floor_none_when_chain_younger_than_window() {
        let (_dir, _db, schema) = fresh_db();
        assert!(retention_checkpoint_floor(&schema, 2, 5).unwrap().is_none());
    }

    // Epoch 5 with a 3-epoch window retains epochs 3..=5, so the floor is epoch 3's start (300).
    #[test]
    fn retention_floor_is_start_checkpoint_of_oldest_retained_epoch() {
        let (_dir, db, schema) = fresh_db();
        let mut batch = db.batch();
        batch
            .merge(
                &schema.epochs,
                &U64Be(3),
                &epochs::start(1, 1, 0, Some(300), None),
            )
            .unwrap();
        batch.commit().unwrap();
        assert_eq!(
            retention_checkpoint_floor(&schema, 5, 3).unwrap(),
            Some(300)
        );
    }

    // The oldest retained epoch (9) has no `epochs` row, so no floor can be derived.
    #[test]
    fn retention_floor_none_when_oldest_epoch_row_missing() {
        let (_dir, _db, schema) = fresh_db();
        assert!(
            retention_checkpoint_floor(&schema, 10, 2)
                .unwrap()
                .is_none()
        );
    }

    // With pipelines at epochs 7 and 5, only epoch 5 counts as committed.
    #[test]
    fn current_committed_epoch_takes_min_across_watermarks() {
        let (_dir, db, _schema) = fresh_db();
        let framework = FrameworkSchema::new(db.clone());
        let mut batch = db.batch();
        batch
            .put(
                &framework.watermarks,
                &PipelineTaskKey::new("a"),
                &Watermark {
                    epoch_hi_inclusive: 7,
                    ..Default::default()
                },
            )
            .unwrap();
        batch
            .put(
                &framework.watermarks,
                &PipelineTaskKey::new("b"),
                &Watermark {
                    epoch_hi_inclusive: 5,
                    ..Default::default()
                },
            )
            .unwrap();
        batch.commit().unwrap();
        assert_eq!(current_committed_epoch(&db).unwrap(), Some(5));
    }

    #[test]
    fn current_committed_epoch_none_when_no_watermarks() {
        let (_dir, db, _schema) = fresh_db();
        assert!(current_committed_epoch(&db).unwrap().is_none());
    }

    // After publishing a floor of TX_BUCKET_SIZE and compacting, bucket 0 (entirely below
    // the floor) is gone and bucket 1 remains. `tx_seq_floor()` appears to be process-wide
    // state, so the test restores its original value at the end.
    #[test]
    fn bitmap_buckets_below_floor_are_evicted_by_compaction() {
        use std::sync::atomic::Ordering;

        use crate::schema::pruning_watermark::tx_seq_floor;
        use crate::schema::transaction_bitmap;

        let baseline = tx_seq_floor().load(Ordering::Relaxed);
        let (_dir, db, schema) = fresh_db();
        let dim = b"sender:alice".to_vec();

        // One bit in bucket 0 (tx 5) and one in bucket 1 (tx TX_BUCKET_SIZE + 5).
        let mut bitmap0 = roaring::RoaringBitmap::new();
        bitmap0.insert(transaction_bitmap::bit_of(5));
        let mut bitmap1 = roaring::RoaringBitmap::new();
        bitmap1.insert(transaction_bitmap::bit_of(
            transaction_bitmap::TX_BUCKET_SIZE + 5,
        ));
        let (k0, v0) = transaction_bitmap::store_bitmap(dim.clone(), 0, bitmap0);
        let (k1, v1) = transaction_bitmap::store_bitmap(dim.clone(), 1, bitmap1);

        let mut batch = db.batch();
        batch.put(&schema.transaction_bitmap, &k0, &v0).unwrap();
        batch.put(&schema.transaction_bitmap, &k1, &v1).unwrap();
        batch.commit().unwrap();
        // Flush so the rows are in SST files that the compaction below will rewrite.
        db.flush().unwrap();

        schema.set_pruning_floor(transaction_bitmap::TX_BUCKET_SIZE);
        db.compact_range_cf(transaction_bitmap::NAME, None, None)
            .unwrap();

        assert!(
            schema
                .get_transaction_bitmap(dim.clone(), 0)
                .unwrap()
                .is_none(),
            "fully-pruned bucket 0 should be evicted by compaction",
        );
        assert!(
            schema.get_transaction_bitmap(dim, 1).unwrap().is_some(),
            "bucket 1 above the floor must remain",
        );

        tx_seq_floor().store(baseline, Ordering::Relaxed);
    }

    // A committed chunk must publish its new tx_seq_lo to the shared floor atomic.
    #[tokio::test]
    async fn prune_chunk_advances_the_bitmap_floor_atomic() {
        use std::sync::atomic::Ordering;

        use crate::schema::pruning_watermark::tx_seq_floor;

        let baseline = tx_seq_floor().load(Ordering::Relaxed);

        let (_dir, db, schema) = fresh_db();
        let checkpoint = Arc::new(
            TestCheckpointBuilder::new(0)
                .start_transaction(0)
                .create_owned_object(0)
                .finish_transaction()
                .start_transaction(0)
                .transfer_object(0, 1)
                .finish_transaction()
                .build_checkpoint(),
        );
        seed(&db, &schema, &checkpoint).await;

        let metrics = PrunerMetrics::new(None, &Registry::new());
        let new = prune_chunk(&db, &schema, Watermarks::default(), 1, &metrics).unwrap();

        assert_eq!(
            tx_seq_floor().load(Ordering::Relaxed),
            new.tx_seq_lo,
            "the chunk must publish its new tx_seq floor to the bitmap atomic",
        );

        tx_seq_floor().store(baseline, Ordering::Relaxed);
    }

    #[test]
    fn start_pruner_rejects_zero_retention() {
        let (_dir, db, _schema) = fresh_db();
        let config = PrunerConfig {
            retention_epochs: 0,
            ..PrunerConfig::default()
        };
        let err = start_pruner(db, config, PrunerMetrics::new(None, &Registry::new())).unwrap_err();
        assert!(
            format!("{err:#}").contains("retention_epochs"),
            "expected a retention_epochs validation error, got: {err:#}",
        );
    }

    #[test]
    fn start_pruner_rejects_zero_checkpoints_per_tick() {
        let (_dir, db, _schema) = fresh_db();
        let config = PrunerConfig {
            max_checkpoints_per_tick: 0,
            ..PrunerConfig::default()
        };
        let err = start_pruner(db, config, PrunerMetrics::new(None, &Registry::new())).unwrap_err();
        assert!(
            format!("{err:#}").contains("max_checkpoints_per_tick"),
            "expected a max_checkpoints_per_tick validation error, got: {err:#}",
        );
    }

    // Setup:
    // - five checkpoints, each with one transaction;
    // - the committed epoch is 2 and `retention_epochs` is 1, so the oldest retained epoch
    //   is 2;
    // - epoch 2 starts at checkpoint 5, which makes 5 the target floor (assuming no DB
    //   snapshot clamps it lower).
    // With a per-tick budget of 2, the floor moves 0 -> 2 -> 4 -> 5 and then stays put.
    #[tokio::test]
    async fn prune_once_advances_at_most_the_per_tick_budget() {
        use std::sync::atomic::Ordering;

        use crate::schema::pruning_watermark::tx_seq_floor;

        let baseline = tx_seq_floor().load(Ordering::Relaxed);

        let (_dir, db, schema) = fresh_db();

        let mut builder = TestCheckpointBuilder::new(0);
        let mut checkpoints = Vec::new();
        for i in 0..5u64 {
            builder = builder
                .start_transaction(0)
                .create_owned_object(i)
                .finish_transaction();
            checkpoints.push(Arc::new(builder.build_checkpoint()));
        }
        for cp in &checkpoints {
            seed(&db, &schema, cp).await;
        }

        // One pipeline watermark at epoch 2, plus the `epochs` row that gives epoch 2's
        // start checkpoint.
        let framework = FrameworkSchema::new(db.clone());
        let mut batch = db.batch();
        batch
            .put(
                &framework.watermarks,
                &PipelineTaskKey::new("p"),
                &Watermark {
                    epoch_hi_inclusive: 2,
                    ..Default::default()
                },
            )
            .unwrap();
        batch
            .merge(
                &schema.epochs,
                &U64Be(2),
                &epochs::start(1, 1, 0, Some(5), None),
            )
            .unwrap();
        batch.commit().unwrap();

        let config = PrunerConfig {
            retention_epochs: 1,
            interval_ms: 1,
            max_chunk_checkpoints: 2,
            max_checkpoints_per_tick: 2,
        };
        let metrics = PrunerMetrics::new(None, &Registry::new());

        // Persisted checkpoint floor, defaulting when no watermark has been written.
        let floor = |schema: &RpcStoreSchema| {
            schema
                .get_pruning_watermarks()
                .unwrap()
                .unwrap_or_default()
                .checkpoint_lo
        };

        prune_once(&db, &schema, &config, &metrics).unwrap();
        assert_eq!(floor(&schema), 2, "first tick advances by the budget");
        prune_once(&db, &schema, &config, &metrics).unwrap();
        assert_eq!(floor(&schema), 4, "second tick advances by the budget");
        prune_once(&db, &schema, &config, &metrics).unwrap();
        assert_eq!(floor(&schema), 5, "third tick reaches the target");

        // The last seeded checkpoint/transaction (index 4) is below the floor and gone.
        assert!(schema.get_effects(4).unwrap().is_none());
        assert!(schema.get_checkpoint_summary(4).unwrap().is_none());
        prune_once(&db, &schema, &config, &metrics).unwrap();
        assert_eq!(floor(&schema), 5, "a pass at the target is a no-op");

        tx_seq_floor().store(baseline, Ordering::Relaxed);
    }

    // Setup: one checkpoint in which tx0 creates an object at v_a and tx1 transfers it to v_b.
    // Pruning checkpoint 0 checks three things:
    // - v_a is deleted, because tx1 superseded it and tx1 is in the chunk;
    // - v_b (the live version) is kept;
    // - all digest indexes and the chunk's effects and summary rows are removed.
    #[tokio::test]
    async fn prune_chunk_deletes_history_and_preserves_live_object() {
        let (_dir, db, schema) = fresh_db();

        let checkpoint = Arc::new(
            TestCheckpointBuilder::new(0)
                .start_transaction(0)
                .create_owned_object(0)
                .finish_transaction()
                .start_transaction(0)
                .transfer_object(0, 1)
                .finish_transaction()
                .build_checkpoint(),
        );

        let obj0 = TestCheckpointBuilder::derive_object_id(0);
        let v_a = checkpoint.transactions[0].effects.lamport_version();
        let v_b = checkpoint.transactions[1].effects.lamport_version();
        assert_ne!(v_a, v_b, "the transfer must bump the object's version");
        let digest0 = *checkpoint.transactions[0].effects.transaction_digest();
        let digest1 = *checkpoint.transactions[1].effects.transaction_digest();
        let ckpt_digest = checkpoint.summary.data().digest();

        seed(&db, &schema, &checkpoint).await;

        // Sanity-check the seeded state before pruning.
        assert!(schema.get_object_by_key(obj0, v_a).unwrap().is_some());
        assert!(schema.get_object_by_key(obj0, v_b).unwrap().is_some());
        assert!(schema.get_effects(0).unwrap().is_some());
        assert!(schema.get_effects(1).unwrap().is_some());
        assert!(schema.get_checkpoint_summary(0).unwrap().is_some());

        let metrics = PrunerMetrics::new(None, &Registry::new());
        let new = prune_chunk(&db, &schema, Watermarks::default(), 1, &metrics).unwrap();
        assert_eq!(
            new,
            Watermarks {
                tx_seq_lo: 2,
                checkpoint_lo: 1,
            },
        );

        assert!(
            schema.get_object_by_key(obj0, v_a).unwrap().is_none(),
            "superseded version v_a should be pruned",
        );
        assert!(
            schema.get_object_by_key(obj0, v_b).unwrap().is_some(),
            "live version v_b must be preserved",
        );

        assert!(schema.get_effects(0).unwrap().is_none());
        assert!(schema.get_effects(1).unwrap().is_none());
        assert!(schema.get_checkpoint_summary(0).unwrap().is_none());

        assert!(
            schema
                .tx_seq_by_digest
                .get(&tx_seq_by_digest::Key(digest0))
                .unwrap()
                .is_none()
        );
        assert!(
            schema
                .tx_seq_by_digest
                .get(&tx_seq_by_digest::Key(digest1))
                .unwrap()
                .is_none()
        );
        assert!(
            schema
                .checkpoint_seq_by_digest
                .get(&checkpoint_seq_by_digest::Key(ckpt_digest))
                .unwrap()
                .is_none()
        );

        // The returned watermark was also persisted.
        assert_eq!(
            schema.get_pruning_watermarks().unwrap().unwrap(),
            Watermarks {
                tx_seq_lo: 2,
                checkpoint_lo: 1,
            },
        );
    }

    // The object is created in cp0 (v_a) and transferred in cp1 (v_b). Pruning only cp0
    // must keep v_a, because its superseding tx (in cp1) is still retained. Pruning cp1 as
    // well must then remove v_a and keep v_b.
    #[tokio::test]
    async fn prune_chunk_retracts_version_only_when_superseding_tx_is_pruned() {
        let (_dir, db, schema) = fresh_db();

        let mut builder = TestCheckpointBuilder::new(0)
            .start_transaction(0)
            .create_owned_object(0)
            .finish_transaction();
        let cp0 = Arc::new(builder.build_checkpoint());
        builder = builder
            .start_transaction(0)
            .transfer_object(0, 1)
            .finish_transaction();
        let cp1 = Arc::new(builder.build_checkpoint());

        let obj0 = TestCheckpointBuilder::derive_object_id(0);
        let v_a = cp0.transactions[0].effects.lamport_version();
        let v_b = cp1.transactions[0].effects.lamport_version();
        assert_ne!(v_a, v_b);

        seed(&db, &schema, &cp0).await;
        seed(&db, &schema, &cp1).await;
        let metrics = PrunerMetrics::new(None, &Registry::new());

        // Prune cp0 only.
        let after_first = prune_chunk(&db, &schema, Watermarks::default(), 1, &metrics).unwrap();
        assert_eq!(
            after_first,
            Watermarks {
                tx_seq_lo: 1,
                checkpoint_lo: 1,
            },
        );
        assert!(schema.get_effects(0).unwrap().is_none());
        assert!(schema.get_effects(1).unwrap().is_some());
        assert!(
            schema.get_object_by_key(obj0, v_a).unwrap().is_some(),
            "v_a must survive while its superseding tx is still retained",
        );

        // Now prune cp1 as well.
        let after_second = prune_chunk(&db, &schema, after_first, 2, &metrics).unwrap();
        assert_eq!(
            after_second,
            Watermarks {
                tx_seq_lo: 2,
                checkpoint_lo: 2,
            },
        );
        assert!(schema.get_effects(1).unwrap().is_none());
        assert!(
            schema.get_object_by_key(obj0, v_a).unwrap().is_none(),
            "v_a must be retracted once its superseding tx is pruned",
        );
        assert!(
            schema.get_object_by_key(obj0, v_b).unwrap().is_some(),
            "live v_b must be preserved",
        );
    }

    // Same scenario as above, checked through `object_version_by_checkpoint`. The
    // cp0-pinned entry survives until cp1's transfer is pruned, and the cp1-pinned entry
    // survives both chunks.
    #[tokio::test]
    async fn prune_chunk_retracts_object_version_by_checkpoint() {
        use crate::indexer::object_version_by_checkpoint::ObjectVersionByCheckpoint;

        let (_dir, db, schema) = fresh_db();

        let mut builder = TestCheckpointBuilder::new(0)
            .start_transaction(0)
            .create_owned_object(0)
            .finish_transaction();
        let cp0 = Arc::new(builder.build_checkpoint());
        builder = builder
            .start_transaction(0)
            .transfer_object(0, 1)
            .finish_transaction();
        let cp1 = Arc::new(builder.build_checkpoint());

        let obj0 = TestCheckpointBuilder::derive_object_id(0);
        let v_a = cp0.transactions[0].effects.lamport_version();
        let v_b = cp1.transactions[0].effects.lamport_version();
        assert_ne!(v_a, v_b);

        for cp in [&cp0, &cp1] {
            seed(&db, &schema, cp).await;
            // `seed` does not populate object_version_by_checkpoint. Write only the
            // processor's `Row::Change` rows here and skip every other variant.
            let mut batch = db.batch();
            for row in ObjectVersionByCheckpoint::default()
                .process(cp)
                .await
                .unwrap()
            {
                let crate::indexer::object_version_by_checkpoint::Row::Change {
                    id,
                    checkpoint,
                    version,
                } = row
                else {
                    continue;
                };
                let (k, v) = object_version_by_checkpoint::store(id, checkpoint, version);
                batch
                    .put(&schema.object_version_by_checkpoint, &k, &v)
                    .unwrap();
            }
            batch.commit().unwrap();
        }

        assert_eq!(
            schema.get_object_version_at_checkpoint(obj0, 0).unwrap(),
            Some(v_a),
        );
        assert_eq!(
            schema.get_object_version_at_checkpoint(obj0, 1).unwrap(),
            Some(v_b),
        );

        let metrics = PrunerMetrics::new(None, &Registry::new());

        let after_first = prune_chunk(&db, &schema, Watermarks::default(), 1, &metrics).unwrap();
        assert_eq!(
            schema.get_object_version_at_checkpoint(obj0, 0).unwrap(),
            Some(v_a),
            "cp0-pinned entry must survive while its superseding tx is retained",
        );

        prune_chunk(&db, &schema, after_first, 2, &metrics).unwrap();
        assert_eq!(
            schema.get_object_version_at_checkpoint(obj0, 0).unwrap(),
            None,
            "cp0-pinned entry must be retracted once its superseding tx is pruned",
        );
        assert_eq!(
            schema.get_object_version_at_checkpoint(obj0, 1).unwrap(),
            Some(v_b),
            "cp1-pinned floor entry must be preserved",
        );
    }

    // Six transactions, each with a metadata row and a digest index entry.
    // `prune_history_cohort(2, 3)` removes tx 0..3 and moves both floors to 3. Calling it
    // again with the same arguments is a no-op.
    #[test]
    fn prune_history_cohort_deletes_tx_metadata_and_advances_floor() {
        use sui_types::digests::TransactionDigest;

        use crate::schema::tx_metadata_by_seq;

        let (_dir, db, schema) = fresh_db();

        let digests: Vec<TransactionDigest> =
            (0u8..6).map(|i| TransactionDigest::new([i; 32])).collect();
        let mut batch = db.batch();
        for (tx_seq, digest) in digests.iter().enumerate() {
            let tx_seq = tx_seq as u64;
            batch
                .put(
                    &schema.tx_metadata_by_seq,
                    &U64Be(tx_seq),
                    &tx_metadata_by_seq::store(&tx_metadata_by_seq::Metadata {
                        digest: *digest,
                        checkpoint_seq: tx_seq,
                        ckpt_position: 0,
                        event_count: 0,
                        timestamp_ms: 0,
                    }),
                )
                .unwrap();
            batch
                .put(
                    &schema.tx_seq_by_digest,
                    &tx_seq_by_digest::Key(*digest),
                    &U64Varint(tx_seq),
                )
                .unwrap();
        }
        batch.commit().unwrap();

        prune_history_cohort(&db, &schema, 2, 3).unwrap();

        for tx_seq in 0..3 {
            assert!(
                schema.get_tx_metadata_by_seq(tx_seq).unwrap().is_none(),
                "tx_metadata {tx_seq} should be pruned",
            );
        }
        for tx_seq in 3..6 {
            assert!(
                schema.get_tx_metadata_by_seq(tx_seq).unwrap().is_some(),
                "tx_metadata {tx_seq} should be retained",
            );
        }

        for digest in &digests[0..3] {
            assert!(schema.get_tx_seq_by_digest(digest).unwrap().is_none());
        }
        for digest in &digests[3..6] {
            assert!(schema.get_tx_seq_by_digest(digest).unwrap().is_some());
        }

        assert_eq!(
            schema.get_pruning_watermarks().unwrap(),
            Some(Watermarks {
                tx_seq_lo: 3,
                checkpoint_lo: 3,
            }),
        );

        // Repeating the same call must leave the watermarks unchanged.
        prune_history_cohort(&db, &schema, 2, 3).unwrap();
        assert_eq!(
            schema.get_pruning_watermarks().unwrap(),
            Some(Watermarks {
                tx_seq_lo: 3,
                checkpoint_lo: 3,
            }),
        );
    }

    // Transactions are spread far apart (0, 500_000, 999_999). Pruning up to 600_000 must
    // remove exactly the first two and keep the third. The checkpoint floor becomes
    // watermark 0 + 1.
    #[test]
    fn prune_history_cohort_handles_sparse_tx_seqs() {
        use sui_types::digests::TransactionDigest;

        use crate::schema::tx_metadata_by_seq;

        let (_dir, db, schema) = fresh_db();

        let entries = [
            (0u64, [10u8; 32]),
            (500_000u64, [11u8; 32]),
            (999_999u64, [12u8; 32]),
        ];
        let mut batch = db.batch();
        for (tx_seq, digest_bytes) in entries {
            let digest = TransactionDigest::new(digest_bytes);
            batch
                .put(
                    &schema.tx_metadata_by_seq,
                    &U64Be(tx_seq),
                    &tx_metadata_by_seq::store(&tx_metadata_by_seq::Metadata {
                        digest,
                        checkpoint_seq: tx_seq,
                        ckpt_position: 0,
                        event_count: 0,
                        timestamp_ms: 0,
                    }),
                )
                .unwrap();
            batch
                .put(
                    &schema.tx_seq_by_digest,
                    &tx_seq_by_digest::Key(digest),
                    &U64Varint(tx_seq),
                )
                .unwrap();
        }
        batch.commit().unwrap();

        prune_history_cohort(&db, &schema, 0, 600_000).unwrap();

        assert!(schema.get_tx_metadata_by_seq(0).unwrap().is_none());
        assert!(schema.get_tx_metadata_by_seq(500_000).unwrap().is_none());
        assert!(schema.get_tx_metadata_by_seq(999_999).unwrap().is_some());
        assert!(
            schema
                .get_tx_seq_by_digest(&TransactionDigest::new([10u8; 32]))
                .unwrap()
                .is_none()
        );
        assert!(
            schema
                .get_tx_seq_by_digest(&TransactionDigest::new([11u8; 32]))
                .unwrap()
                .is_none()
        );
        assert!(
            schema
                .get_tx_seq_by_digest(&TransactionDigest::new([12u8; 32]))
                .unwrap()
                .is_some()
        );
        assert_eq!(
            schema.get_pruning_watermarks().unwrap(),
            Some(Watermarks {
                tx_seq_lo: 600_000,
                checkpoint_lo: 1,
            }),
        );
    }
}