// sui_indexer_alt/consistent_pruning.rs
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use std::collections::BTreeMap;
use anyhow::bail;
use dashmap::DashMap;
use sui_indexer_alt_framework::types::base_types::ObjectID;
/// In-memory lookup table, keyed by checkpoint number, holding the [`PruningInfo`] recorded
/// for each checkpoint until it is removed again through `gc_prune_info`.
#[derive(Default)]
pub(crate) struct PruningLookupTable {
/// Maps a checkpoint number to the objects that were mutated or deleted in that checkpoint.
table: DashMap<u64, PruningInfo>,
}
/// The set of objects touched in a single checkpoint, together with how each one was touched.
pub(crate) struct PruningInfo {
/// For each object, whether this object was mutated or deleted in this checkpoint.
/// This will determine the prune checkpoint for this object.
/// Stored in a `BTreeMap`, so iteration visits objects in `ObjectID` order.
info: BTreeMap<ObjectID, UpdateKind>,
}
/// How an object was affected by a checkpoint, which decides how far pruning may go for it.
///
/// The enum is a plain, fieldless tag, so it derives the standard value-type traits: `Debug`
/// for diagnostics, `Clone`/`Copy` so it can be passed by value, and `PartialEq`/`Eq` so two
/// kinds can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateKind {
    /// This object was mutated in this checkpoint.
    /// To prune, we should prune anything prior to this checkpoint.
    Mutate,
    /// This object was deleted in this checkpoint.
    /// To prune, we should prune anything prior to this checkpoint,
    /// as well as this checkpoint.
    Delete,
}
impl PruningInfo {
pub fn new() -> Self {
Self {
info: BTreeMap::new(),
}
}
/// Add an object that was mutated in this checkpoint.
pub fn add_mutated_object(&mut self, object_id: ObjectID) {
let old = self.info.insert(object_id, UpdateKind::Mutate);
assert!(old.is_none(), "object already exists in pruning info");
}
/// Add an object that was deleted in this checkpoint.
pub fn add_deleted_object(&mut self, object_id: ObjectID) {
let old = self.info.insert(object_id, UpdateKind::Delete);
assert!(old.is_none(), "object already exists in pruning info");
}
}
impl PruningLookupTable {
    /// Records the pruning info collected for `checkpoint`, replacing any entry already stored
    /// under that checkpoint number.
    pub fn insert(&self, checkpoint: u64, prune_info: PruningInfo) {
        let _ = self.table.insert(checkpoint, prune_info);
    }

    /// Collects the `(object_id, checkpoint_number)` pairs that drive pruning for every
    /// checkpoint in `cp_from..cp_to_exclusive`.
    ///
    /// Each pair names a checkpoint whose immediate predecessor should be pruned for that
    /// object. The checkpoint number is an exclusive bound: the entry to prune is the one with
    /// the largest checkpoint number strictly below it.
    ///
    /// A mutation at checkpoint `cp` yields `(obj, cp)`. A deletion at `cp` yields both
    /// `(obj, cp)` (to prune its predecessor) and `(obj, cp + 1)` (to prune the deletion entry
    /// itself).
    ///
    /// Example: an object created at CP 0, modified at CP 1, 2 and 10, and deleted at CP 15
    /// produces
    /// - `(obj, 1)`  -- prunes CP 0, because 0 < 1
    /// - `(obj, 2)`  -- prunes CP 1, because 1 < 2
    /// - `(obj, 10)` -- prunes CP 2, because 2 < 10
    /// - `(obj, 15)` -- prunes CP 10, because 10 < 15
    /// - `(obj, 16)` -- prunes CP 15, because 15 < 16
    ///
    /// # Errors
    ///
    /// Fails if the range is empty (`cp_from >= cp_to_exclusive`) or if any checkpoint in the
    /// range has no entry in the table.
    pub fn get_prune_info(
        &self,
        cp_from: u64,
        cp_to_exclusive: u64,
    ) -> anyhow::Result<Vec<(ObjectID, u64)>> {
        let range_is_empty = cp_from >= cp_to_exclusive;
        if range_is_empty {
            bail!(
                "No valid range to take from the lookup table: from={}, to_exclusive={}",
                cp_from,
                cp_to_exclusive
            );
        }

        let mut pairs = Vec::new();
        for cp in cp_from..cp_to_exclusive {
            // Every checkpoint in the range must be present; a gap is reported as an error.
            let Some(entry) = self.table.get(&cp) else {
                return Err(anyhow::anyhow!("Prune info for checkpoint {cp} not found"));
            };

            for (object_id, update_kind) in entry.value().info.iter() {
                let id = *object_id;
                match update_kind {
                    UpdateKind::Mutate => pairs.push((id, cp)),
                    UpdateKind::Delete => {
                        pairs.push((id, cp));
                        // The extra pair lets the deletion entry itself be pruned.
                        pairs.push((id, cp + 1));
                    }
                }
            }
        }

        Ok(pairs)
    }

