sui_rpc_store/indexer/
restore.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Entry point for bulk-loading the [`RpcStoreSchema`]'s
5//! derived-index CFs from a [`RestoreSource`].
6//!
7//! Registers the three live-object-derivable index pipelines
8//! ([`ObjectByOwner`], [`ObjectByType`], [`Balance`]) plus the
9//! [`ObjectVersionByCheckpoint`] and [`PackageVersions`] floor rows —
10//! and, when the caller's [`RestoreLayer`] opts in, the raw
11//! [`Objects`] CF — against a single [`RestoreDriver`] and returns a
12//! [`Service`] driving the restore through to completion. Once
13//! finished, every registered
14//! pipeline's `__restore` row is `Complete` and its `__watermark`
15//! row is set to the source's target, so the regular
16//! [`Indexer::add_pipelines`] path will accept them for tip
17//! indexing.
18//!
19//! Restoration is run separately from tip indexing — open the
20//! database, call [`restore_indexes`] to populate the indexes,
21//! then construct an [`Indexer`] over the same store to start
22//! tip-following.
23//!
24//! [`Indexer`]: crate::Indexer
25//! [`Indexer::add_pipelines`]: crate::Indexer::add_pipelines
26
27use std::sync::Arc;
28
29use anyhow::Context as _;
30use sui_consistent_store::Batch;
31use sui_consistent_store::ChainId;
32use sui_consistent_store::Db;
33use sui_consistent_store::FrameworkSchema;
34use sui_consistent_store::PipelineTaskKey;
35use sui_consistent_store::Schema as _;
36use sui_consistent_store::Watermark;
37use sui_consistent_store::restore::RestoreDriver;
38use sui_consistent_store::restore::RestoreDriverConfig;
39use sui_consistent_store::restore::RestoreSource;
40use sui_consistent_store::restore::metrics::RestoreMetrics;
41use sui_futures::service::Service;
42use sui_indexer_alt_framework::pipeline::Processor;
43use sui_types::storage::ObjectStore;
44use sui_types::sui_system_state::SuiSystemStateTrait;
45use sui_types::sui_system_state::get_sui_system_state;
46use tracing::info;
47use tracing::warn;
48
49use crate::RestoreLayer;
50use crate::RpcStoreReader;
51use crate::RpcStoreSchema;
52use crate::indexer::balance::Balance;
53use crate::indexer::checkpoint_contents::CheckpointContents;
54use crate::indexer::checkpoint_seq_by_digest::CheckpointSeqByDigest;
55use crate::indexer::checkpoint_summary::CheckpointSummary;
56use crate::indexer::effects::Effects;
57use crate::indexer::epochs::Epochs;
58use crate::indexer::event_bitmap::EventBitmap;
59use crate::indexer::events::Events;
60use crate::indexer::object_by_owner::ObjectByOwner;
61use crate::indexer::object_by_type::ObjectByType;
62use crate::indexer::object_version_by_checkpoint::ObjectVersionByCheckpoint;
63use crate::indexer::objects::Objects;
64use crate::indexer::package_versions::PackageVersions;
65use crate::indexer::transaction_bitmap::TransactionBitmap;
66use crate::indexer::transactions::Transactions;
67use crate::indexer::tx_metadata_by_seq::TxMetadataBySeq;
68use crate::indexer::tx_seq_by_digest::TxSeqByDigest;
69use crate::schema::epochs;
70use crate::schema::primitives::U64Be;
71use crate::schema::pruning_watermark;
72
/// The embedded fullnode's **live cohort**: the pipelines that
/// [`restore_indexes`] bulk-loads and that are restored to the
/// perpetual store's tip `T`, then follow live from there. They are
/// bounded by the live object set, so a snapshot restore reproduces
/// them exactly.
///
/// These are the three names [`restore_indexes`] always registers in
/// addition to the two history-cohort floor pipelines
/// (`object_version_by_checkpoint`, `package_versions`), which are
/// listed in [`HISTORY_COHORT`] instead.
///
/// Matches the live half of
/// [`PipelineLayer::embedded`](crate::config::PipelineLayer::embedded);
/// the `embedded_registers_only_cohort_pipelines` test pins the two
/// together.
pub const LIVE_COHORT: &[&str] = &[ObjectByOwner::NAME, ObjectByType::NAME, Balance::NAME];
84
/// The embedded fullnode's **history cohort**: the pipelines seeded to
/// the lowest available checkpoint `L` and backfilled upward from the
/// perpetual store, then followed live.
///
/// Most cannot be reconstructed from a live-object snapshot at all --
/// they record ledger history (`tx_seq` <-> digest maps, the
/// transaction and event bitmaps) and per-epoch metadata (`epochs`) --
/// so they are seeded, never restored.
///
/// `object_version_by_checkpoint` and `package_versions` are the
/// exceptions: they are *both* restored and backfilled.
/// [`restore_indexes`] bulk-loads their floor rows at the tip `T` (the
/// versions live in the snapshot but predate the available window, so a
/// checkpoint-bounded read treats them as having always existed), and
/// the history seed then rewinds their `__watermark` to `L-1` so they
/// also backfill the per-checkpoint detail over `(L, T]` --
/// `object_version_by_checkpoint`'s per-checkpoint changes, and
/// `package_versions`'s real publish checkpoint for versions published
/// in the window. The embedded bootstrap runs the restore before the
/// seed, so the `L-1` watermark wins.
///
/// NOTE(review): with the watermark at `L-1` the first backfilled
/// checkpoint would be `L` itself, i.e. the range reads as `[L, T]`;
/// the `(L, T]` notation used here looks off by one -- confirm.
///
/// [`seed_history_cohort`] iterates this slice in order and writes one
/// `__watermark` / `__chain_id` pair per entry.
///
/// Matches the history half of
/// [`PipelineLayer::embedded`](crate::config::PipelineLayer::embedded).
pub const HISTORY_COHORT: &[&str] = &[
    Epochs::NAME,
    ObjectVersionByCheckpoint::NAME,
    PackageVersions::NAME,
    TxSeqByDigest::NAME,
    TxMetadataBySeq::NAME,
    TransactionBitmap::NAME,
    EventBitmap::NAME,
];
117
118/// Register every [`Restore`]-implementing pipeline opted in by
119/// `layer` on a [`RestoreDriver`] bound to `db` / `schema` and
120/// `source`, then run the resulting [`Service`].
121///
122/// The live-cohort pipelines are always registered, plus
123/// `object_version_by_checkpoint` and `package_versions` -- history-
124/// cohort members whose floor rows are bulk-loaded from the live set
125/// here (the history seed separately rewinds their watermarks so they
126/// also backfill `(L, T]`). The raw [`Objects`] pipeline is only
127/// registered when `layer.objects` is set. The returned `Service`'s
128/// primary task completes once every registered pipeline transitions to
129/// [`RestoreState::Complete`].
130///
131/// [`Restore`]: sui_consistent_store::Restore
132/// [`RestoreState::Complete`]: sui_consistent_store::restore_state::Complete
133pub fn restore_indexes<Src: RestoreSource>(
134    db: Db,
135    schema: Arc<RpcStoreSchema>,
136    source: Src,
137    config: RestoreDriverConfig,
138    layer: RestoreLayer,
139    metrics: Arc<RestoreMetrics>,
140) -> anyhow::Result<Service> {
141    // Capture the anchor before the driver consumes `source`: the
142    // checkpoint-pinned object index attributes every restored live
143    // object to it.
144    let target_checkpoint = source.target_checkpoint();
145    let mut driver = RestoreDriver::new(db, schema, source, config, metrics);
146    // History-cohort members, but their floor rows are restored from the
147    // live set; the embedded history seed later rewinds their watermarks
148    // so they also backfill `(L, T]`.
149    driver.register(ObjectVersionByCheckpoint::for_restore(target_checkpoint))?;
150    driver.register(PackageVersions)?;
151    driver.register(ObjectByOwner)?;
152    driver.register(ObjectByType)?;
153    driver.register(Balance)?;
154    if layer.objects {
155        driver.register(Objects)?;
156    }
157    driver.run()
158}
159
160/// After [`restore_indexes`] returns, prime the framework state of
161/// every pipeline that the restore did *not* cover so tip indexing
162/// resumes from `target_watermark.checkpoint_hi_inclusive + 1`
163/// across the board instead of replaying from genesis for the
164/// raw-chain-data and bitmap pipelines.
165///
166/// Specifically, for every pipeline not in `layer`'s restored
167/// set, writes:
168///
169/// - `__watermark = target_watermark` — the framework's
170///   tip-resume reads this and starts at
171///   `checkpoint_hi_inclusive + 1`.
172/// - `__chain_id = target_chain_id` — pins the pipeline to the
173///   chain the snapshot was taken from, matching what
174///   [`restore_indexes`]'s finalize step already wrote for the
175///   restored pipelines.
176///
177/// Also writes the singleton `pruning_watermark` so
178/// `available_range` queries and the bitmap CFs' compaction
179/// filters reflect that data only starts at the post-restore
180/// floor (`tx_seq_lo = target_watermark.tx_hi`,
181/// `checkpoint_lo = checkpoint_hi_inclusive + 1`).
182///
183/// Idempotent: re-running after a successful restore overwrites
184/// the unrestored pipelines' watermarks with the same values and
185/// re-writes the pruning row.
186pub fn floor_unrestored_pipelines(
187    db: &Db,
188    target_watermark: Watermark,
189    target_chain_id: ChainId,
190    layer: &RestoreLayer,
191) -> anyhow::Result<()> {
192    let restored: &[&'static str] = if layer.objects {
193        &[
194            ObjectVersionByCheckpoint::NAME,
195            ObjectByOwner::NAME,
196            ObjectByType::NAME,
197            Balance::NAME,
198            PackageVersions::NAME,
199            Objects::NAME,
200        ]
201    } else {
202        &[
203            ObjectVersionByCheckpoint::NAME,
204            ObjectByOwner::NAME,
205            ObjectByType::NAME,
206            Balance::NAME,
207            PackageVersions::NAME,
208        ]
209    };
210
211    // Every rpc-store pipeline. Kept exhaustive so any new
212    // pipeline added to `PipelineLayer` needs an explicit
213    // decision here about whether it's a restore-time pipeline
214    // or a tip-only one.
215    let all: &[&'static str] = &[
216        Epochs::NAME,
217        CheckpointSummary::NAME,
218        CheckpointContents::NAME,
219        CheckpointSeqByDigest::NAME,
220        Transactions::NAME,
221        TxSeqByDigest::NAME,
222        TxMetadataBySeq::NAME,
223        Effects::NAME,
224        Events::NAME,
225        Objects::NAME,
226        ObjectVersionByCheckpoint::NAME,
227        ObjectByOwner::NAME,
228        ObjectByType::NAME,
229        Balance::NAME,
230        PackageVersions::NAME,
231        TransactionBitmap::NAME,
232        EventBitmap::NAME,
233    ];
234
235    // Use the owned `FrameworkSchema` over `Db` (rather than the
236    // borrowed view from `Db::framework`) so the `DbMap`s line up
237    // with `Batch::put`'s `R = Db` expectation.
238    let framework = FrameworkSchema::new(db.clone());
239    let mut batch = db.batch();
240    for name in all.iter().filter(|n| !restored.contains(n)) {
241        let key = PipelineTaskKey::new(*name);
242        batch
243            .put(&framework.watermarks, &key, &target_watermark)
244            .with_context(|| format!("stage __watermark for {name:?}"))?;
245        batch
246            .put(&framework.chain_ids, &key, &target_chain_id)
247            .with_context(|| format!("stage __chain_id for {name:?}"))?;
248    }
249
250    // Resolve the rpc-store schema handle once for the
251    // pruning-watermark CF. The schema is cheap to re-bind to a
252    // live `Db` and gives the typed `store` helper plus the
253    // pruning-floor setter the bitmap CFs depend on.
254    let schema =
255        Arc::new(RpcStoreSchema::open(db).context("re-open RpcStoreSchema for pruning watermark")?);
256    let (k, v) = pruning_watermark::store(&pruning_watermark::Watermarks {
257        tx_seq_lo: target_watermark.tx_hi,
258        checkpoint_lo: target_watermark.checkpoint_hi_inclusive.saturating_add(1),
259    });
260    batch
261        .put(&schema.pruning_watermark, &k, &v)
262        .context("stage pruning_watermark row")?;
263
264    // Seed the `epochs` row for the epoch the snapshot lands in. The
265    // chain advanced to it at the anchor's end-of-epoch checkpoint,
266    // but the `epochs` pipeline only emits a start record while
267    // processing such a checkpoint, which tip indexing skips on
268    // resume (it starts at anchor + 1). Without this seed, the
269    // current epoch's row would never get its protocol version, gas
270    // price, start timestamp, start checkpoint, or system state.
271    // Requires the raw objects to read the on-chain `SuiSystemState`,
272    // so it only runs when the `objects` CF was restored; failure to
273    // read it is logged and skipped rather than failing the restore.
274    if layer.objects {
275        let reader = RpcStoreReader::new(db.clone(), schema.clone());
276        let start_checkpoint = target_watermark.checkpoint_hi_inclusive.saturating_add(1);
277        match seed_current_epoch_start(&schema, &reader, Some(start_checkpoint), &mut batch) {
278            Ok(epoch) => info!(
279                epoch,
280                start_checkpoint, "seeded start record for restore epoch"
281            ),
282            Err(e) => warn!(
283                error = %e,
284                "could not seed the restore epoch's start record; get_epoch / \
285                 get_committee / Move type-layout resolution for the current epoch \
286                 will be unavailable until the next epoch boundary",
287            ),
288        }
289    }
290
291    batch.commit().context("commit floor batch")?;
292
293    // Mirror the on-disk floor into the process-wide atomic so any
294    // compaction-filter clones started in this process see the
295    // updated value immediately. A subsequent `Indexer::from_store`
296    // also calls `refresh_pruning_atomics` so cross-process boots
297    // converge.
298    schema.set_pruning_floor(target_watermark.tx_hi);
299
300    Ok(())
301}
302
303/// Stage a start record for the epoch reflected by the on-chain
304/// `SuiSystemState` in `objects`, keyed by that epoch.
305///
306/// The `epochs` pipeline derives a start record only from an
307/// end-of-epoch checkpoint's `epoch_info`, which a restore-then-tip
308/// flow never processes (tip indexing resumes at `anchor + 1`), so a
309/// freshly restored database has no start record for the epoch it
310/// landed in. This reconstructs that record straight from the
311/// restored object set: protocol version, reference gas price, and
312/// epoch-start timestamp come from the `SuiSystemState`, the BCS of
313/// which is stored so `get_committee` and Move type-layout resolution
314/// work too.
315///
316/// `start_checkpoint` is supplied by the caller and may be `None`.
317/// The formal-snapshot restore lands at an epoch boundary, so it
318/// passes `Some(anchor + 1)`. The embedded-fullnode restore lands at
319/// a *mid-epoch* tip, so the epoch's first checkpoint is unknown and
320/// it passes `None`; the upward backfill fills `start_checkpoint` in
321/// later if that boundary falls within the available range.
322///
323/// Stages a merge into `batch`; the caller commits. Returns the epoch
324/// that was seeded.
325pub fn seed_current_epoch_start(
326    schema: &RpcStoreSchema,
327    objects: &dyn ObjectStore,
328    start_checkpoint: Option<u64>,
329    batch: &mut Batch,
330) -> anyhow::Result<u64> {
331    let system_state =
332        get_sui_system_state(objects).context("read SuiSystemState from restored objects")?;
333    let epoch = system_state.epoch();
334    let system_state_bcs = bcs::to_bytes(&system_state).context("bcs encode SuiSystemState")?;
335    batch
336        .merge(
337            &schema.epochs,
338            &U64Be(epoch),
339            &epochs::start(
340                system_state.protocol_version(),
341                system_state.reference_gas_price(),
342                system_state.epoch_start_timestamp_ms(),
343                start_checkpoint,
344                Some(system_state_bcs),
345            ),
346        )
347        .context("stage epoch start seed")?;
348    Ok(epoch)
349}
350
351/// Seed the framework state for the embedded fullnode's
352/// [`HISTORY_COHORT`] after [`restore_indexes`] has bulk-loaded the
353/// [`LIVE_COHORT`].
354///
355/// The live cohort resumes from the restore target `T` (written by
356/// the restore driver's finalize step). The history cohort is *not*
357/// restored; instead each of its pipelines is seeded to
358/// `history_watermark` — the lowest available checkpoint `L` in the
359/// perpetual store — so tip indexing backfills `(L, T]` from the
360/// perpetual store and then follows live. For each history pipeline
361/// this writes:
362///
363/// - `__watermark = history_watermark` — the framework resumes at
364///   `history_watermark.checkpoint_hi_inclusive + 1`.
365/// - `__chain_id = chain_id` — pins the chain, matching what the
366///   restore driver wrote for the live cohort.
367///
368/// When `objects` is supplied, also seeds the current epoch's
369/// `epochs` row from its on-chain `SuiSystemState` — a *partial*
370/// start record without `start_checkpoint` (see
371/// [`seed_current_epoch_start`]) — so `get_epoch` / `get_committee`
372/// and Move type-layout resolution work immediately after restore
373/// rather than only once the backfill reaches the epoch's boundary.
374/// `objects` is read through the [`ObjectStore`] trait, so the
375/// embedded caller passes the validator's perpetual store directly
376/// (this crate stays free of any `sui-core` dependency).
377///
378/// Stamps the singleton `pruning_watermark` at the lowest available
379/// checkpoint `L` — the first checkpoint the backfill will index,
380/// `history_watermark.checkpoint_hi_inclusive + 1` — with
381/// `tx_seq_lo = history_watermark.tx_hi` (the first `tx_seq` that
382/// checkpoint contributes). This records that nothing below `L` is
383/// available and sets the bitmap compaction filter's floor. The
384/// backfill only ever writes `tx_seq` at or above the floor, so the
385/// filter drops nothing it produces. (The *upper* bound of history
386/// availability while a backfill is in progress is a separate
387/// concern, handled elsewhere.)
388///
389/// Idempotent: re-running overwrites the same rows. Does not touch
390/// the live cohort or the deactivated (perpetual-store-served) CFs.
391pub fn seed_history_cohort(
392    db: &Db,
393    schema: &RpcStoreSchema,
394    history_watermark: Watermark,
395    chain_id: ChainId,
396    objects: Option<&dyn ObjectStore>,
397) -> anyhow::Result<()> {
398    let framework = FrameworkSchema::new(db.clone());
399    let mut batch = db.batch();
400
401    for name in HISTORY_COHORT {
402        let key = PipelineTaskKey::new(*name);
403        batch
404            .put(&framework.watermarks, &key, &history_watermark)
405            .with_context(|| format!("stage __watermark for {name:?}"))?;
406        batch
407            .put(&framework.chain_ids, &key, &chain_id)
408            .with_context(|| format!("stage __chain_id for {name:?}"))?;
409    }
410
411    // Stamp the pruning floor at the lowest available checkpoint `L`
412    // (the first checkpoint the backfill will index). `tx_seq_lo` is
413    // the tx count through the seed point, i.e. the first `tx_seq`
414    // checkpoint `L` contributes, so the bitmap compaction filter
415    // drops nothing the backfill produces.
416    let tx_seq_lo = history_watermark.tx_hi;
417    let (k, v) = pruning_watermark::store(&pruning_watermark::Watermarks {
418        tx_seq_lo,
419        checkpoint_lo: history_watermark.checkpoint_hi_inclusive.saturating_add(1),
420    });
421    batch
422        .put(&schema.pruning_watermark, &k, &v)
423        .context("stage pruning_watermark row")?;
424
425    if let Some(objects) = objects {
426        // Mid-epoch restore: the epoch's first checkpoint precedes the
427        // tip, so seed a partial start record (no `start_checkpoint`).
428        match seed_current_epoch_start(schema, objects, None, &mut batch) {
429            Ok(epoch) => info!(epoch, "seeded partial start record for the current epoch"),
430            Err(e) => warn!(
431                error = %e,
432                "could not seed the current epoch's start record; get_epoch / \
433                 get_committee / Move type-layout resolution for the current epoch \
434                 will be unavailable until the backfill reaches its boundary",
435            ),
436        }
437    }
438
439    batch.commit().context("commit history-cohort seed batch")?;
440
441    // Mirror the on-disk floor into the process-wide atomic so any
442    // bitmap compaction-filter clones in this process see it
443    // immediately (a subsequent `Indexer::from_store` also calls
444    // `refresh_pruning_atomics`).
445    schema.set_pruning_floor(tx_seq_lo);
446
447    Ok(())
448}
449
450#[cfg(test)]
451mod tests {
452    use async_trait::async_trait;
453    use bytes::Bytes;
454    use futures::StreamExt;
455    use futures::stream;
456    use futures::stream::BoxStream;
457    use sui_consistent_store::ChainId;
458    use sui_consistent_store::Db;
459    use sui_consistent_store::DbOptions;
460    use sui_consistent_store::PipelineTaskKey;
461    use sui_consistent_store::Watermark;
462    use sui_consistent_store::restore::RestoreChunk;
463    use sui_consistent_store::restore_state;
464    use sui_indexer_alt_framework::pipeline::Processor;
465    use sui_types::base_types::ObjectID;
466    use sui_types::base_types::SuiAddress;
467    use sui_types::object::Object;
468
469    use super::*;
470    use crate::RpcStoreSchema;
471    use crate::indexer::objects::Objects;
472    use crate::schema::object_by_owner::OwnerKind;
473
    /// Minimal [`RestoreSource`] that wraps a `Vec<RestoreChunk>`
    /// and uses the 4-byte BE chunk index as cursor. Lets us
    /// drive the end-to-end pipeline registration / commit path
    /// without standing up a real snapshot.
    struct VecSource {
        /// Value reported by `target_checkpoint`.
        target: u64,
        /// Value reported by `target_chain_id`.
        chain_id: ChainId,
        /// Chunks served, in order, by `stream` for the single shard.
        chunks: Vec<RestoreChunk>,
    }
483
484    impl VecSource {
485        fn from_objects(target: u64, chain_id: ChainId, objects: Vec<Vec<Object>>) -> Self {
486            let chunks = objects
487                .into_iter()
488                .enumerate()
489                .map(|(i, objs)| RestoreChunk {
490                    objects: objs,
491                    cursor: Bytes::copy_from_slice(&(i as u32).to_be_bytes()),
492                })
493                .collect();
494            Self {
495                target,
496                chain_id,
497                chunks,
498            }
499        }
500    }
501
502    #[async_trait]
503    impl RestoreSource for VecSource {
504        fn target_checkpoint(&self) -> u64 {
505            self.target
506        }
507
508        fn target_chain_id(&self) -> ChainId {
509            self.chain_id
510        }
511
512        fn shards(&self) -> u32 {
513            1
514        }
515
516        fn stream(
517            &self,
518            shard_id: u32,
519            cursor: Option<Bytes>,
520        ) -> BoxStream<'_, anyhow::Result<RestoreChunk>> {
521            assert_eq!(shard_id, 0);
522            let resume_after = cursor.map(|c| {
523                let mut buf = [0u8; 4];
524                buf.copy_from_slice(&c[..4]);
525                u32::from_be_bytes(buf)
526            });
527            let chunks: Vec<_> = self
528                .chunks
529                .iter()
530                .enumerate()
531                .filter_map(|(i, chunk)| {
532                    let i = i as u32;
533                    if let Some(after) = resume_after
534                        && i <= after
535                    {
536                        None
537                    } else {
538                        Some(Ok(RestoreChunk {
539                            objects: chunk.objects.clone(),
540                            cursor: chunk.cursor.clone(),
541                        }))
542                    }
543                })
544                .collect();
545            stream::iter(chunks).boxed()
546        }
547    }
548
549    /// End-to-end: drive a handful of address-owned objects
550    /// through every registered pipeline. Verifies that the
551    /// rows we expect end up in `object_version_by_checkpoint` and
552    /// `object_by_owner`, and that every pipeline's
553    /// `__restore` / `__watermark` rows are set up for the
554    /// tip-indexer to take over.
555    #[tokio::test]
556    async fn restore_indexes_populates_schema_and_finalises() {
557        let dir = tempfile::tempdir().unwrap();
558        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
559        let schema = Arc::new(schema);
560
561        let owner = SuiAddress::random_for_testing_only();
562        let objects: Vec<Object> = (1..=4u8)
563            .map(|i| Object::with_id_owner_for_testing(ObjectID::from_single_byte(i), owner))
564            .collect();
565
566        let chain_id = ChainId([7u8; 32]);
567        let source = VecSource::from_objects(123, chain_id, vec![objects.clone()]);
568
569        restore_indexes(
570            db.clone(),
571            schema.clone(),
572            source,
573            RestoreDriverConfig::default(),
574            RestoreLayer::indexes_only(),
575            RestoreMetrics::new(None, &prometheus::Registry::new()),
576        )
577        .unwrap()
578        .shutdown()
579        .await
580        .unwrap();
581
582        // Each object's restore floor row landed in the checkpoint-pinned
583        // index at the anchor checkpoint (123).
584        for o in &objects {
585            assert_eq!(
586                schema
587                    .get_object_version_at_checkpoint(o.id(), 123)
588                    .unwrap(),
589                Some(o.version()),
590            );
591        }
592
593        // Owner index has every object under the same
594        // AddressOwner(owner) key.
595        let owned: Vec<(OwnerKind, ObjectID)> = schema
596            .iter_objects_owned_by_address(owner)
597            .unwrap()
598            .map(Result::unwrap)
599            .map(|(key, _v)| (key.kind, key.object_id))
600            .collect();
601        let mut got_ids: Vec<_> = owned.iter().map(|(_, id)| *id).collect();
602        got_ids.sort();
603        let mut expected_ids: Vec<_> = objects.iter().map(|o| o.id()).collect();
604        expected_ids.sort();
605        assert_eq!(got_ids, expected_ids);
606        for (kind, _) in &owned {
607            assert!(matches!(kind, OwnerKind::AddressOwner(addr) if *addr == owner));
608        }
609
610        // `indexes_only` did not register `objects`, so the
611        // `(id, version)` CF stays empty.
612        for o in &objects {
613            assert_eq!(schema.get_object_by_key(o.id(), o.version()).unwrap(), None,);
614        }
615
616        // Every registered pipeline finished and has __restore
617        // Complete, __watermark, and __chain_id all set. `objects`
618        // was not registered with `indexes_only`, so it has no
619        // __restore row at all.
620        for name in [
621            ObjectVersionByCheckpoint::NAME,
622            ObjectByOwner::NAME,
623            ObjectByType::NAME,
624            Balance::NAME,
625            PackageVersions::NAME,
626        ] {
627            let key = PipelineTaskKey::new(name);
628            let state = db.framework().restore.get(&key).unwrap().unwrap();
629            match state.state.unwrap() {
630                restore_state::State::Complete(c) => assert_eq!(c.restored_at, 123),
631                other => panic!("expected Complete, got {other:?}"),
632            }
633            let wm = db.framework().watermarks.get(&key).unwrap().unwrap();
634            assert_eq!(wm, Watermark::for_checkpoint(123));
635            let pinned_chain_id = db.framework().chain_ids.get(&key).unwrap().unwrap();
636            assert_eq!(pinned_chain_id, chain_id);
637        }
638        let objects_key = PipelineTaskKey::new(Objects::NAME);
639        assert!(
640            db.framework().restore.get(&objects_key).unwrap().is_none(),
641            "indexes_only should leave the objects pipeline unregistered",
642        );
643    }
644
645    /// `floor_unrestored_pipelines` writes a `__watermark` /
646    /// `__chain_id` row for every pipeline outside the restored
647    /// set and stamps the singleton `pruning_watermark` so the
648    /// available range tracks the post-restore floor.
649    #[test]
650    fn floor_unrestored_pipelines_writes_watermarks_for_tip_only_pipelines() {
651        let dir = tempfile::tempdir().unwrap();
652        let (db, _schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
653
654        let chain_id = ChainId([42u8; 32]);
655        let target = Watermark {
656            epoch_hi_inclusive: 7,
657            checkpoint_hi_inclusive: 1_000,
658            tx_hi: 5_000,
659            timestamp_ms_hi_inclusive: 1_700_000_000_000,
660        };
661
662        floor_unrestored_pipelines(&db, target, chain_id, &RestoreLayer::all()).unwrap();
663
664        // Sample raw-chain-data / bitmap pipelines that the
665        // formal-snapshot path doesn't cover — every one of them
666        // should be primed with the target watermark + chain id.
667        for name in [
668            Epochs::NAME,
669            CheckpointSummary::NAME,
670            CheckpointContents::NAME,
671            CheckpointSeqByDigest::NAME,
672            Transactions::NAME,
673            TxSeqByDigest::NAME,
674            TxMetadataBySeq::NAME,
675            Effects::NAME,
676            Events::NAME,
677            TransactionBitmap::NAME,
678            EventBitmap::NAME,
679        ] {
680            let key = PipelineTaskKey::new(name);
681            assert_eq!(
682                db.framework().watermarks.get(&key).unwrap(),
683                Some(target),
684                "{name} should have the post-restore watermark",
685            );
686            assert_eq!(
687                db.framework().chain_ids.get(&key).unwrap(),
688                Some(chain_id),
689                "{name} should pin the restored chain id",
690            );
691        }
692
693        // Restored pipelines are left to whatever the restore
694        // driver wrote (here: nothing, since we didn't actually
695        // run a restore in this test). The helper must not
696        // clobber them.
697        for name in [
698            ObjectVersionByCheckpoint::NAME,
699            ObjectByOwner::NAME,
700            ObjectByType::NAME,
701            Balance::NAME,
702            PackageVersions::NAME,
703            Objects::NAME,
704        ] {
705            let key = PipelineTaskKey::new(name);
706            assert!(
707                db.framework().watermarks.get(&key).unwrap().is_none(),
708                "{name} watermark should be untouched by the floor helper",
709            );
710            assert!(
711                db.framework().chain_ids.get(&key).unwrap().is_none(),
712                "{name} chain id should be untouched by the floor helper",
713            );
714        }
715
716        // Pruning singleton reflects the post-restore floor: tx
717        // ids and checkpoint sequences below this row aren't
718        // available in the new database.
719        let schema = RpcStoreSchema::open(&db).unwrap();
720        assert_eq!(
721            schema.get_pruning_watermarks().unwrap(),
722            Some(crate::schema::pruning_watermark::Watermarks {
723                tx_seq_lo: target.tx_hi,
724                checkpoint_lo: target.checkpoint_hi_inclusive + 1,
725            }),
726        );
727    }
728
729    /// With `RestoreLayer::indexes_only`, the `objects` pipeline
730    /// is *not* in the restored set, so the floor helper primes
731    /// it the same way it does the raw-chain-data pipelines.
732    #[test]
733    fn floor_unrestored_pipelines_includes_objects_when_layer_skips_it() {
734        let dir = tempfile::tempdir().unwrap();
735        let (db, _schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
736
737        let chain_id = ChainId([11u8; 32]);
738        let target = Watermark::for_checkpoint(42);
739
740        floor_unrestored_pipelines(&db, target, chain_id, &RestoreLayer::indexes_only()).unwrap();
741
742        let key = PipelineTaskKey::new(Objects::NAME);
743        assert_eq!(db.framework().watermarks.get(&key).unwrap(), Some(target),);
744        assert_eq!(db.framework().chain_ids.get(&key).unwrap(), Some(chain_id));
745    }
746
747    /// `RestoreLayer::all` additionally registers the `objects`
748    /// pipeline, so every restored live object lands in the
749    /// `(id, version)` CF and the pipeline itself transitions to
750    /// `Complete`.
751    #[tokio::test]
752    async fn restore_indexes_with_objects_layer_populates_objects_cf() {
753        let dir = tempfile::tempdir().unwrap();
754        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
755        let schema = Arc::new(schema);
756
757        let owner = SuiAddress::random_for_testing_only();
758        let objects: Vec<Object> = (1..=4u8)
759            .map(|i| Object::with_id_owner_for_testing(ObjectID::from_single_byte(i), owner))
760            .collect();
761
762        let chain_id = ChainId([9u8; 32]);
763        let source = VecSource::from_objects(123, chain_id, vec![objects.clone()]);
764
765        restore_indexes(
766            db.clone(),
767            schema.clone(),
768            source,
769            RestoreDriverConfig::default(),
770            RestoreLayer::all(),
771            RestoreMetrics::new(None, &prometheus::Registry::new()),
772        )
773        .unwrap()
774        .shutdown()
775        .await
776        .unwrap();
777
778        // Every object lands at its current version in `objects`.
779        for o in &objects {
780            assert_eq!(
781                schema.get_object_by_key(o.id(), o.version()).unwrap(),
782                Some(o.clone()),
783            );
784        }
785
786        // The `objects` pipeline's __restore / __watermark /
787        // __chain_id rows all match the source target.
788        let key = PipelineTaskKey::new(Objects::NAME);
789        let state = db.framework().restore.get(&key).unwrap().unwrap();
790        match state.state.unwrap() {
791            restore_state::State::Complete(c) => assert_eq!(c.restored_at, 123),
792            other => panic!("expected Complete, got {other:?}"),
793        }
794        assert_eq!(
795            db.framework().watermarks.get(&key).unwrap().unwrap(),
796            Watermark::for_checkpoint(123),
797        );
798        assert_eq!(
799            db.framework().chain_ids.get(&key).unwrap().unwrap(),
800            chain_id,
801        );
802    }
803
804    /// `seed_history_cohort` primes every history-cohort pipeline with
805    /// the seed watermark and the chain id, stamps the pruning floor at
806    /// the lowest available checkpoint `L`, and leaves the live cohort
807    /// untouched (the restore driver owns it).
808    #[test]
809    fn seed_history_cohort_seeds_history_watermarks_and_pruning_floor() {
810        let dir = tempfile::tempdir().unwrap();
811        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
812
813        let chain_id = ChainId([5u8; 32]);
814        // Seed point: committed through checkpoint 999 / tx 5000, so the
815        // backfill resumes at (and the floor is) checkpoint 1000.
816        let seed = Watermark {
817            checkpoint_hi_inclusive: 999,
818            tx_hi: 5_000,
819            ..Watermark::default()
820        };
821        seed_history_cohort(&db, &schema, seed, chain_id, None).unwrap();
822
823        // History cohort resumes from the seed point, pinned to the
824        // chain.
825        for name in HISTORY_COHORT {
826            let key = PipelineTaskKey::new(*name);
827            assert_eq!(
828                db.framework().watermarks.get(&key).unwrap(),
829                Some(seed),
830                "{name} should resume from the seed point",
831            );
832            assert_eq!(
833                db.framework().chain_ids.get(&key).unwrap(),
834                Some(chain_id),
835                "{name} should pin the chain id",
836            );
837        }
838
839        // Live cohort is the restore driver's responsibility — the
840        // history seed must not touch it.
841        for name in LIVE_COHORT {
842            let key = PipelineTaskKey::new(*name);
843            assert!(
844                db.framework().watermarks.get(&key).unwrap().is_none(),
845                "{name} watermark must be untouched by the history seed",
846            );
847        }
848
849        // Pruning floor sits at the lowest available checkpoint
850        // (seed + 1) and the seed point's tx count.
851        assert_eq!(
852            schema.get_pruning_watermarks().unwrap(),
853            Some(crate::schema::pruning_watermark::Watermarks {
854                tx_seq_lo: 5_000,
855                checkpoint_lo: 1_000,
856            }),
857        );
858    }
859
860    /// The live and history cohorts are disjoint and each has the
861    /// expected size. (Their union being exactly the embedded layer's
862    /// enabled set is pinned by `embedded_registers_only_cohort_pipelines`.)
863    #[test]
864    fn cohorts_are_disjoint() {
865        let live: std::collections::BTreeSet<_> = LIVE_COHORT.iter().collect();
866        let history: std::collections::BTreeSet<_> = HISTORY_COHORT.iter().collect();
867        assert!(live.is_disjoint(&history), "cohorts must not overlap");
868        assert_eq!(live.len(), 3);
869        assert_eq!(history.len(), 7);
870    }
871}