sui_rpc_store/indexer/restore.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Entry point for bulk-loading the [`RpcStoreSchema`]'s
5//! derived-index CFs from a [`RestoreSource`].
6//!
7//! Registers the three live-object-derivable index pipelines
8//! ([`ObjectByOwner`], [`ObjectByType`], [`Balance`]) plus the
9//! [`ObjectVersionByCheckpoint`] and [`PackageVersions`] floor rows —
10//! and, when the caller's [`RestoreLayer`] opts in, the raw
11//! [`Objects`] CF — against a single [`RestoreDriver`] and returns a
12//! [`Service`] driving the restore through to completion. Once
13//! finished, every registered
14//! pipeline's `__restore` row is `Complete` and its `__watermark`
15//! row is set to the source's target, so the regular
16//! [`Indexer::add_pipelines`] path will accept them for tip
17//! indexing.
18//!
19//! Restoration is run separately from tip indexing — open the
20//! database, call [`restore_indexes`] to populate the indexes,
21//! then construct an [`Indexer`] over the same store to start
22//! tip-following.
23//!
24//! [`Indexer`]: crate::Indexer
25//! [`Indexer::add_pipelines`]: crate::Indexer::add_pipelines
26
27use std::sync::Arc;
28
29use anyhow::Context as _;
30use sui_consistent_store::Batch;
31use sui_consistent_store::ChainId;
32use sui_consistent_store::Db;
33use sui_consistent_store::FrameworkSchema;
34use sui_consistent_store::PipelineTaskKey;
35use sui_consistent_store::Schema as _;
36use sui_consistent_store::Watermark;
37use sui_consistent_store::restore::RestoreDriver;
38use sui_consistent_store::restore::RestoreDriverConfig;
39use sui_consistent_store::restore::RestoreSource;
40use sui_consistent_store::restore::metrics::RestoreMetrics;
41use sui_futures::service::Service;
42use sui_indexer_alt_framework::pipeline::Processor;
43use sui_types::storage::ObjectStore;
44use sui_types::sui_system_state::SuiSystemStateTrait;
45use sui_types::sui_system_state::get_sui_system_state;
46use tracing::info;
47use tracing::warn;
48
49use crate::RestoreLayer;
50use crate::RpcStoreReader;
51use crate::RpcStoreSchema;
52use crate::indexer::balance::Balance;
53use crate::indexer::checkpoint_contents::CheckpointContents;
54use crate::indexer::checkpoint_seq_by_digest::CheckpointSeqByDigest;
55use crate::indexer::checkpoint_summary::CheckpointSummary;
56use crate::indexer::effects::Effects;
57use crate::indexer::epochs::Epochs;
58use crate::indexer::event_bitmap::EventBitmap;
59use crate::indexer::events::Events;
60use crate::indexer::object_by_owner::ObjectByOwner;
61use crate::indexer::object_by_type::ObjectByType;
62use crate::indexer::object_version_by_checkpoint::ObjectVersionByCheckpoint;
63use crate::indexer::objects::Objects;
64use crate::indexer::package_versions::PackageVersions;
65use crate::indexer::transaction_bitmap::TransactionBitmap;
66use crate::indexer::transactions::Transactions;
67use crate::indexer::tx_metadata_by_seq::TxMetadataBySeq;
68use crate::indexer::tx_seq_by_digest::TxSeqByDigest;
69use crate::schema::epochs;
70use crate::schema::primitives::U64Be;
71use crate::schema::pruning_watermark;
72
73/// The embedded fullnode's **live cohort**: the pipelines that
74/// [`restore_indexes`] bulk-loads and that are restored to the
75/// perpetual store's tip `T`, then follow live from there. They are
76/// bounded by the live object set, so a snapshot restore reproduces
77/// them exactly.
78///
79/// Matches the live half of
80/// [`PipelineLayer::embedded`](crate::config::PipelineLayer::embedded);
81/// the `embedded_registers_only_cohort_pipelines` test pins the two
82/// together.
83pub const LIVE_COHORT: &[&str] = &[ObjectByOwner::NAME, ObjectByType::NAME, Balance::NAME];
84
85/// The embedded fullnode's **history cohort**: the pipelines seeded to
86/// the lowest available checkpoint `L` and backfilled upward from the
87/// perpetual store, then followed live.
88///
89/// Most cannot be reconstructed from a live-object snapshot at all --
90/// they record ledger history (`tx_seq` <-> digest maps, the
91/// transaction and event bitmaps) and per-epoch metadata (`epochs`) --
92/// so they are seeded, never restored.
93///
94/// `object_version_by_checkpoint` and `package_versions` are the
95/// exceptions: they are *both* restored and backfilled.
96/// [`restore_indexes`] bulk-loads their floor rows at the tip `T` (the
97/// versions live in the snapshot but predate the available window, so a
98/// checkpoint-bounded read treats them as having always existed), and
99/// the history seed then rewinds their `__watermark` to `L-1` so they
100/// also backfill the per-checkpoint detail over `(L, T]` --
101/// `object_version_by_checkpoint`'s per-checkpoint changes, and
102/// `package_versions`'s real publish checkpoint for versions published
103/// in the window. The embedded bootstrap runs the restore before the
104/// seed, so the `L-1` watermark wins.
105///
106/// Matches the history half of
107/// [`PipelineLayer::embedded`](crate::config::PipelineLayer::embedded).
108pub const HISTORY_COHORT: &[&str] = &[
109 Epochs::NAME,
110 ObjectVersionByCheckpoint::NAME,
111 PackageVersions::NAME,
112 TxSeqByDigest::NAME,
113 TxMetadataBySeq::NAME,
114 TransactionBitmap::NAME,
115 EventBitmap::NAME,
116];
117
118/// Register every [`Restore`]-implementing pipeline opted in by
119/// `layer` on a [`RestoreDriver`] bound to `db` / `schema` and
120/// `source`, then run the resulting [`Service`].
121///
122/// The live-cohort pipelines are always registered, plus
123/// `object_version_by_checkpoint` and `package_versions` -- history-
124/// cohort members whose floor rows are bulk-loaded from the live set
125/// here (the history seed separately rewinds their watermarks so they
126/// also backfill `(L, T]`). The raw [`Objects`] pipeline is only
127/// registered when `layer.objects` is set. The returned `Service`'s
128/// primary task completes once every registered pipeline transitions to
129/// [`RestoreState::Complete`].
130///
131/// [`Restore`]: sui_consistent_store::Restore
132/// [`RestoreState::Complete`]: sui_consistent_store::restore_state::Complete
133pub fn restore_indexes<Src: RestoreSource>(
134 db: Db,
135 schema: Arc<RpcStoreSchema>,
136 source: Src,
137 config: RestoreDriverConfig,
138 layer: RestoreLayer,
139 metrics: Arc<RestoreMetrics>,
140) -> anyhow::Result<Service> {
141 // Capture the anchor before the driver consumes `source`: the
142 // checkpoint-pinned object index attributes every restored live
143 // object to it.
144 let target_checkpoint = source.target_checkpoint();
145 let mut driver = RestoreDriver::new(db, schema, source, config, metrics);
146 // History-cohort members, but their floor rows are restored from the
147 // live set; the embedded history seed later rewinds their watermarks
148 // so they also backfill `(L, T]`.
149 driver.register(ObjectVersionByCheckpoint::for_restore(target_checkpoint))?;
150 driver.register(PackageVersions)?;
151 driver.register(ObjectByOwner)?;
152 driver.register(ObjectByType)?;
153 driver.register(Balance)?;
154 if layer.objects {
155 driver.register(Objects)?;
156 }
157 driver.run()
158}
159
160/// After [`restore_indexes`] returns, prime the framework state of
161/// every pipeline that the restore did *not* cover so tip indexing
162/// resumes from `target_watermark.checkpoint_hi_inclusive + 1`
163/// across the board instead of replaying from genesis for the
164/// raw-chain-data and bitmap pipelines.
165///
166/// Specifically, for every pipeline not in `layer`'s restored
167/// set, writes:
168///
169/// - `__watermark = target_watermark` — the framework's
170/// tip-resume reads this and starts at
171/// `checkpoint_hi_inclusive + 1`.
172/// - `__chain_id = target_chain_id` — pins the pipeline to the
173/// chain the snapshot was taken from, matching what
174/// [`restore_indexes`]'s finalize step already wrote for the
175/// restored pipelines.
176///
177/// Also writes the singleton `pruning_watermark` so
178/// `available_range` queries and the bitmap CFs' compaction
179/// filters reflect that data only starts at the post-restore
180/// floor (`tx_seq_lo = target_watermark.tx_hi`,
181/// `checkpoint_lo = checkpoint_hi_inclusive + 1`).
182///
183/// Idempotent: re-running after a successful restore overwrites
184/// the unrestored pipelines' watermarks with the same values and
185/// re-writes the pruning row.
186pub fn floor_unrestored_pipelines(
187 db: &Db,
188 target_watermark: Watermark,
189 target_chain_id: ChainId,
190 layer: &RestoreLayer,
191) -> anyhow::Result<()> {
192 let restored: &[&'static str] = if layer.objects {
193 &[
194 ObjectVersionByCheckpoint::NAME,
195 ObjectByOwner::NAME,
196 ObjectByType::NAME,
197 Balance::NAME,
198 PackageVersions::NAME,
199 Objects::NAME,
200 ]
201 } else {
202 &[
203 ObjectVersionByCheckpoint::NAME,
204 ObjectByOwner::NAME,
205 ObjectByType::NAME,
206 Balance::NAME,
207 PackageVersions::NAME,
208 ]
209 };
210
211 // Every rpc-store pipeline. Kept exhaustive so any new
212 // pipeline added to `PipelineLayer` needs an explicit
213 // decision here about whether it's a restore-time pipeline
214 // or a tip-only one.
215 let all: &[&'static str] = &[
216 Epochs::NAME,
217 CheckpointSummary::NAME,
218 CheckpointContents::NAME,
219 CheckpointSeqByDigest::NAME,
220 Transactions::NAME,
221 TxSeqByDigest::NAME,
222 TxMetadataBySeq::NAME,
223 Effects::NAME,
224 Events::NAME,
225 Objects::NAME,
226 ObjectVersionByCheckpoint::NAME,
227 ObjectByOwner::NAME,
228 ObjectByType::NAME,
229 Balance::NAME,
230 PackageVersions::NAME,
231 TransactionBitmap::NAME,
232 EventBitmap::NAME,
233 ];
234
235 // Use the owned `FrameworkSchema` over `Db` (rather than the
236 // borrowed view from `Db::framework`) so the `DbMap`s line up
237 // with `Batch::put`'s `R = Db` expectation.
238 let framework = FrameworkSchema::new(db.clone());
239 let mut batch = db.batch();
240 for name in all.iter().filter(|n| !restored.contains(n)) {
241 let key = PipelineTaskKey::new(*name);
242 batch
243 .put(&framework.watermarks, &key, &target_watermark)
244 .with_context(|| format!("stage __watermark for {name:?}"))?;
245 batch
246 .put(&framework.chain_ids, &key, &target_chain_id)
247 .with_context(|| format!("stage __chain_id for {name:?}"))?;
248 }
249
250 // Resolve the rpc-store schema handle once for the
251 // pruning-watermark CF. The schema is cheap to re-bind to a
252 // live `Db` and gives the typed `store` helper plus the
253 // pruning-floor setter the bitmap CFs depend on.
254 let schema =
255 Arc::new(RpcStoreSchema::open(db).context("re-open RpcStoreSchema for pruning watermark")?);
256 let (k, v) = pruning_watermark::store(&pruning_watermark::Watermarks {
257 tx_seq_lo: target_watermark.tx_hi,
258 checkpoint_lo: target_watermark.checkpoint_hi_inclusive.saturating_add(1),
259 });
260 batch
261 .put(&schema.pruning_watermark, &k, &v)
262 .context("stage pruning_watermark row")?;
263
264 // Seed the `epochs` row for the epoch the snapshot lands in. The
265 // chain advanced to it at the anchor's end-of-epoch checkpoint,
266 // but the `epochs` pipeline only emits a start record while
267 // processing such a checkpoint, which tip indexing skips on
268 // resume (it starts at anchor + 1). Without this seed, the
269 // current epoch's row would never get its protocol version, gas
270 // price, start timestamp, start checkpoint, or system state.
271 // Requires the raw objects to read the on-chain `SuiSystemState`,
272 // so it only runs when the `objects` CF was restored; failure to
273 // read it is logged and skipped rather than failing the restore.
274 if layer.objects {
275 let reader = RpcStoreReader::new(db.clone(), schema.clone());
276 let start_checkpoint = target_watermark.checkpoint_hi_inclusive.saturating_add(1);
277 match seed_current_epoch_start(&schema, &reader, Some(start_checkpoint), &mut batch) {
278 Ok(epoch) => info!(
279 epoch,
280 start_checkpoint, "seeded start record for restore epoch"
281 ),
282 Err(e) => warn!(
283 error = %e,
284 "could not seed the restore epoch's start record; get_epoch / \
285 get_committee / Move type-layout resolution for the current epoch \
286 will be unavailable until the next epoch boundary",
287 ),
288 }
289 }
290
291 batch.commit().context("commit floor batch")?;
292
293 // Mirror the on-disk floor into the process-wide atomic so any
294 // compaction-filter clones started in this process see the
295 // updated value immediately. A subsequent `Indexer::from_store`
296 // also calls `refresh_pruning_atomics` so cross-process boots
297 // converge.
298 schema.set_pruning_floor(target_watermark.tx_hi);
299
300 Ok(())
301}
302
303/// Stage a start record for the epoch reflected by the on-chain
304/// `SuiSystemState` in `objects`, keyed by that epoch.
305///
306/// The `epochs` pipeline derives a start record only from an
307/// end-of-epoch checkpoint's `epoch_info`, which a restore-then-tip
308/// flow never processes (tip indexing resumes at `anchor + 1`), so a
309/// freshly restored database has no start record for the epoch it
310/// landed in. This reconstructs that record straight from the
311/// restored object set: protocol version, reference gas price, and
312/// epoch-start timestamp come from the `SuiSystemState`, the BCS of
313/// which is stored so `get_committee` and Move type-layout resolution
314/// work too.
315///
316/// `start_checkpoint` is supplied by the caller and may be `None`.
317/// The formal-snapshot restore lands at an epoch boundary, so it
318/// passes `Some(anchor + 1)`. The embedded-fullnode restore lands at
319/// a *mid-epoch* tip, so the epoch's first checkpoint is unknown and
320/// it passes `None`; the upward backfill fills `start_checkpoint` in
321/// later if that boundary falls within the available range.
322///
323/// Stages a merge into `batch`; the caller commits. Returns the epoch
324/// that was seeded.
325pub fn seed_current_epoch_start(
326 schema: &RpcStoreSchema,
327 objects: &dyn ObjectStore,
328 start_checkpoint: Option<u64>,
329 batch: &mut Batch,
330) -> anyhow::Result<u64> {
331 let system_state =
332 get_sui_system_state(objects).context("read SuiSystemState from restored objects")?;
333 let epoch = system_state.epoch();
334 let system_state_bcs = bcs::to_bytes(&system_state).context("bcs encode SuiSystemState")?;
335 batch
336 .merge(
337 &schema.epochs,
338 &U64Be(epoch),
339 &epochs::start(
340 system_state.protocol_version(),
341 system_state.reference_gas_price(),
342 system_state.epoch_start_timestamp_ms(),
343 start_checkpoint,
344 Some(system_state_bcs),
345 ),
346 )
347 .context("stage epoch start seed")?;
348 Ok(epoch)
349}
350
351/// Seed the framework state for the embedded fullnode's
352/// [`HISTORY_COHORT`] after [`restore_indexes`] has bulk-loaded the
353/// [`LIVE_COHORT`].
354///
355/// The live cohort resumes from the restore target `T` (written by
356/// the restore driver's finalize step). The history cohort is *not*
357/// restored; instead each of its pipelines is seeded to
358/// `history_watermark` — the lowest available checkpoint `L` in the
359/// perpetual store — so tip indexing backfills `(L, T]` from the
360/// perpetual store and then follows live. For each history pipeline
361/// this writes:
362///
363/// - `__watermark = history_watermark` — the framework resumes at
364/// `history_watermark.checkpoint_hi_inclusive + 1`.
365/// - `__chain_id = chain_id` — pins the chain, matching what the
366/// restore driver wrote for the live cohort.
367///
368/// When `objects` is supplied, also seeds the current epoch's
369/// `epochs` row from its on-chain `SuiSystemState` — a *partial*
370/// start record without `start_checkpoint` (see
371/// [`seed_current_epoch_start`]) — so `get_epoch` / `get_committee`
372/// and Move type-layout resolution work immediately after restore
373/// rather than only once the backfill reaches the epoch's boundary.
374/// `objects` is read through the [`ObjectStore`] trait, so the
375/// embedded caller passes the validator's perpetual store directly
376/// (this crate stays free of any `sui-core` dependency).
377///
378/// Stamps the singleton `pruning_watermark` at the lowest available
379/// checkpoint `L` — the first checkpoint the backfill will index,
380/// `history_watermark.checkpoint_hi_inclusive + 1` — with
381/// `tx_seq_lo = history_watermark.tx_hi` (the first `tx_seq` that
382/// checkpoint contributes). This records that nothing below `L` is
383/// available and sets the bitmap compaction filter's floor. The
384/// backfill only ever writes `tx_seq` at or above the floor, so the
385/// filter drops nothing it produces. (The *upper* bound of history
386/// availability while a backfill is in progress is a separate
387/// concern, handled elsewhere.)
388///
389/// Idempotent: re-running overwrites the same rows. Does not touch
390/// the live cohort or the deactivated (perpetual-store-served) CFs.
391pub fn seed_history_cohort(
392 db: &Db,
393 schema: &RpcStoreSchema,
394 history_watermark: Watermark,
395 chain_id: ChainId,
396 objects: Option<&dyn ObjectStore>,
397) -> anyhow::Result<()> {
398 let framework = FrameworkSchema::new(db.clone());
399 let mut batch = db.batch();
400
401 for name in HISTORY_COHORT {
402 let key = PipelineTaskKey::new(*name);
403 batch
404 .put(&framework.watermarks, &key, &history_watermark)
405 .with_context(|| format!("stage __watermark for {name:?}"))?;
406 batch
407 .put(&framework.chain_ids, &key, &chain_id)
408 .with_context(|| format!("stage __chain_id for {name:?}"))?;
409 }
410
411 // Stamp the pruning floor at the lowest available checkpoint `L`
412 // (the first checkpoint the backfill will index). `tx_seq_lo` is
413 // the tx count through the seed point, i.e. the first `tx_seq`
414 // checkpoint `L` contributes, so the bitmap compaction filter
415 // drops nothing the backfill produces.
416 let tx_seq_lo = history_watermark.tx_hi;
417 let (k, v) = pruning_watermark::store(&pruning_watermark::Watermarks {
418 tx_seq_lo,
419 checkpoint_lo: history_watermark.checkpoint_hi_inclusive.saturating_add(1),
420 });
421 batch
422 .put(&schema.pruning_watermark, &k, &v)
423 .context("stage pruning_watermark row")?;
424
425 if let Some(objects) = objects {
426 // Mid-epoch restore: the epoch's first checkpoint precedes the
427 // tip, so seed a partial start record (no `start_checkpoint`).
428 match seed_current_epoch_start(schema, objects, None, &mut batch) {
429 Ok(epoch) => info!(epoch, "seeded partial start record for the current epoch"),
430 Err(e) => warn!(
431 error = %e,
432 "could not seed the current epoch's start record; get_epoch / \
433 get_committee / Move type-layout resolution for the current epoch \
434 will be unavailable until the backfill reaches its boundary",
435 ),
436 }
437 }
438
439 batch.commit().context("commit history-cohort seed batch")?;
440
441 // Mirror the on-disk floor into the process-wide atomic so any
442 // bitmap compaction-filter clones in this process see it
443 // immediately (a subsequent `Indexer::from_store` also calls
444 // `refresh_pruning_atomics`).
445 schema.set_pruning_floor(tx_seq_lo);
446
447 Ok(())
448}
449
#[cfg(test)]
mod tests {
    use async_trait::async_trait;
    use bytes::Bytes;
    use futures::StreamExt;
    use futures::stream;
    use futures::stream::BoxStream;
    use sui_consistent_store::ChainId;
    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;
    use sui_consistent_store::PipelineTaskKey;
    use sui_consistent_store::Watermark;
    use sui_consistent_store::restore::RestoreChunk;
    use sui_consistent_store::restore_state;
    use sui_indexer_alt_framework::pipeline::Processor;
    use sui_types::base_types::ObjectID;
    use sui_types::base_types::SuiAddress;
    use sui_types::object::Object;

