sui_core/rpc_store_embed.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Startup orchestration for the embedded `sui-rpc-store` indexer.
5//!
6//! When a fullnode is configured with
7//! [`RpcConfig::use_experimental_rpc_store`](sui_config::RpcConfig::use_experimental_rpc_store),
8//! it serves the rpc-api
9//! index surface from an embedded [`sui_rpc_store`] instance instead of
10//! the legacy `rpc-index`. This module owns the lifecycle of that
11//! instance:
12//!
13//! 1. Open the rpc-store database under the node's `db_path()`.
14//! 2. Compare its persisted per-pipeline watermarks against the
15//! perpetual store's currently-available checkpoint range `[L, T]`
16//! (`L` = lowest available, `T` = highest executed) and `decide`
17//! what to do: resume as-is, (re)seed the history cohort, or
18//! (re)restore the live cohort.
19//! 3. Bulk-load the live cohort from the perpetual store and seed the
20//! history cohort when needed (blocking, before the node starts
21//! executing).
22//! 4. Build the read handle the rpc-api serves through, hand the store
23//! to the pruner, and spawn the tip-following indexer fed by the
24//! perpetual store ([`PerpetualStoreIngestionClient`]) and the
25//! checkpoint executor's broadcast stream
26//! ([`BroadcastStreamingClient`]).
27//!
28//! The live cohort (live-object-derivable indexes) is restored to the
29//! tip and follows forward. The history cohort (ledger-history bitmaps,
30//! `tx_seq` maps, per-epoch metadata) is seeded to the lowest available
31//! checkpoint and backfilled upward; the synchronizer's dynamic cohort
32//! lets it catch up to the live frontier without stalling tip
33//! snapshots.
34
35use std::sync::Arc;
36
37use anyhow::Context as _;
38use prometheus::Registry;
39use sui_config::NodeConfig;
40use sui_consistent_store::ChainId;
41use sui_consistent_store::Db;
42use sui_consistent_store::DbOptions;
43use sui_consistent_store::PipelineTaskKey;
44use sui_consistent_store::Watermark;
45use sui_consistent_store::metrics::ColumnFamilyStatsCollector;
46use sui_consistent_store::restore::RestoreDriverConfig;
47use sui_consistent_store::restore::metrics::RestoreMetrics;
48use sui_indexer_alt_framework::IndexerArgs;
49use sui_indexer_alt_framework::ingestion::BoxedStreamingClient;
50use sui_indexer_alt_framework::ingestion::IngestionConfig;
51use sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClient;
52use sui_indexer_alt_framework::metrics::IngestionMetrics;
53use sui_indexer_alt_framework::pipeline::CommitterConfig;
54use sui_indexer_alt_framework::service::Service;
55use sui_rpc_store::ConsistencyConfig;
56use sui_rpc_store::HISTORY_COHORT;
57use sui_rpc_store::Indexer;
58use sui_rpc_store::LIVE_COHORT;
59use sui_rpc_store::METRICS_PREFIX;
60use sui_rpc_store::PipelineLayer;
61use sui_rpc_store::RestoreLayer;
62use sui_rpc_store::RpcStoreReader;
63use sui_rpc_store::RpcStoreSchema;
64use sui_rpc_store::Store as RpcStore;
65use sui_rpc_store::default_rocksdb_config;
66use sui_rpc_store::restore_indexes;
67use sui_rpc_store::seed_history_cohort;
68use sui_types::digests::ChainIdentifier;
69use sui_types::full_checkpoint_content::Checkpoint;
70use sui_types::storage::ObjectStore;
71use tokio::sync::broadcast;
72use tracing::error;
73use tracing::info;
74
75use crate::authority::authority_store::AuthorityStore;
76use crate::authority::authority_store_tables::AuthorityPerpetualTables;
77use crate::checkpoints::CheckpointStore;
78use crate::rpc_store_ingestion_client::PerpetualStoreIngestionClient;
79use crate::rpc_store_restore_source::PerpetualStoreRestoreSource;
80use crate::rpc_store_streaming_client::BroadcastStreamingClient;
81use crate::storage::RocksDbStore;
82
/// Directory, relative to the node's `db_path()`, in which the embedded
/// rpc-store keeps its database.
const RPC_STORE_DIR: &str = "rpc_store";