    /// Drops the stored pruning info for every checkpoint in `cp_from..cp_to_exclusive`.
    ///
    /// NOTE: Only call this once all checkpoints in the range have been pruned successfully.
    pub fn gc_prune_info(&self, cp_from: u64, cp_to_exclusive: u64) {
        (cp_from..cp_to_exclusive).for_each(|cp| {
            self.table.remove(&cp);
        });
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the pruning info of a checkpoint in which only `object_id` was mutated.
    fn mutated(object_id: ObjectID) -> PruningInfo {
        let mut info = PruningInfo::new();
        info.add_mutated_object(object_id);
        info
    }

    /// Builds the pruning info of a checkpoint in which only `object_id` was deleted.
    fn deleted(object_id: ObjectID) -> PruningInfo {
        let mut info = PruningInfo::new();
        info.add_deleted_object(object_id);
        info
    }

    /// Projects the returned pairs onto their checkpoint component.
    fn checkpoints(pairs: &[(ObjectID, u64)]) -> Vec<u64> {
        pairs.iter().map(|(_, cp)| *cp).collect()
    }

    #[test]
    fn test_pruning_lookup_table_mutations() {
        let table = PruningLookupTable::default();

        // Two different objects, mutated at checkpoints 1 and 2 respectively.
        table.insert(1, mutated(ObjectID::random()));
        table.insert(2, mutated(ObjectID::random()));

        // Checkpoints 1..3 yield one pair per mutation, in checkpoint order.
        let result = table.get_prune_info(1, 3).unwrap();
        assert_eq!(checkpoints(&result), vec![1, 2]);

        // Garbage-collecting the same range leaves the table empty.
        table.gc_prune_info(1, 3);
        assert!(table.table.is_empty());
    }

    #[test]
    fn test_pruning_lookup_table_deletions() {
        let table = PruningLookupTable::default();
        let obj = ObjectID::random();

        // The object is mutated at checkpoint 1 and deleted at checkpoint 2.
        table.insert(1, mutated(obj));
        table.insert(2, deleted(obj));

        let result = table.get_prune_info(1, 3).unwrap();
        assert_eq!(result.len(), 3);
        // For deleted objects, we prune up to and including the deletion checkpoint.
        assert_eq!(result.last().map(|(_, cp)| *cp), Some(3));
    }

    #[test]
    fn test_missing_checkpoint() {
        let table = PruningLookupTable::default();
        table.insert(1, mutated(ObjectID::random()));

        // Checkpoint 2 was never inserted, so asking for it must fail.
        let outcome = table.get_prune_info(2, 3);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_multiple_updates() {
        let table = PruningLookupTable::default();
        let obj = ObjectID::random();

        // The same object is mutated in two consecutive checkpoints.
        table.insert(1, mutated(obj));
        table.insert(2, mutated(obj));

        // Entries for the same object are not deduplicated.
        let result = table.get_prune_info(1, 3).unwrap();
        assert_eq!(checkpoints(&result), vec![1, 2]);
    }
}