    use super::*;
    use crate::RpcStoreSchema;
    use crate::indexer::objects::Objects;
    use crate::schema::object_by_owner::OwnerKind;

    /// Minimal [`RestoreSource`] that wraps a `Vec<RestoreChunk>`
    /// and uses the 4-byte BE chunk index as cursor. Lets us
    /// drive the end-to-end pipeline registration / commit path
    /// without standing up a real snapshot.
    struct VecSource {
        target: u64,
        chain_id: ChainId,
        chunks: Vec<RestoreChunk>,
    }

    impl VecSource {
        /// Build a single-shard source targeting checkpoint `target` on
        /// `chain_id`, with one chunk per inner `Vec` of `objects`. Chunk
        /// `i` carries cursor `i` encoded as a 4-byte big-endian `u32`.
        fn from_objects(target: u64, chain_id: ChainId, objects: Vec<Vec<Object>>) -> Self {
            let chunks = objects
                .into_iter()
                .enumerate()
                .map(|(i, objs)| RestoreChunk {
                    objects: objs,
                    cursor: Bytes::copy_from_slice(&(i as u32).to_be_bytes()),
                })
                .collect();
            Self {
                target,
                chain_id,
                chunks,
            }
        }
    }

    #[async_trait]
    impl RestoreSource for VecSource {
        fn target_checkpoint(&self) -> u64 {
            self.target
        }