/// How many in-memory snapshots the store keeps for consistent reads.
///
/// Matches the standalone `sui-rpc-node` default. A snapshot is taken at
/// every checkpoint boundary, so this amounts to a consistency window of
/// roughly 32 checkpoints.
const SNAPSHOT_CAPACITY: usize = 32;
91
92fn db_options() -> DbOptions {
93 DbOptions {
94 rocksdb: default_rocksdb_config(),
95 snapshot_capacity: SNAPSHOT_CAPACITY,
96 }
97}
98
99/// Open the rpc-store database at `path`.
100#[cfg(not(msim))]
101async fn open_db(path: &std::path::Path) -> anyhow::Result<(Db, RpcStoreSchema)> {
102 Db::open::<RpcStoreSchema>(path, db_options())
103 .context("opening the embedded rpc-store database")
104}
105
/// Open the rpc-store database at `path`, retrying while the open fails.
///
/// Under simulation, a node restarted on the same `db_path` can briefly find
/// the path still locked. The previous instance's RocksDB teardown releases
/// its in-process lock-registry entry inside RocksDB's native close, on its
/// own threads. That release is not synchronized with dropping the last `Db`
/// handle, so a reopen can race the unfinished teardown even after every
/// strong handle is gone.
///
/// The only operation that can observe "the path is openable again" is the
/// open itself, so every failure is retried, `OPEN_RETRY_DELAY` apart, for up
/// to `OPEN_ATTEMPTS` attempts in total. A genuine, persistent failure
/// surfaces as the final attempt's error. This mirrors
/// `typed_store::safe_drop_rocksdb`'s retry on the inverse
/// (destroy-vs-teardown) race.
///
/// Simulation-only: under a real runtime `Node::stop` joins the node's
/// thread, so the prior instance's teardown completes before the restart.
#[cfg(msim)]
async fn open_db(path: &std::path::Path) -> anyhow::Result<(Db, RpcStoreSchema)> {
    const OPEN_ATTEMPTS: usize = 60;
    const OPEN_RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(100);

    // Every attempt except the last one logs the failure and waits before
    // trying again.
    for attempt in 1..OPEN_ATTEMPTS {
        match Db::open::<RpcStoreSchema>(path, db_options()) {
            Ok(opened) => return Ok(opened),
            Err(e) => {
                tracing::warn!(
                    attempt,
                    "opening the embedded rpc-store database failed, retrying: {e:?}"
                );
                tokio::time::sleep(OPEN_RETRY_DELAY).await;
            }
        }
    }

    // Final attempt: no retries remain, so its error (if any) is surfaced.
    Db::open::<RpcStoreSchema>(path, db_options())
        .context("opening the embedded rpc-store database")
}
145
/// The startup action taken for the on-disk rpc-store.
///
/// [`EmbeddedRpcStore`] keeps the chosen action and reports it through
/// [`EmbeddedRpcStore::bootstrap_action`]. This lets tests (and future
/// introspection surfaces) tell a restart that resumed the existing indexes
/// apart from one that rebuilt them.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Bootstrap {
    /// The on-disk state is usable within the available range: open it and
    /// follow the tip, with no blocking work.
    Resume,

    /// The live cohort is usable, but the history cohort is absent or has
    /// slipped below the available floor. (Re)seed only the history cohort,
    /// leaving the live cohort untouched.
    SeedHistory,

    /// (Re)bulk-load the live cohort from the perpetual store, then seed the
    /// history cohort.
    ///
    /// With `clear` set, the database is wiped first (its data is out of range
    /// or belongs to another chain). Without it, the restore continues from
    /// any in-progress per-shard cursors.
    Restore { clear: bool },
}

