1use std::collections::BTreeMap;
32use std::ops::Bound;
33use std::sync::Arc;
34
35use anyhow::Context as _;
36use async_trait::async_trait;
37use sui_consistent_store::Batch;
38use sui_consistent_store::Db;
39use sui_consistent_store::DbMap;
40use sui_consistent_store::PipelineTaskKey;
41use sui_consistent_store::Restore;
42use sui_consistent_store::error::Error;
43use sui_consistent_store::reader::Reader;
44use sui_consistent_store::restore_state;
45use sui_indexer_alt_framework::pipeline::Processor;
46use sui_indexer_alt_framework::pipeline::sequential;
47use sui_types::base_types::ObjectID;
48use sui_types::base_types::SequenceNumber;
49use sui_types::effects::TransactionEffectsAPI;
50use sui_types::full_checkpoint_content::Checkpoint;
51use sui_types::object::Object;
52
53use crate::RpcStoreSchema;
54use crate::indexer::Schema;
55use crate::indexer::Store;
56use crate::indexer::checkpoint_input_objects;
57use crate::indexer::checkpoint_output_objects;
58use crate::schema::object_version_by_checkpoint;
59
/// Handler for the `object_version_by_checkpoint` pipeline.
///
/// The same type is used both while indexing checkpoints and while restoring
/// objects; the only state it carries is an optional anchor checkpoint.
#[derive(Debug, Clone, Default)]
pub struct ObjectVersionByCheckpoint {
    /// Anchor checkpoint, if any.
    ///
    /// While processing, floor candidates are only emitted for checkpoints at
    /// or below this value. While restoring, it is the checkpoint restored
    /// rows are recorded at, and restoring without it is an error.
    anchor: Option<u64>,
}
71
72pub enum Row {
74 Change {
77 id: ObjectID,
78 checkpoint: u64,
79 version: SequenceNumber,
80 },
81 Floor {
86 id: ObjectID,
87 checkpoint: u64,
88 version: SequenceNumber,
89 },
90}
91
92impl ObjectVersionByCheckpoint {
93 pub fn for_restore(checkpoint: u64) -> Self {
96 Self {
97 anchor: Some(checkpoint),
98 }
99 }
100
101 pub fn with_anchor(anchor: Option<u64>) -> Self {
105 Self { anchor }
106 }
107}
108
109#[async_trait]
110impl Processor for ObjectVersionByCheckpoint {
111 const NAME: &'static str = "object_version_by_checkpoint";
112 type Value = Row;
113
114 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
115 let cp = checkpoint.summary.data().sequence_number;
116
117 let outputs = checkpoint_output_objects(checkpoint)?;
120 let mut rows: Vec<Row> = outputs
121 .iter()
122 .map(|(id, (object, _))| Row::Change {
123 id: *id,
124 checkpoint: cp,
125 version: object.version(),
126 })
127 .collect();
128
129 let mut removed: BTreeMap<ObjectID, SequenceNumber> = BTreeMap::new();
137 for tx in &checkpoint.transactions {
138 let lamport = tx.effects.lamport_version();
139 for oref in tx
140 .effects
141 .deleted()
142 .into_iter()
143 .chain(tx.effects.unwrapped_then_deleted())
144 .chain(tx.effects.wrapped())
145 {
146 removed
147 .entry(oref.0)
148 .and_modify(|v| *v = (*v).max(lamport))
149 .or_insert(lamport);
150 }
151 }
152
153 for (id, version) in removed {
154 if outputs.contains_key(&id) {
158 continue;
159 }
160 rows.push(Row::Change {
161 id,
162 checkpoint: cp,
163 version,
164 });
165 }
166
167 if self.anchor.is_some_and(|t| cp <= t) {
175 for (id, (input, _)) in checkpoint_input_objects(checkpoint)? {
176 rows.push(Row::Floor {
177 id,
178 checkpoint: cp,
179 version: input.version(),
180 });
181 }
182 }
183
184 Ok(rows)
185 }
186}
187
188impl Restore for ObjectVersionByCheckpoint {
189 type Schema = RpcStoreSchema;
190
191 fn restore(
192 &self,
193 schema: &Self::Schema,
194 object: &Object,
195 batch: &mut Batch,
196 ) -> anyhow::Result<()> {
197 let checkpoint = self
204 .anchor
205 .context("object_version_by_checkpoint restored without a restore anchor checkpoint")?;
206 let (key, value) =
210 object_version_by_checkpoint::store_restored(object.id(), checkpoint, object.version());
211 batch.put(&schema.object_version_by_checkpoint, &key, &value)?;
212 Ok(())
213 }
214}
215
216#[async_trait]
217impl sequential::Handler for ObjectVersionByCheckpoint {
218 type Store = Store;
219 type Batch = Vec<Row>;
220
221 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
222 batch.extend(values);
223 }
224
225 async fn commit<'a>(
226 &self,
227 batch: &Self::Batch,
228 conn: &mut sui_consistent_store::Connection<'a, Schema>,
229 ) -> anyhow::Result<usize> {
230 let cf = &conn.store.schema().object_version_by_checkpoint;
231
232 let mut count = 0;
233 for row in batch {
234 match row {
235 Row::Change {
236 id,
237 checkpoint,
238 version,
239 } => {
240 let (k, v) = object_version_by_checkpoint::store(*id, *checkpoint, *version);
241 conn.batch.put(cf, &k, &v)?;
242 count += 1;
243 }
244 Row::Floor {
245 id,
246 checkpoint,
247 version,
248 } => {
249 if is_first_appearance(cf, *id, *checkpoint)? {
254 let (k, v) = object_version_by_checkpoint::store(*id, 0, *version);
255 conn.batch.put(cf, &k, &v)?;
256 count += 1;
257 }
258 }
259 }
260 }
261 Ok(count)
262 }
263}
264
265pub(crate) fn restored_anchor(db: &Db) -> anyhow::Result<Option<u64>> {
269 let key = PipelineTaskKey::new(ObjectVersionByCheckpoint::NAME);
270 Ok(db
271 .framework()
272 .restore
273 .get(&key)?
274 .and_then(|s| match s.state {
275 Some(restore_state::State::Complete(c)) => Some(c.restored_at),
276 _ => None,
277 }))
278}
279
280fn is_first_appearance<R: Reader>(
286 cf: &DbMap<object_version_by_checkpoint::Key, object_version_by_checkpoint::Value, R>,
287 id: ObjectID,
288 checkpoint: u64,
289) -> Result<bool, Error> {
290 let lo = object_version_by_checkpoint::Key { id, checkpoint: 0 };
291 let hi = object_version_by_checkpoint::Key { id, checkpoint };
292 let seen = cf
293 .iter_rev((Bound::Included(lo), Bound::Excluded(hi)))?
294 .next()
295 .is_some();
296 Ok(!seen)
297}
298
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;
    use sui_consistent_store::FrameworkSchema;
    use sui_consistent_store::RestoreState;
    use sui_types::base_types::SuiAddress;
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// Opens a database with the RPC store schema in a new temporary
    /// directory. The directory handle is returned alongside it so callers
    /// can keep it alive for the duration of the test.
    fn fresh_db() -> (tempfile::TempDir, Db, RpcStoreSchema) {
        let tmp = tempfile::tempdir().unwrap();
        let opened = Db::open::<RpcStoreSchema>(tmp.path(), DbOptions::default());
        let (db, schema) = opened.unwrap();
        (tmp, db, schema)
    }

    /// Records a completed restore at `restored_at` for this pipeline.
    fn seed_restored_at(db: &Db, restored_at: u64) {
        let framework = FrameworkSchema::new(db.clone());
        let key = PipelineTaskKey::new(ObjectVersionByCheckpoint::NAME);
        let value = RestoreState {
            state: Some(restore_state::State::Complete(restore_state::Complete {
                restored_at,
            })),
        };

        let mut writes = db.batch();
        writes.put(&framework.restore, &key, &value).unwrap();
        writes.commit().unwrap();
    }

    /// Writes a single `(id, checkpoint) -> version` row and commits it.
    fn put(schema: &RpcStoreSchema, db: &Db, id: ObjectID, checkpoint: u64, version: u64) {
        let version = SequenceNumber::from_u64(version);
        let (k, v) = object_version_by_checkpoint::store(id, checkpoint, version);

        let mut writes = db.batch();
        writes
            .put(&schema.object_version_by_checkpoint, &k, &v)
            .unwrap();
        writes.commit().unwrap();
    }

    /// First `Row::Change` for `target`, as `(checkpoint, version)`.
    fn change_for(rows: &[Row], target: ObjectID) -> Option<(u64, SequenceNumber)> {
        rows.iter().find_map(|row| match row {
            Row::Change {
                id,
                checkpoint,
                version,
            } if *id == target => Some((*checkpoint, *version)),
            _ => None,
        })
    }

    /// First `Row::Floor` for `target`, as `(checkpoint, version)`.
    fn floor_for(rows: &[Row], target: ObjectID) -> Option<(u64, SequenceNumber)> {
        rows.iter().find_map(|row| match row {
            Row::Floor {
                id,
                checkpoint,
                version,
            } if *id == target => Some((*checkpoint, *version)),
            _ => None,
        })
    }

    /// Whether `rows` contains any floor candidate at all.
    fn has_any_floor(rows: &[Row]) -> bool {
        rows.iter().any(|row| matches!(row, Row::Floor { .. }))
    }

    #[tokio::test]
    async fn process_runs_against_synthetic_checkpoint() {
        // Smoke test: processing an empty synthetic checkpoint must not fail.
        let checkpoint = Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
        let processor = ObjectVersionByCheckpoint::default();
        let _rows = processor.process(&checkpoint).await.unwrap();
    }

    #[tokio::test]
    async fn process_records_final_live_version() {
        let checkpoint = Arc::new(
            TestCheckpointBuilder::new(7)
                .start_transaction(0)
                .create_owned_object(0)
                .finish_transaction()
                .build_checkpoint(),
        );
        let created_id = TestCheckpointBuilder::derive_object_id(0);
        let expected_version = checkpoint.transactions[0].effects.lamport_version();

        let processor = ObjectVersionByCheckpoint::with_anchor(Some(100));
        let rows = processor.process(&checkpoint).await.unwrap();

        assert_eq!(change_for(&rows, created_id), Some((7, expected_version)));
        // A freshly created object is not an input, so it gets no floor.
        assert!(floor_for(&rows, created_id).is_none());
    }

    #[tokio::test]
    async fn process_records_tombstone_for_deleted_object() {
        let mut builder = TestCheckpointBuilder::new(0)
            .start_transaction(0)
            .create_owned_object(0)
            .finish_transaction();
        let _cp0 = builder.build_checkpoint();
        builder = builder
            .start_transaction(0)
            .delete_object(0)
            .finish_transaction();
        let cp1 = Arc::new(builder.build_checkpoint());

        let deleted_id = TestCheckpointBuilder::derive_object_id(0);
        let tombstone_version = cp1.transactions[0].effects.lamport_version();

        let processor = ObjectVersionByCheckpoint::default();
        let rows = processor.process(&cp1).await.unwrap();

        assert_eq!(change_for(&rows, deleted_id), Some((1, tombstone_version)));
    }

    #[tokio::test]
    async fn process_emits_floor_candidate_for_input_object() {
        let mut builder = TestCheckpointBuilder::new(0)
            .start_transaction(0)
            .create_owned_object(0)
            .finish_transaction();
        let cp0 = builder.build_checkpoint();
        builder = builder
            .start_transaction(0)
            .transfer_object(0, 1)
            .finish_transaction();
        let cp1 = Arc::new(builder.build_checkpoint());

        let obj = TestCheckpointBuilder::derive_object_id(0);
        // The version the object carried into the second checkpoint.
        let incoming = cp0.transactions[0].effects.lamport_version();

        let processor = ObjectVersionByCheckpoint::with_anchor(Some(100));
        let rows = processor.process(&cp1).await.unwrap();

        let floor = floor_for(&rows, obj);
        assert_eq!(floor, Some((1, incoming)), "input object floor candidate");
    }

    #[tokio::test]
    async fn process_skips_floor_candidates_past_the_anchor() {
        let mut builder = TestCheckpointBuilder::new(0)
            .start_transaction(0)
            .create_owned_object(0)
            .finish_transaction();
        let _cp0 = builder.build_checkpoint();
        builder = builder
            .start_transaction(0)
            .transfer_object(0, 1)
            .finish_transaction();
        let cp1 = Arc::new(builder.build_checkpoint());

        // Anchor below the checkpoint being processed.
        let anchored_before = ObjectVersionByCheckpoint::with_anchor(Some(0));
        let rows = anchored_before.process(&cp1).await.unwrap();
        assert!(!has_any_floor(&rows));

        // No anchor at all.
        let unanchored = ObjectVersionByCheckpoint::default();
        let rows = unanchored.process(&cp1).await.unwrap();
        assert!(!has_any_floor(&rows));
    }

    #[test]
    fn restore_writes_one_row_at_the_anchor() {
        let (_dir, db, schema) = fresh_db();

        let object =
            Object::with_id_owner_for_testing(ObjectID::from_single_byte(1), SuiAddress::ZERO);

        let mut writes = db.batch();
        let restorer = ObjectVersionByCheckpoint::for_restore(123);
        restorer.restore(&schema, &object, &mut writes).unwrap();
        writes.commit().unwrap();

        // Below, at, and above the anchor all resolve to the restored version.
        for cp in [122, 123, 200] {
            let resolved = schema
                .get_object_version_at_checkpoint(object.id(), cp)
                .unwrap();
            assert_eq!(resolved, Some(object.version()));
        }
    }

    #[test]
    fn restored_anchor_reads_restore_state() {
        let (_dir, db, _schema) = fresh_db();
        assert_eq!(restored_anchor(&db).unwrap(), None);

        seed_restored_at(&db, 42);
        assert_eq!(restored_anchor(&db).unwrap(), Some(42));
    }

    #[test]
    fn is_first_appearance_true_with_no_prior_row() {
        let (_dir, _db, schema) = fresh_db();
        let id = ObjectID::random();

        let first = is_first_appearance(&schema.object_version_by_checkpoint, id, 50).unwrap();
        assert!(first);
    }

    #[test]
    fn is_first_appearance_false_when_already_seen() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        put(&schema, &db, id, 30, 5);

        let first = is_first_appearance(&schema.object_version_by_checkpoint, id, 50).unwrap();
        assert!(!first);
    }

    #[test]
    fn is_first_appearance_ignores_rows_at_or_above_the_checkpoint() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        put(&schema, &db, id, 100, 9);

        let first = is_first_appearance(&schema.object_version_by_checkpoint, id, 50).unwrap();
        assert!(first);
    }

    #[test]
    fn synthetic_floor_serves_reads_below_first_change() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        let anchor = 100;
        let window_entry = SequenceNumber::from_u64(5);
        let v1 = SequenceNumber::from_u64(6);

        // Seed a restored row at the anchor and a change row at checkpoint 50.
        let mut writes = db.batch();
        let (rk, rv) = object_version_by_checkpoint::store_restored(id, anchor, v1);
        writes
            .put(&schema.object_version_by_checkpoint, &rk, &rv)
            .unwrap();
        let (ck, cv) = object_version_by_checkpoint::store(id, 50, v1);
        writes
            .put(&schema.object_version_by_checkpoint, &ck, &cv)
            .unwrap();
        writes.commit().unwrap();

        // Neither seeded row lies below checkpoint 50.
        assert!(is_first_appearance(&schema.object_version_by_checkpoint, id, 50).unwrap());

        // Write the floor the way `commit` does: under checkpoint 0.
        let mut writes = db.batch();
        let (fk, fv) = object_version_by_checkpoint::store(id, 0, window_entry);
        writes
            .put(&schema.object_version_by_checkpoint, &fk, &fv)
            .unwrap();
        writes.commit().unwrap();

        assert_eq!(
            schema.get_object_version_at_checkpoint(id, 30).unwrap(),
            Some(window_entry),
        );
        for cp in [50, 75, 100] {
            assert_eq!(
                schema.get_object_version_at_checkpoint(id, cp).unwrap(),
                Some(v1),
            );
        }
    }
}