        fn target_chain_id(&self) -> ChainId {
            self.chain_id
        }

        // Everything lives in a single shard (id 0).
        fn shards(&self) -> u32 {
            1
        }

        /// Stream the chunks strictly after `cursor` (all of them when
        /// `cursor` is `None`), cloning each one.
        fn stream(
            &self,
            shard_id: u32,
            cursor: Option<Bytes>,
        ) -> BoxStream<'_, anyhow::Result<RestoreChunk>> {
            assert_eq!(shard_id, 0);
            // Decode the resume cursor back into a chunk index. Slicing
            // `c[..4]` panics on a shorter cursor; that cannot happen for
            // cursors produced by `from_objects`.
            let resume_after = cursor.map(|c| {
                let mut buf = [0u8; 4];
                buf.copy_from_slice(&c[..4]);
                u32::from_be_bytes(buf)
            });
            // Skip every chunk up to and including the resume point.
            let chunks: Vec<_> = self
                .chunks
                .iter()
                .enumerate()
                .filter_map(|(i, chunk)| {
                    let i = i as u32;
                    if let Some(after) = resume_after
                        && i <= after
                    {
                        None
                    } else {
                        Some(Ok(RestoreChunk {
                            objects: chunk.objects.clone(),
                            cursor: chunk.cursor.clone(),
                        }))
                    }
                })
                .collect();
            stream::iter(chunks).boxed()
        }
    }

    /// End-to-end: drive a handful of address-owned objects
    /// through every registered pipeline. Verifies that the
    /// rows we expect end up in `object_version_by_checkpoint` and
    /// `object_by_owner`, and that every pipeline's
    /// `__restore` / `__watermark` rows are set up for the
    /// tip-indexer to take over.
    #[tokio::test]
    async fn restore_indexes_populates_schema_and_finalises() {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        let schema = Arc::new(schema);

        // Four objects, ids 0x01..=0x04, all owned by one address.
        let owner = SuiAddress::random_for_testing_only();
        let objects: Vec<Object> = (1..=4u8)
            .map(|i| Object::with_id_owner_for_testing(ObjectID::from_single_byte(i), owner))
            .collect();

        let chain_id = ChainId([7u8; 32]);
        let source = VecSource::from_objects(123, chain_id, vec![objects.clone()]);

        // Run the returned service via `shutdown().await`; the assertions
        // below rely on the restore having finished by then.
        restore_indexes(
            db.clone(),
            schema.clone(),
            source,
            RestoreDriverConfig::default(),
            RestoreLayer::indexes_only(),
            RestoreMetrics::new(None, &prometheus::Registry::new()),
        )
        .unwrap()
        .shutdown()
        .await
        .unwrap();

        // Each object's restore floor row landed in the checkpoint-pinned
        // index at the anchor checkpoint (123).
        for o in &objects {
            assert_eq!(
                schema
                    .get_object_version_at_checkpoint(o.id(), 123)
                    .unwrap(),
                Some(o.version()),
            );
        }

        // Owner index has every object under the same
        // AddressOwner(owner) key.
        let owned: Vec<(OwnerKind, ObjectID)> = schema
            .iter_objects_owned_by_address(owner)
            .unwrap()
            .map(Result::unwrap)
            .map(|(key, _v)| (key.kind, key.object_id))
            .collect();
        let mut got_ids: Vec<_> = owned.iter().map(|(_, id)| *id).collect();
        got_ids.sort();
        let mut expected_ids: Vec<_> = objects.iter().map(|o| o.id()).collect();
        expected_ids.sort();
        assert_eq!(got_ids, expected_ids);
        for (kind, _) in &owned {
            assert!(matches!(kind, OwnerKind::AddressOwner(addr) if *addr == owner));
        }

        // `indexes_only` did not register `objects`, so the
        // `(id, version)` CF stays empty.
        for o in &objects {
            assert_eq!(schema.get_object_by_key(o.id(), o.version()).unwrap(), None,);
        }

        // Every registered pipeline finished and has __restore
        // Complete, __watermark, and __chain_id all set. `objects`
        // was not registered with `indexes_only`, so it has no
        // __restore row at all.
        for name in [
            ObjectVersionByCheckpoint::NAME,
            ObjectByOwner::NAME,
            ObjectByType::NAME,
            Balance::NAME,
            PackageVersions::NAME,
        ] {
            let key = PipelineTaskKey::new(name);
            let state = db.framework().restore.get(&key).unwrap().unwrap();
            match state.state.unwrap() {
                restore_state::State::Complete(c) => assert_eq!(c.restored_at, 123),
                other => panic!("expected Complete, got {other:?}"),
            }
            let wm = db.framework().watermarks.get(&key).unwrap().unwrap();
            assert_eq!(wm, Watermark::for_checkpoint(123));
            let pinned_chain_id = db.framework().chain_ids.get(&key).unwrap().unwrap();
            assert_eq!(pinned_chain_id, chain_id);
        }
        let objects_key = PipelineTaskKey::new(Objects::NAME);
        assert!(
            db.framework().restore.get(&objects_key).unwrap().is_none(),
            "indexes_only should leave the objects pipeline unregistered",
        );
    }

    /// `floor_unrestored_pipelines` writes a `__watermark` /
    /// `__chain_id` row for every pipeline outside the restored
    /// set and stamps the singleton `pruning_watermark` so the
    /// available range tracks the post-restore floor.
    #[test]
    fn floor_unrestored_pipelines_writes_watermarks_for_tip_only_pipelines() {
        let dir = tempfile::tempdir().unwrap();
        let (db, _schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();

        let chain_id = ChainId([42u8; 32]);
        let target = Watermark {
            epoch_hi_inclusive: 7,
            checkpoint_hi_inclusive: 1_000,
            tx_hi: 5_000,
            timestamp_ms_hi_inclusive: 1_700_000_000_000,
        };

        // `RestoreLayer::all` treats `objects` as restored, so the helper
        // must skip it along with the index pipelines.
        floor_unrestored_pipelines(&db, target, chain_id, &RestoreLayer::all()).unwrap();

        // Sample raw-chain-data / bitmap pipelines that the
        // formal-snapshot path doesn't cover — every one of them
        // should be primed with the target watermark + chain id.
        for name in [
            Epochs::NAME,
            CheckpointSummary::NAME,
            CheckpointContents::NAME,
            CheckpointSeqByDigest::NAME,
            Transactions::NAME,
            TxSeqByDigest::NAME,
            TxMetadataBySeq::NAME,
            Effects::NAME,
            Events::NAME,
            TransactionBitmap::NAME,
            EventBitmap::NAME,
        ] {
            let key = PipelineTaskKey::new(name);
            assert_eq!(
                db.framework().watermarks.get(&key).unwrap(),
                Some(target),
                "{name} should have the post-restore watermark",
            );
            assert_eq!(
                db.framework().chain_ids.get(&key).unwrap(),
                Some(chain_id),
                "{name} should pin the restored chain id",
            );
        }

        // Restored pipelines are left to whatever the restore
        // driver wrote (here: nothing, since we didn't actually
        // run a restore in this test). The helper must not
        // clobber them.
        for name in [
            ObjectVersionByCheckpoint::NAME,
            ObjectByOwner::NAME,
            ObjectByType::NAME,
            Balance::NAME,
            PackageVersions::NAME,
            Objects::NAME,
        ] {
            let key = PipelineTaskKey::new(name);
            assert!(
                db.framework().watermarks.get(&key).unwrap().is_none(),
                "{name} watermark should be untouched by the floor helper",
            );
            assert!(
                db.framework().chain_ids.get(&key).unwrap().is_none(),
                "{name} chain id should be untouched by the floor helper",
            );
        }

        // Pruning singleton reflects the post-restore floor: tx
        // ids and checkpoint sequences below this row aren't
        // available in the new database.
        let schema = RpcStoreSchema::open(&db).unwrap();
        assert_eq!(
            schema.get_pruning_watermarks().unwrap(),
            Some(crate::schema::pruning_watermark::Watermarks {
                tx_seq_lo: target.tx_hi,
                checkpoint_lo: target.checkpoint_hi_inclusive + 1,
            }),
        );
    }

    /// With `RestoreLayer::indexes_only`, the `objects` pipeline
    /// is *not* in the restored set, so the floor helper primes
    /// it the same way it does the raw-chain-data pipelines.
    #[test]
    fn floor_unrestored_pipelines_includes_objects_when_layer_skips_it() {
        let dir = tempfile::tempdir().unwrap();
        let (db, _schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();

        let chain_id = ChainId([11u8; 32]);
        let target = Watermark::for_checkpoint(42);

        floor_unrestored_pipelines(&db, target, chain_id, &RestoreLayer::indexes_only()).unwrap();

        let key = PipelineTaskKey::new(Objects::NAME);
        assert_eq!(db.framework().watermarks.get(&key).unwrap(), Some(target),);
        assert_eq!(db.framework().chain_ids.get(&key).unwrap(), Some(chain_id));
    }

    /// `RestoreLayer::all` additionally registers the `objects`
    /// pipeline, so every restored live object lands in the
    /// `(id, version)` CF and the pipeline itself transitions to
    /// `Complete`.
    #[tokio::test]
    async fn restore_indexes_with_objects_layer_populates_objects_cf() {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        let schema = Arc::new(schema);

        let owner = SuiAddress::random_for_testing_only();
        let objects: Vec<Object> = (1..=4u8)
            .map(|i| Object::with_id_owner_for_testing(ObjectID::from_single_byte(i), owner))
            .collect();

        let chain_id = ChainId([9u8; 32]);
        let source = VecSource::from_objects(123, chain_id, vec![objects.clone()]);

        restore_indexes(
            db.clone(),
            schema.clone(),
            source,
            RestoreDriverConfig::default(),
            RestoreLayer::all(),
            RestoreMetrics::new(None, &prometheus::Registry::new()),
        )
        .unwrap()
        .shutdown()
        .await
        .unwrap();

        // Every object lands at its current version in `objects`.
        for o in &objects {
            assert_eq!(
                schema.get_object_by_key(o.id(), o.version()).unwrap(),
                Some(o.clone()),
            );
        }

        // The `objects` pipeline's __restore / __watermark /
        // __chain_id rows all match the source target.
        let key = PipelineTaskKey::new(Objects::NAME);
        let state = db.framework().restore.get(&key).unwrap().unwrap();
        match state.state.unwrap() {
            restore_state::State::Complete(c) => assert_eq!(c.restored_at, 123),
            other => panic!("expected Complete, got {other:?}"),
        }
        assert_eq!(
            db.framework().watermarks.get(&key).unwrap().unwrap(),
            Watermark::for_checkpoint(123),
        );
        assert_eq!(
            db.framework().chain_ids.get(&key).unwrap().unwrap(),
            chain_id,
        );
    }

    /// `seed_history_cohort` primes every history-cohort pipeline with
    /// the seed watermark and the chain id, stamps the pruning floor at
    /// the lowest available checkpoint `L`, and leaves the live cohort
    /// untouched (the restore driver owns it).
    #[test]
    fn seed_history_cohort_seeds_history_watermarks_and_pruning_floor() {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();

        let chain_id = ChainId([5u8; 32]);
        // Seed point: committed through checkpoint 999 / tx 5000, so the
        // backfill resumes at (and the floor is) checkpoint 1000.
        let seed = Watermark {
            checkpoint_hi_inclusive: 999,
            tx_hi: 5_000,
            ..Watermark::default()
        };
        // No object store: the epoch start-record seed is skipped.
        seed_history_cohort(&db, &schema, seed, chain_id, None).unwrap();

        // History cohort resumes from the seed point, pinned to the
        // chain.
        for name in HISTORY_COHORT {
            let key = PipelineTaskKey::new(*name);
            assert_eq!(
                db.framework().watermarks.get(&key).unwrap(),
                Some(seed),
                "{name} should resume from the seed point",
            );
            assert_eq!(
                db.framework().chain_ids.get(&key).unwrap(),
                Some(chain_id),
                "{name} should pin the chain id",
            );
        }

        // Live cohort is the restore driver's responsibility — the
        // history seed must not touch it.
        for name in LIVE_COHORT {
            let key = PipelineTaskKey::new(*name);
            assert!(
                db.framework().watermarks.get(&key).unwrap().is_none(),
                "{name} watermark must be untouched by the history seed",
            );
        }

        // Pruning floor sits at the lowest available checkpoint
        // (seed + 1) and the seed point's tx count.
        assert_eq!(
            schema.get_pruning_watermarks().unwrap(),
            Some(crate::schema::pruning_watermark::Watermarks {
                tx_seq_lo: 5_000,
                checkpoint_lo: 1_000,
            }),
        );
    }

    /// The live and history cohorts are disjoint and each has the
    /// expected size. (Their union being exactly the embedded layer's
    /// enabled set is pinned by `embedded_registers_only_cohort_pipelines`.)
    #[test]
    fn cohorts_are_disjoint() {
        // Collecting into sets also dedups, so the length checks below
        // additionally catch a name listed twice within one cohort.
        let live: std::collections::BTreeSet<_> = LIVE_COHORT.iter().collect();
        let history: std::collections::BTreeSet<_> = HISTORY_COHORT.iter().collect();
        assert!(live.is_disjoint(&history), "cohorts must not overlap");
        assert_eq!(live.len(), 3);
        assert_eq!(history.len(), 7);
    }
}