/// Choose the bootstrap action for the embedded store from its persisted
/// framework state and the perpetual store's available range.
///
/// - `live_resume`: `Some(c)` once every [`LIVE_COHORT`] pipeline has a
///   committed watermark. Here `c = min(checkpoint_hi_inclusive) + 1`, the
///   first checkpoint tip indexing would process. It is `None` if any live
///   pipeline has no watermark (never restored, or the restore did not
///   finish).
/// - `history_resume`: the same quantity for the [`HISTORY_COHORT`], except
///   that a missing watermark is passed as `0`. An unwatermarked pipeline
///   starts at `first_checkpoint`, which the embedded path leaves at `0`, so
///   history then backfills from genesis.
/// - `chain_matches`: `Some(false)` if the database is bound to another
///   chain, `Some(true)` if it is bound to this one, and `None` if no chain id
///   is recorded yet.
/// - `lowest_available`: `L`, the lowest checkpoint the perpetual store can
///   still serve.
///
/// Precedence, highest first:
/// 1. A chain mismatch always wipes.
/// 2. An unfinished live restore is resumed in place.
/// 3. A live cohort below `L` is wiped and rebuilt.
/// 4. A history cohort below `L` is re-seeded on its own.
/// 5. Otherwise the store resumes.
fn decide(
    live_resume: Option<u64>,
    history_resume: u64,
    chain_matches: Option<bool>,
    lowest_available: u64,
) -> Bootstrap {
    match (chain_matches, live_resume) {
        // Bound to another chain: nothing on disk is reusable.
        (Some(false), _) => Bootstrap::Restore { clear: true },
        // The live restore never completed. Keep its per-shard cursors so the
        // driver continues where it stopped instead of discarding progress.
        (_, None) => Bootstrap::Restore { clear: false },
        // The live indexes reference checkpoints the perpetual store has
        // since pruned, so the bulk-loaded data cannot be continued.
        (_, Some(live)) if live < lowest_available => Bootstrap::Restore { clear: true },
        // Live is in range, but history was never seeded above the floor or
        // has fallen behind it: re-seed history alone.
        (_, Some(_)) if history_resume < lowest_available => Bootstrap::SeedHistory,
        (_, Some(_)) => Bootstrap::Resume,
    }
}
221
/// A bootstrapped embedded rpc-store, ready to hand to the pruner and the
/// rpc-api read path and to start tip indexing.
///
/// Built by [`Self::bootstrap`]. Tip indexing is started separately by
/// [`Self::spawn_indexer`].
pub struct EmbeddedRpcStore {
    /// Shared store handle. Cloned for the pruner (via [`Self::store`]) and
    /// for the tip indexer. Its database also backs the watermark reads used
    /// by the `*_committed_checkpoint` introspection methods and
    /// [`Self::indexed_checkpoint_fn`].
    store: RpcStore,

    /// Read handle exposing the rpc-store's index surface to
    /// `sui-rpc-api`.
    reader: RpcStoreReader,

    /// Local checkpoint source for the tip ingestion client. It is also handed
    /// to the broadcast streaming client when one is used.
    ingestion_source: RocksDbStore,

    /// Identifier of the chain this node runs. It is passed to the tip
    /// ingestion and streaming clients built in [`Self::spawn_indexer`].
    chain_id: ChainIdentifier,

    /// The bootstrap action [`decide`] selected for the on-disk store at
    /// startup. Retained for introspection (see
    /// [`Self::bootstrap_action`]); it does not affect runtime behavior.
    action: Bootstrap,

    /// Service wrapping the background task that builds and runs the tip
    /// indexer, populated by [`Self::spawn_indexer`].
    ///
    /// Its single primary task builds the indexer's inner [`Service`] and
    /// joins it. Dropping this service on node shutdown aborts that task,
    /// which in turn drops and aborts the inner service's pipeline tasks.
    /// This is what keeps the indexer from leaking (notably across e2e tests
    /// sharing a process), so no explicit [`Drop`] is needed. It is declared
    /// last, so it is also the last field dropped.
    indexer_service: Option<Service>,
}
252
253impl EmbeddedRpcStore {
254 /// Open the rpc-store, bring it in line with the perpetual store's
255 /// available range (restoring / seeding as needed), and build the
256 /// store and read handles.
257 ///
258 /// Blocks while restoring the live cohort. Call before the node
259 /// starts executing checkpoints, so the perpetual store's range is
260 /// stable for the duration of the restore.
261 pub async fn bootstrap(
262 config: &NodeConfig,
263 authority_store: &Arc<AuthorityStore>,
264 checkpoint_store: &Arc<CheckpointStore>,
265 ingestion_source: RocksDbStore,
266 chain_identifier: ChainIdentifier,
267 registry: &Registry,
268 ) -> anyhow::Result<Self> {
269 let perpetual = authority_store.perpetual_tables.clone();
270 let path = config.db_path().join(RPC_STORE_DIR);
271 let (db, schema) = open_db(&path).await?;
272 let schema = Arc::new(schema);
273
274 // Expose per-CF RocksDB stats (sizes, compaction backlog,
275 // write-stall state) for the store's database.
276 registry
277 .register(Box::new(ColumnFamilyStatsCollector::new(
278 Some(METRICS_PREFIX),
279 &db,
280 )))
281 .context("registering the embedded rpc-store RocksDB stats collector")?;
282
283 // The highest checkpoint whose transaction outputs are durably
284 // committed to the perpetual store. This is the live cohort's restore
285 // target: the bulk restore reads the live object set, so the target
286 // must match the checkpoint that set reflects. We use the perpetual
287 // store's `highest_committed` watermark (written atomically with the
288 // objects) rather than the checkpoint store's `highest_executed`
289 // (bumped separately afterward), so an unclean stop cannot leave the
290 // restore reading objects beyond its target and double-counting them
291 // against the forward indexer. `None` only on a node's very first boot
292 // (genesis is executed later in startup), in which case there is
293 // nothing to bulk-load and the indexer builds both cohorts from genesis
294 // as the node executes.
295 let highest_committed = perpetual
296 .get_highest_committed_checkpoint()
297 .context("reading highest committed checkpoint")?
298 // Fall back to the checkpoint store's executed watermark for a
299 // database written before the atomic `highest_committed` watermark
300 // existed: it has no stamp yet, so this preserves the prior restore
301 // target until the next committed checkpoint stamps the consistent
302 // one. In normal operation `highest_committed` is written before
303 // `highest_executed` is bumped, so it is never absent while the
304 // executed watermark is present.
305 .or(checkpoint_store
306 .get_highest_executed_checkpoint_seq_number()
307 .context("reading highest executed checkpoint")?);
308 let lowest_available = lowest_available_checkpoint(&perpetual, checkpoint_store)?;
309
310 let chain_id = ChainId(*chain_identifier.as_bytes());
311 let live_resume = cohort_resume(&db, LIVE_COHORT)?;
312 let history_resume = cohort_resume(&db, HISTORY_COHORT)?.unwrap_or(0);
313 let chain_matches = stored_chain_id(&db)?.map(|stored| stored == chain_id);
314
315 let action = decide(live_resume, history_resume, chain_matches, lowest_available);
316 info!(
317 ?action,
318 lowest_available,
319 ?highest_committed,
320 "bootstrapping embedded rpc-store",
321 );
322
323 match action {
324 Bootstrap::Resume => {}
325 Bootstrap::SeedHistory => {
326 seed_history(
327 &db,
328 &schema,
329 &perpetual,
330 checkpoint_store,
331 lowest_available,
332 chain_id,
333 )?;
334 }
335 Bootstrap::Restore { clear } => {
336 if clear {
337 db.clear_all()
338 .context("clearing the out-of-range embedded rpc-store")?;
339 }
340 // A synced node enabling the embedded store for the
341 // first time (or recovering an out-of-range one):
342 // bulk-load the live cohort, then seed the history cohort
343 // so it backfills `(L, T]`. When `highest_committed` is
344 // `None` (a fresh node, nothing committed yet) there is
345 // nothing to load -- every pipeline stays unwatermarked
346 // so the indexer builds both cohorts from genesis as
347 // checkpoints execute.
348 if let Some(target) = highest_committed {
349 restore_live(
350 db.clone(),
351 schema.clone(),
352 perpetual.clone(),
353 target,
354 chain_id,
355 registry,
356 )
357 .await?;
358 // `L == 0` means genesis is still available, so the
359 // history cohort backfills from checkpoint 0 with no
360 // seed (an unwatermarked pipeline resumes at
361 // `first_checkpoint = 0`).
362 if lowest_available > 0 {
363 seed_history(
364 &db,
365 &schema,
366 &perpetual,
367 checkpoint_store,
368 lowest_available,
369 chain_id,
370 )?;
371 }
372 }
373 }
374 }
375
376 let store = sui_consistent_store::Store::new(db.clone(), schema.clone());
377 let reader = RpcStoreReader::new(db, schema);
378
379 Ok(Self {
380 store,
381 reader,
382 ingestion_source,
383 chain_id: chain_identifier,
384 action,
385 indexer_service: None,
386 })
387 }
388
389 /// The bootstrap action selected for the on-disk store at startup:
390 /// whether this run resumed the existing indexes, re-seeded the
391 /// history cohort, or rebuilt the live cohort. Read-only
392 /// introspection; primarily for tests.
393 pub fn bootstrap_action(&self) -> Bootstrap {
394 self.action
395 }
396
397 /// The highest checkpoint the live cohort has committed
398 /// (`min(checkpoint_hi_inclusive)` across its pipelines), i.e. how
399 /// far the live-object indexes have caught up to the tip. `None`
400 /// until every live pipeline has a watermark. Read-only
401 /// introspection; primarily for tests.
402 pub fn live_committed_checkpoint(&self) -> Option<u64> {
403 cohort_committed(self.store.db(), LIVE_COHORT)
404 .ok()
405 .flatten()
406 }
407
408 /// The highest checkpoint the history cohort has committed, i.e. how
409 /// far the ledger-history backfill has progressed. `None` until every
410 /// history pipeline has a watermark. Read-only introspection;
411 /// primarily for tests.
412 pub fn history_committed_checkpoint(&self) -> Option<u64> {
413 cohort_committed(self.store.db(), HISTORY_COHORT)
414 .ok()
415 .flatten()
416 }
417
418 /// A clone of the store handle, for the pruner's history-cohort
419 /// pruning ([`sui_rpc_store::prune_history_cohort`]).
420 pub fn store(&self) -> RpcStore {
421 self.store.clone()
422 }
423
424 /// A clone of the read handle, for the rpc-api read path
425 /// ([`crate::storage::RpcStoreReadStore`]).
426 pub fn reader(&self) -> RpcStoreReader {
427 self.reader.clone()
428 }
429
430 /// A callback reading the highest checkpoint the live cohort has
431 /// committed, for the subscription service's index gate (so a
432 /// checkpoint is not delivered to clients until its indexed state is
433 /// readable).
434 ///
435 /// Reads only the live cohort: the history cohort backfills
436 /// independently from the lowest available checkpoint, so gating on it
437 /// would hold back delivery on a restored node for the duration of the
438 /// backfill. On a node indexing from genesis the synchronizer keeps the
439 /// cohorts in lockstep, so the live cohort's progress implies the
440 /// history cohort's.
441 pub fn indexed_checkpoint_fn(&self) -> Arc<dyn Fn() -> Option<u64> + Send + Sync> {
442 let db = self.store.db().clone();
443 Arc::new(move || cohort_committed(&db, LIVE_COHORT).ok().flatten())
444 }
445
446 /// Spawn a background task that builds and runs the tip-following
447 /// indexer over the embedded store.
448 ///
449 /// The indexer is built on a background task -- rather than inline --
450 /// because the framework reads the starting tip via
451 /// `latest_checkpoint_number`, which on a node booting from genesis
452 /// blocks until the first checkpoint has been *executed*. At this
453 /// point genesis is only *synced* (the executor has not run yet), so
454 /// the read retries until the checkpoint executor catches up; that
455 /// executor only starts once `start_async` returns, so building
456 /// inline would deadlock node startup against a checkpoint that
457 /// cannot arrive until startup completes. (A resuming node has an
458 /// executed tip on disk and would not block, but the genesis case
459 /// forces the deferral unconditionally.) The follower catches up once
460 /// checkpoints begin to flow.
461 ///
462 /// The background task is held as a single-task [`Service`] so the
463 /// node owns the indexer's lifetime: dropping [`EmbeddedRpcStore`]
464 /// drops this service, which aborts the task and, with it, the inner
465 /// indexer service it is joining.
466 ///
467 /// `checkpoint_sender` is the checkpoint executor's broadcast
468 /// stream; when present it drives a low-latency
469 /// [`BroadcastStreamingClient`], with the perpetual-store ingestion
470 /// client filling any gap. When absent (e.g. on a node that does
471 /// not run the rpc servers) the ingestion client polls the
472 /// perpetual store alone.
473 pub fn spawn_indexer(
474 &mut self,
475 checkpoint_sender: Option<broadcast::Sender<Arc<Checkpoint>>>,
476 registry: Registry,
477 ) {
478 let store = self.store.clone();
479 let ingestion_source = self.ingestion_source.clone();
480 let chain_id = self.chain_id;
481
482 let service = Service::new().spawn(async move {
483 let mut service = match build_indexer(
484 store,
485 ingestion_source,
486 chain_id,
487 checkpoint_sender,
488 ®istry,
489 )
490 .await
491 {
492 Ok(service) => service,
493 Err(e) => {
494 error!("failed to start the embedded rpc-store indexer: {e:#}");
495 return Ok(());
496 }
497 };
498 // Hold the service for the task's lifetime; `join` only
499 // returns if an indexer task exits (it otherwise runs for the
500 // node's lifetime), so surface any fatal error.
501 if let Err(e) = service.join().await {
502 error!("the embedded rpc-store indexer exited with an error: {e:#}");
503 }
504 Ok(())
505 });
506 self.indexer_service = Some(service);
507 }
508}
509
510/// Build the tip-following indexer over `store`, register the embedded
511/// cohort pipelines, and run it. Returns the composed [`Service`]
512/// driving ingestion, the synchronizer, and the committers.
513async fn build_indexer(
514 store: RpcStore,
515 ingestion_source: RocksDbStore,
516 chain_id: ChainIdentifier,
517 checkpoint_sender: Option<broadcast::Sender<Arc<Checkpoint>>>,
518 registry: &Registry,
519) -> anyhow::Result<Service> {
520 let ingestion_metrics = IngestionMetrics::new(Some(METRICS_PREFIX), registry);
521 let ingestion_client = IngestionClient::from_trait(
522 Arc::new(PerpetualStoreIngestionClient::new(
523 ingestion_source.clone(),
524 chain_id,
525 )),
526 ingestion_metrics,
527 );
528 // The broadcast streaming client follows the tip with low latency; it
529 // reads the current tip from the same local store the ingestion
530 // client uses (so the framework's `peek()` resolves immediately even
531 // on an idle chain), and the ingestion client backfills any gap.
532 let streaming_client: Option<BoxedStreamingClient> = checkpoint_sender.map(|sender| {
533 Box::new(BroadcastStreamingClient::new(
534 sender,
535 chain_id,
536 ingestion_source,
537 )) as BoxedStreamingClient
538 });
539
540 let mut indexer = Indexer::from_store(
541 store,
542 IndexerArgs::default(),
543 ingestion_client,
544 streaming_client,
545 ConsistencyConfig::default(),
546 // Pruning is driven by the validator's `AuthorityStorePruner`
547 // (history cohort only), not the rpc-store's own pruner.
548 None,
549 IngestionConfig::default(),
550 registry,
551 )
552 .await
553 .context("constructing the embedded rpc-store indexer")?;
554 indexer
555 .add_pipelines(PipelineLayer::embedded(), CommitterConfig::default())
556 .await
557 .context("registering embedded rpc-store pipelines")?;
558 indexer
559 .run()
560 .await
561 .context("starting the embedded rpc-store indexer")
562}
563
564/// The lowest checkpoint the perpetual store can still serve: one past
565/// the higher of the object-store and checkpoint-store pruned
566/// watermarks (both inclusive). `0` when nothing has been pruned.
567fn lowest_available_checkpoint(
568 perpetual: &AuthorityPerpetualTables,
569 checkpoint_store: &CheckpointStore,
570) -> anyhow::Result<u64> {
571 let object_pruned = perpetual
572 .get_highest_pruned_checkpoint()
573 .context("reading object store pruned watermark")?;
574 let checkpoint_pruned = checkpoint_store
575 .get_highest_pruned_checkpoint_seq_number()
576 .context("reading checkpoint store pruned watermark")?;
577 Ok(object_pruned
578 .into_iter()
579 .chain(checkpoint_pruned)
580 .max()
581 .map(|pruned| pruned + 1)
582 .unwrap_or(0))
583}
584
585/// The highest checkpoint every pipeline in `cohort` has committed
586/// (`min(checkpoint_hi_inclusive)`). `None` if any pipeline in the
587/// cohort has no committed watermark.
588fn cohort_committed(db: &Db, cohort: &[&str]) -> anyhow::Result<Option<u64>> {
589 let framework = db.framework();
590 let mut min_hi: Option<u64> = None;
591 for name in cohort {
592 let key = PipelineTaskKey::new(*name);
593 let Some(watermark) = framework
594 .watermarks
595 .get(&key)
596 .with_context(|| format!("reading watermark for {name}"))?
597 else {
598 return Ok(None);
599 };
600 min_hi = Some(match min_hi {
601 Some(hi) => hi.min(watermark.checkpoint_hi_inclusive),
602 None => watermark.checkpoint_hi_inclusive,
603 });
604 }
605 Ok(min_hi)
606}
607
608/// The lowest checkpoint tip indexing would resume from across a
609/// cohort: `min(checkpoint_hi_inclusive) + 1`. `None` if any pipeline
610/// in the cohort has no committed watermark.
611fn cohort_resume(db: &Db, cohort: &[&str]) -> anyhow::Result<Option<u64>> {
612 Ok(cohort_committed(db, cohort)?.map(|hi| hi + 1))
613}
614
615/// The chain id the database is bound to, read from the first pipeline
616/// that has one recorded. All pipelines pin the same chain, so any one
617/// is representative.
618fn stored_chain_id(db: &Db) -> anyhow::Result<Option<ChainId>> {
619 let framework = db.framework();
620 for name in LIVE_COHORT.iter().chain(HISTORY_COHORT) {
621 let key = PipelineTaskKey::new(*name);
622 if let Some(id) = framework
623 .chain_ids
624 .get(&key)
625 .with_context(|| format!("reading chain id for {name}"))?
626 {
627 return Ok(Some(id));
628 }
629 }
630 Ok(None)
631}
632
633/// Bulk-load the live cohort from the perpetual store up to
634/// `target_checkpoint`, blocking until the restore completes.
635async fn restore_live(
636 db: Db,
637 schema: Arc<RpcStoreSchema>,
638 perpetual: Arc<AuthorityPerpetualTables>,
639 target_checkpoint: u64,
640 chain_id: ChainId,
641 registry: &Registry,
642) -> anyhow::Result<()> {
643 let source = PerpetualStoreRestoreSource::new(perpetual, target_checkpoint, chain_id);
644 let metrics = RestoreMetrics::new(Some(METRICS_PREFIX), registry);
645 let mut service = restore_indexes(
646 db,
647 schema,
648 source,
649 RestoreDriverConfig::default(),
650 RestoreLayer::indexes_only(),
651 metrics,
652 )
653 .context("starting the live-cohort restore")?;
654 service
655 .join()
656 .await
657 .context("restoring the live cohort from the perpetual store")?;
658 Ok(())
659}
660
661/// Seed the history cohort to `L - 1` so the backfill resumes at the
662/// lowest available checkpoint `L`. The seed watermark's `tx_hi`,
663/// epoch, and timestamp come from checkpoint `L - 1`'s summary, so the
664/// seeded pruning floor lines up with the first checkpoint the backfill
665/// will index.
666fn seed_history(
667 db: &Db,
668 schema: &RpcStoreSchema,
669 perpetual: &AuthorityPerpetualTables,
670 checkpoint_store: &CheckpointStore,
671 lowest_available: u64,
672 chain_id: ChainId,
673) -> anyhow::Result<()> {
674 debug_assert!(lowest_available > 0, "seed_history requires L > 0");
675 let anchor = lowest_available - 1;
676 let checkpoint = checkpoint_store
677 .get_checkpoint_by_sequence_number(anchor)
678 .context("reading the history seed-anchor checkpoint")?
679 .with_context(|| format!("history seed-anchor checkpoint {anchor} is unavailable"))?;
680 let summary = checkpoint.data();
681 let watermark = Watermark {
682 epoch_hi_inclusive: summary.epoch,
683 checkpoint_hi_inclusive: anchor,
684 tx_hi: summary.network_total_transactions,
685 timestamp_ms_hi_inclusive: summary.timestamp_ms,
686 };
687 seed_history_cohort(
688 db,
689 schema,
690 watermark,
691 chain_id,
692 Some(perpetual as &dyn ObjectStore),
693 )
694 .context("seeding the history cohort")
695}
696
#[cfg(test)]
mod tests {
    use super::*;

    // `L = 0` (nothing pruned): an unseeded history cohort backfills
    // from genesis, so a complete live cohort is enough to resume.
    #[test]
    fn resumes_from_genesis_when_nothing_pruned() {
        assert_eq!(decide(Some(10), 0, Some(true), 0), Bootstrap::Resume);
        // History never seeded (resume 0) is fine at L = 0.
        assert_eq!(decide(Some(10), 0, None, 0), Bootstrap::Resume);
    }

    // Both cohorts resume at or above the available floor.
    #[test]
    fn resumes_when_in_range() {
        assert_eq!(decide(Some(100), 100, Some(true), 100), Bootstrap::Resume);
        assert_eq!(decide(Some(200), 100, Some(true), 100), Bootstrap::Resume);
    }

    // The live cohort never finished restoring: resume the restore in
    // place rather than clearing partial progress.
    #[test]
    fn restores_without_clearing_when_live_uninitialized() {
        assert_eq!(
            decide(None, 0, None, 0),
            Bootstrap::Restore { clear: false }
        );
        assert_eq!(
            decide(None, 50, Some(true), 100),
            Bootstrap::Restore { clear: false }
        );
    }

    // The live cohort references checkpoints the perpetual store has
    // pruned away: wipe and rebuild.
    #[test]
    fn clears_and_restores_when_live_out_of_range() {
        assert_eq!(
            decide(Some(50), 200, Some(true), 100),
            Bootstrap::Restore { clear: true }
        );
        // Off-by-one boundary: resuming at `L - 1` is out of range...
        assert_eq!(
            decide(Some(99), 200, Some(true), 100),
            Bootstrap::Restore { clear: true }
        );
        // ...while resuming exactly at `L` is not.
        assert_eq!(decide(Some(100), 200, Some(true), 100), Bootstrap::Resume);
        // An unrecorded chain id does not exempt out-of-range data.
        assert_eq!(
            decide(Some(50), 200, None, 100),
            Bootstrap::Restore { clear: true }
        );
    }

    // A database bound to a different chain is always wiped.
    #[test]
    fn clears_and_restores_on_chain_mismatch() {
        assert_eq!(
            decide(Some(200), 200, Some(false), 100),
            Bootstrap::Restore { clear: true }
        );
        // Chain mismatch dominates even an otherwise-resumable state.
        assert_eq!(
            decide(Some(200), 200, Some(false), 0),
            Bootstrap::Restore { clear: true }
        );
    }

    // A chain mismatch also dominates an unfinished live restore. The partial
    // progress belongs to another chain, so it must be wiped, not resumed.
    #[test]
    fn chain_mismatch_dominates_unfinished_restore() {
        assert_eq!(
            decide(None, 0, Some(false), 0),
            Bootstrap::Restore { clear: true }
        );
        assert_eq!(
            decide(None, 50, Some(false), 100),
            Bootstrap::Restore { clear: true }
        );
    }

    // The live cohort is in range but the history cohort is missing or
    // has fallen behind the floor: re-seed history alone.
    #[test]
    fn seeds_history_when_history_behind_floor() {
        // History never seeded (resume 0) but L > 0.
        assert_eq!(
            decide(Some(200), 0, Some(true), 100),
            Bootstrap::SeedHistory
        );
        // History seeded but below the (advanced) floor.
        assert_eq!(
            decide(Some(200), 50, Some(true), 100),
            Bootstrap::SeedHistory
        );
        // Off-by-one boundary: history resuming at `L - 1` needs a re-seed.
        assert_eq!(
            decide(Some(200), 99, Some(true), 100),
            Bootstrap::SeedHistory
        );
        // History exactly at the floor resumes.
        assert_eq!(decide(Some(200), 100, Some(true), 100), Bootstrap::Resume);
    }
}