sui_core/
rpc_store_embed.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! Startup orchestration for the embedded `sui-rpc-store` indexer.
//!
//! When a fullnode is configured with
//! [`RpcConfig::use_experimental_rpc_store`](sui_config::RpcConfig::use_experimental_rpc_store),
//! it serves the rpc-api index surface from an embedded
//! [`sui_rpc_store`] instance instead of the legacy `rpc-index`. This
//! module owns the lifecycle of that instance:
//!
//! 1. Open the rpc-store database under the node's `db_path()`.
//! 2. Compare its persisted per-pipeline watermarks against the
//!    perpetual store's currently-available checkpoint range `[L, T]`
//!    (`L` = lowest available, `T` = highest committed, falling back to
//!    highest executed for a database written before the committed
//!    watermark existed) and `decide` what to do: resume as-is,
//!    (re)seed the history cohort, or (re)restore the live cohort.
//! 3. Bulk-load the live cohort from the perpetual store and seed the
//!    history cohort when needed (blocking, before the node starts
//!    executing).
//! 4. Build the read handle the rpc-api serves through, hand the store
//!    to the pruner, and spawn the tip-following indexer fed by the
//!    perpetual store ([`PerpetualStoreIngestionClient`]) and the
//!    checkpoint executor's broadcast stream
//!    ([`BroadcastStreamingClient`]).
//!
//! The live cohort (live-object-derivable indexes) is restored to the
//! tip and follows forward. The history cohort (ledger-history bitmaps,
//! `tx_seq` maps, per-epoch metadata) is seeded to the lowest available
//! checkpoint and backfilled upward; the synchronizer's dynamic cohort
//! lets it catch up to the live frontier without stalling tip
//! snapshots.
34
35use std::sync::Arc;
36
37use anyhow::Context as _;
38use prometheus::Registry;
39use sui_config::NodeConfig;
40use sui_consistent_store::ChainId;
41use sui_consistent_store::Db;
42use sui_consistent_store::DbOptions;
43use sui_consistent_store::PipelineTaskKey;
44use sui_consistent_store::Watermark;
45use sui_consistent_store::metrics::ColumnFamilyStatsCollector;
46use sui_consistent_store::restore::RestoreDriverConfig;
47use sui_consistent_store::restore::metrics::RestoreMetrics;
48use sui_indexer_alt_framework::IndexerArgs;
49use sui_indexer_alt_framework::ingestion::BoxedStreamingClient;
50use sui_indexer_alt_framework::ingestion::IngestionConfig;
51use sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClient;
52use sui_indexer_alt_framework::metrics::IngestionMetrics;
53use sui_indexer_alt_framework::pipeline::CommitterConfig;
54use sui_indexer_alt_framework::service::Service;
55use sui_rpc_store::ConsistencyConfig;
56use sui_rpc_store::HISTORY_COHORT;
57use sui_rpc_store::Indexer;
58use sui_rpc_store::LIVE_COHORT;
59use sui_rpc_store::METRICS_PREFIX;
60use sui_rpc_store::PipelineLayer;
61use sui_rpc_store::RestoreLayer;
62use sui_rpc_store::RpcStoreReader;
63use sui_rpc_store::RpcStoreSchema;
64use sui_rpc_store::Store as RpcStore;
65use sui_rpc_store::default_rocksdb_config;
66use sui_rpc_store::restore_indexes;
67use sui_rpc_store::seed_history_cohort;
68use sui_types::digests::ChainIdentifier;
69use sui_types::full_checkpoint_content::Checkpoint;
70use sui_types::storage::ObjectStore;
71use tokio::sync::broadcast;
72use tracing::error;
73use tracing::info;
74
75use crate::authority::authority_store::AuthorityStore;
76use crate::authority::authority_store_tables::AuthorityPerpetualTables;
77use crate::checkpoints::CheckpointStore;
78use crate::rpc_store_ingestion_client::PerpetualStoreIngestionClient;
79use crate::rpc_store_restore_source::PerpetualStoreRestoreSource;
80use crate::rpc_store_streaming_client::BroadcastStreamingClient;
81use crate::storage::RocksDbStore;
82
/// Name of the directory, created beneath the node's `db_path()`, in
/// which the embedded rpc-store keeps its database.
const RPC_STORE_DIR: &str = "rpc_store";

/// How many in-memory snapshots the store retains to serve consistent
/// reads. The value matches the standalone `sui-rpc-node` default.
/// Because one snapshot is taken per checkpoint boundary, this works
/// out to a consistency window of roughly 32 checkpoints.
const SNAPSHOT_CAPACITY: usize = 32;
91
92fn db_options() -> DbOptions {
93    DbOptions {
94        rocksdb: default_rocksdb_config(),
95        snapshot_capacity: SNAPSHOT_CAPACITY,
96    }
97}
98
99/// Open the rpc-store database at `path`.
100#[cfg(not(msim))]
101async fn open_db(path: &std::path::Path) -> anyhow::Result<(Db, RpcStoreSchema)> {
102    Db::open::<RpcStoreSchema>(path, db_options())
103        .context("opening the embedded rpc-store database")
104}
105
/// Simulation build of `open_db`: open the rpc-store database at
/// `path`, retrying while the open fails.
///
/// On the simtest restart path a node is brought back up on the same
/// `db_path`, and the reopen can transiently see the path as locked.
/// The previous instance's RocksDB teardown releases its in-process
/// lock registry entry inside RocksDB's native close, on RocksDB's own
/// threads, and that release is not synchronized with the drop of the
/// last `Db` handle. The reopen can therefore race an unfinished native
/// teardown even after every strong handle is gone.
///
/// Retrying is the robust remedy: the only operation able to observe
/// "the path is openable again" is asking RocksDB to open it, so a plain
/// wait cannot know when to stop. A genuine, persistent failure still
/// surfaces once the attempts run out. This mirrors the retry in
/// `typed_store::safe_drop_rocksdb`, which handles the inverse
/// (destroy-vs-teardown) race.
///
/// This variant exists only under `msim`. With a real runtime
/// `Node::stop` joins the node's thread, so the earlier instance's
/// teardown has finished before the restart and the open never races.
#[cfg(msim)]
async fn open_db(path: &std::path::Path) -> anyhow::Result<(Db, RpcStoreSchema)> {
    const OPEN_ATTEMPTS: usize = 60;
    const OPEN_RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(100);

    // Every attempt except the last one logs its failure, backs off and
    // goes around again.
    for attempt in 1..OPEN_ATTEMPTS {
        let e = match Db::open::<RpcStoreSchema>(path, db_options()) {
            Ok(opened) => return Ok(opened),
            Err(e) => e,
        };
        tracing::warn!(
            attempt,
            "opening the embedded rpc-store database failed, retrying: {e:?}"
        );
        tokio::time::sleep(OPEN_RETRY_DELAY).await;
    }

    // Last attempt: whatever it yields is final.
    Db::open::<RpcStoreSchema>(path, db_options())
        .context("opening the embedded rpc-store database")
}
145
/// The action startup takes on the rpc-store found on disk.
///
/// Whichever variant is selected at startup is kept on
/// [`EmbeddedRpcStore`] and can be read back through
/// [`EmbeddedRpcStore::bootstrap_action`], letting tests (and any later
/// introspection surface) distinguish a restart that resumed the
/// existing indexes from one that rebuilt them.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Bootstrap {
    /// Nothing to repair: the persisted state resumes inside the
    /// available range, so the store is opened and follows the tip
    /// without any blocking work.
    Resume,

    /// Only the history cohort needs attention -- it is either absent
    /// or has dropped below the available floor. It is (re)seeded in
    /// place and the live cohort is left untouched.
    SeedHistory,

    /// The live cohort is (re)bulk-loaded from the perpetual store and
    /// the history cohort is seeded afterwards. With `clear` set the
    /// database is wiped first (its data is out of range or belongs to
    /// another chain); without it the restore continues from whatever
    /// per-shard cursors are already in progress.
    Restore { clear: bool },
}
169
170/// Decide what bootstrap action the embedded store needs from the
171/// persisted framework state and the perpetual store's available
172/// range.
173///
174/// - `live_resume` is `Some(c)` when every [`LIVE_COHORT`] pipeline
175///   has a committed watermark, where `c` is the lowest checkpoint tip
176///   indexing would resume from across them
177///   (`min(checkpoint_hi_inclusive) + 1`); `None` when any live
178///   pipeline lacks a watermark (never restored, or a restore that did
179///   not finish).
180/// - `history_resume` is the same for the [`HISTORY_COHORT`], but a
181///   missing watermark maps to `0`: an unwatermarked pipeline resumes
182///   at `first_checkpoint`, which the embedded path leaves at its `0`
183///   default, so the history cohort backfills from genesis.
184/// - `chain_matches` is `Some(false)` when the database is bound to a
185///   different chain, `Some(true)` when it matches, and `None` when no
186///   chain id has been recorded yet.
187/// - `lowest_available` is `L`, the lowest checkpoint the perpetual
188///   store can still serve.
189fn decide(
190    live_resume: Option<u64>,
191    history_resume: u64,
192    chain_matches: Option<bool>,
193    lowest_available: u64,
194) -> Bootstrap {
195    // A database bound to another chain is unusable; wipe and rebuild.
196    if chain_matches == Some(false) {
197        return Bootstrap::Restore { clear: true };
198    }
199
200    let Some(live_resume) = live_resume else {
201        // The live cohort never finished restoring. Resume the restore
202        // in place -- the driver picks up from its per-shard cursors --
203        // rather than clearing partial progress.
204        return Bootstrap::Restore { clear: false };
205    };
206
207    // The live cohort's indexes reference checkpoints the perpetual
208    // store has since pruned; the bulk-loaded data is unusable.
209    if live_resume < lowest_available {
210        return Bootstrap::Restore { clear: true };
211    }
212
213    // The live cohort is in range; the history cohort either was never
214    // seeded above the floor or fell behind it. Re-seed it alone.
215    if history_resume < lowest_available {
216        return Bootstrap::SeedHistory;
217    }
218
219    Bootstrap::Resume
220}
221
222/// A bootstrapped embedded rpc-store, ready to hand to the pruner and
223/// the rpc-api read path and to start tip indexing.
224pub struct EmbeddedRpcStore {
225    /// Shared store handle. Cloned for the pruner (via [`Self::store`])
226    /// and for the tip indexer.
227    store: RpcStore,
228
229    /// Read handle exposing the rpc-store's index surface to
230    /// `sui-rpc-api`.
231    reader: RpcStoreReader,
232
233    /// Local checkpoint source for the tip ingestion client.
234    ingestion_source: RocksDbStore,
235
236    chain_id: ChainIdentifier,
237
238    /// The bootstrap action [`decide`] selected for the on-disk store at
239    /// startup. Retained for introspection (see
240    /// [`Self::bootstrap_action`]); does not affect runtime behavior.
241    action: Bootstrap,
242
243    /// Service wrapping the background task that builds and runs the tip
244    /// indexer, populated by [`Self::spawn_indexer`]. Its single primary
245    /// task builds the indexer's inner [`Service`] and joins it; dropping
246    /// this service on node shutdown aborts that task, which in turn drops
247    /// and aborts the inner service's pipeline tasks. This is what keeps
248    /// the indexer from leaking (notably across e2e tests sharing a
249    /// process), so no explicit [`Drop`] is needed.
250    indexer_service: Option<Service>,
251}
252
253impl EmbeddedRpcStore {
254    /// Open the rpc-store, bring it in line with the perpetual store's
255    /// available range (restoring / seeding as needed), and build the
256    /// store and read handles.
257    ///
258    /// Blocks while restoring the live cohort. Call before the node
259    /// starts executing checkpoints, so the perpetual store's range is
260    /// stable for the duration of the restore.
261    pub async fn bootstrap(
262        config: &NodeConfig,
263        authority_store: &Arc<AuthorityStore>,
264        checkpoint_store: &Arc<CheckpointStore>,
265        ingestion_source: RocksDbStore,
266        chain_identifier: ChainIdentifier,
267        registry: &Registry,
268    ) -> anyhow::Result<Self> {
269        let perpetual = authority_store.perpetual_tables.clone();
270        let path = config.db_path().join(RPC_STORE_DIR);
271        let (db, schema) = open_db(&path).await?;
272        let schema = Arc::new(schema);
273
274        // Expose per-CF RocksDB stats (sizes, compaction backlog,
275        // write-stall state) for the store's database.
276        registry
277            .register(Box::new(ColumnFamilyStatsCollector::new(
278                Some(METRICS_PREFIX),
279                &db,
280            )))
281            .context("registering the embedded rpc-store RocksDB stats collector")?;
282
283        // The highest checkpoint whose transaction outputs are durably
284        // committed to the perpetual store. This is the live cohort's restore
285        // target: the bulk restore reads the live object set, so the target
286        // must match the checkpoint that set reflects. We use the perpetual
287        // store's `highest_committed` watermark (written atomically with the
288        // objects) rather than the checkpoint store's `highest_executed`
289        // (bumped separately afterward), so an unclean stop cannot leave the
290        // restore reading objects beyond its target and double-counting them
291        // against the forward indexer. `None` only on a node's very first boot
292        // (genesis is executed later in startup), in which case there is
293        // nothing to bulk-load and the indexer builds both cohorts from genesis
294        // as the node executes.
295        let highest_committed = perpetual
296            .get_highest_committed_checkpoint()
297            .context("reading highest committed checkpoint")?
298            // Fall back to the checkpoint store's executed watermark for a
299            // database written before the atomic `highest_committed` watermark
300            // existed: it has no stamp yet, so this preserves the prior restore
301            // target until the next committed checkpoint stamps the consistent
302            // one. In normal operation `highest_committed` is written before
303            // `highest_executed` is bumped, so it is never absent while the
304            // executed watermark is present.
305            .or(checkpoint_store
306                .get_highest_executed_checkpoint_seq_number()
307                .context("reading highest executed checkpoint")?);
308        let lowest_available = lowest_available_checkpoint(&perpetual, checkpoint_store)?;
309
310        let chain_id = ChainId(*chain_identifier.as_bytes());
311        let live_resume = cohort_resume(&db, LIVE_COHORT)?;
312        let history_resume = cohort_resume(&db, HISTORY_COHORT)?.unwrap_or(0);
313        let chain_matches = stored_chain_id(&db)?.map(|stored| stored == chain_id);
314
315        let action = decide(live_resume, history_resume, chain_matches, lowest_available);
316        info!(
317            ?action,
318            lowest_available,
319            ?highest_committed,
320            "bootstrapping embedded rpc-store",
321        );
322
323        match action {
324            Bootstrap::Resume => {}
325            Bootstrap::SeedHistory => {
326                seed_history(
327                    &db,
328                    &schema,
329                    &perpetual,
330                    checkpoint_store,
331                    lowest_available,
332                    chain_id,
333                )?;
334            }
335            Bootstrap::Restore { clear } => {
336                if clear {
337                    db.clear_all()
338                        .context("clearing the out-of-range embedded rpc-store")?;
339                }
340                // A synced node enabling the embedded store for the
341                // first time (or recovering an out-of-range one):
342                // bulk-load the live cohort, then seed the history cohort
343                // so it backfills `(L, T]`. When `highest_committed` is
344                // `None` (a fresh node, nothing committed yet) there is
345                // nothing to load -- every pipeline stays unwatermarked
346                // so the indexer builds both cohorts from genesis as
347                // checkpoints execute.
348                if let Some(target) = highest_committed {
349                    restore_live(
350                        db.clone(),
351                        schema.clone(),
352                        perpetual.clone(),
353                        target,
354                        chain_id,
355                        registry,
356                    )
357                    .await?;
358                    // `L == 0` means genesis is still available, so the
359                    // history cohort backfills from checkpoint 0 with no
360                    // seed (an unwatermarked pipeline resumes at
361                    // `first_checkpoint = 0`).
362                    if lowest_available > 0 {
363                        seed_history(
364                            &db,
365                            &schema,
366                            &perpetual,
367                            checkpoint_store,
368                            lowest_available,
369                            chain_id,
370                        )?;
371                    }
372                }
373            }
374        }
375
376        let store = sui_consistent_store::Store::new(db.clone(), schema.clone());
377        let reader = RpcStoreReader::new(db, schema);
378
379        Ok(Self {
380            store,
381            reader,
382            ingestion_source,
383            chain_id: chain_identifier,
384            action,
385            indexer_service: None,
386        })
387    }
388
389    /// The bootstrap action selected for the on-disk store at startup:
390    /// whether this run resumed the existing indexes, re-seeded the
391    /// history cohort, or rebuilt the live cohort. Read-only
392    /// introspection; primarily for tests.
393    pub fn bootstrap_action(&self) -> Bootstrap {
394        self.action
395    }
396
397    /// The highest checkpoint the live cohort has committed
398    /// (`min(checkpoint_hi_inclusive)` across its pipelines), i.e. how
399    /// far the live-object indexes have caught up to the tip. `None`
400    /// until every live pipeline has a watermark. Read-only
401    /// introspection; primarily for tests.
402    pub fn live_committed_checkpoint(&self) -> Option<u64> {
403        cohort_committed(self.store.db(), LIVE_COHORT)
404            .ok()
405            .flatten()
406    }
407
408    /// The highest checkpoint the history cohort has committed, i.e. how
409    /// far the ledger-history backfill has progressed. `None` until every
410    /// history pipeline has a watermark. Read-only introspection;
411    /// primarily for tests.
412    pub fn history_committed_checkpoint(&self) -> Option<u64> {
413        cohort_committed(self.store.db(), HISTORY_COHORT)
414            .ok()
415            .flatten()
416    }
417
418    /// A clone of the store handle, for the pruner's history-cohort
419    /// pruning ([`sui_rpc_store::prune_history_cohort`]).
420    pub fn store(&self) -> RpcStore {
421        self.store.clone()
422    }
423
424    /// A clone of the read handle, for the rpc-api read path
425    /// ([`crate::storage::RpcStoreReadStore`]).
426    pub fn reader(&self) -> RpcStoreReader {
427        self.reader.clone()
428    }
429
430    /// A callback reading the highest checkpoint the live cohort has
431    /// committed, for the subscription service's index gate (so a
432    /// checkpoint is not delivered to clients until its indexed state is
433    /// readable).
434    ///
435    /// Reads only the live cohort: the history cohort backfills
436    /// independently from the lowest available checkpoint, so gating on it
437    /// would hold back delivery on a restored node for the duration of the
438    /// backfill. On a node indexing from genesis the synchronizer keeps the
439    /// cohorts in lockstep, so the live cohort's progress implies the
440    /// history cohort's.
441    pub fn indexed_checkpoint_fn(&self) -> Arc<dyn Fn() -> Option<u64> + Send + Sync> {
442        let db = self.store.db().clone();
443        Arc::new(move || cohort_committed(&db, LIVE_COHORT).ok().flatten())
444    }
445
446    /// Spawn a background task that builds and runs the tip-following
447    /// indexer over the embedded store.
448    ///
449    /// The indexer is built on a background task -- rather than inline --
450    /// because the framework reads the starting tip via
451    /// `latest_checkpoint_number`, which on a node booting from genesis
452    /// blocks until the first checkpoint has been *executed*. At this
453    /// point genesis is only *synced* (the executor has not run yet), so
454    /// the read retries until the checkpoint executor catches up; that
455    /// executor only starts once `start_async` returns, so building
456    /// inline would deadlock node startup against a checkpoint that
457    /// cannot arrive until startup completes. (A resuming node has an
458    /// executed tip on disk and would not block, but the genesis case
459    /// forces the deferral unconditionally.) The follower catches up once
460    /// checkpoints begin to flow.
461    ///
462    /// The background task is held as a single-task [`Service`] so the
463    /// node owns the indexer's lifetime: dropping [`EmbeddedRpcStore`]
464    /// drops this service, which aborts the task and, with it, the inner
465    /// indexer service it is joining.
466    ///
467    /// `checkpoint_sender` is the checkpoint executor's broadcast
468    /// stream; when present it drives a low-latency
469    /// [`BroadcastStreamingClient`], with the perpetual-store ingestion
470    /// client filling any gap. When absent (e.g. on a node that does
471    /// not run the rpc servers) the ingestion client polls the
472    /// perpetual store alone.
473    pub fn spawn_indexer(
474        &mut self,
475        checkpoint_sender: Option<broadcast::Sender<Arc<Checkpoint>>>,
476        registry: Registry,
477    ) {
478        let store = self.store.clone();
479        let ingestion_source = self.ingestion_source.clone();
480        let chain_id = self.chain_id;
481
482        let service = Service::new().spawn(async move {
483            let mut service = match build_indexer(
484                store,
485                ingestion_source,
486                chain_id,
487                checkpoint_sender,
488                &registry,
489            )
490            .await
491            {
492                Ok(service) => service,
493                Err(e) => {
494                    error!("failed to start the embedded rpc-store indexer: {e:#}");
495                    return Ok(());
496                }
497            };
498            // Hold the service for the task's lifetime; `join` only
499            // returns if an indexer task exits (it otherwise runs for the
500            // node's lifetime), so surface any fatal error.
501            if let Err(e) = service.join().await {
502                error!("the embedded rpc-store indexer exited with an error: {e:#}");
503            }
504            Ok(())
505        });
506        self.indexer_service = Some(service);
507    }
508}
509
510/// Build the tip-following indexer over `store`, register the embedded
511/// cohort pipelines, and run it. Returns the composed [`Service`]
512/// driving ingestion, the synchronizer, and the committers.
513async fn build_indexer(
514    store: RpcStore,
515    ingestion_source: RocksDbStore,
516    chain_id: ChainIdentifier,
517    checkpoint_sender: Option<broadcast::Sender<Arc<Checkpoint>>>,
518    registry: &Registry,
519) -> anyhow::Result<Service> {
520    let ingestion_metrics = IngestionMetrics::new(Some(METRICS_PREFIX), registry);
521    let ingestion_client = IngestionClient::from_trait(
522        Arc::new(PerpetualStoreIngestionClient::new(
523            ingestion_source.clone(),
524            chain_id,
525        )),
526        ingestion_metrics,
527    );
528    // The broadcast streaming client follows the tip with low latency; it
529    // reads the current tip from the same local store the ingestion
530    // client uses (so the framework's `peek()` resolves immediately even
531    // on an idle chain), and the ingestion client backfills any gap.
532    let streaming_client: Option<BoxedStreamingClient> = checkpoint_sender.map(|sender| {
533        Box::new(BroadcastStreamingClient::new(
534            sender,
535            chain_id,
536            ingestion_source,
537        )) as BoxedStreamingClient
538    });
539
540    let mut indexer = Indexer::from_store(
541        store,
542        IndexerArgs::default(),
543        ingestion_client,
544        streaming_client,
545        ConsistencyConfig::default(),
546        // Pruning is driven by the validator's `AuthorityStorePruner`
547        // (history cohort only), not the rpc-store's own pruner.
548        None,
549        IngestionConfig::default(),
550        registry,
551    )
552    .await
553    .context("constructing the embedded rpc-store indexer")?;
554    indexer
555        .add_pipelines(PipelineLayer::embedded(), CommitterConfig::default())
556        .await
557        .context("registering embedded rpc-store pipelines")?;
558    indexer
559        .run()
560        .await
561        .context("starting the embedded rpc-store indexer")
562}
563
564/// The lowest checkpoint the perpetual store can still serve: one past
565/// the higher of the object-store and checkpoint-store pruned
566/// watermarks (both inclusive). `0` when nothing has been pruned.
567fn lowest_available_checkpoint(
568    perpetual: &AuthorityPerpetualTables,
569    checkpoint_store: &CheckpointStore,
570) -> anyhow::Result<u64> {
571    let object_pruned = perpetual
572        .get_highest_pruned_checkpoint()
573        .context("reading object store pruned watermark")?;
574    let checkpoint_pruned = checkpoint_store
575        .get_highest_pruned_checkpoint_seq_number()
576        .context("reading checkpoint store pruned watermark")?;
577    Ok(object_pruned
578        .into_iter()
579        .chain(checkpoint_pruned)
580        .max()
581        .map(|pruned| pruned + 1)
582        .unwrap_or(0))
583}
584
585/// The highest checkpoint every pipeline in `cohort` has committed
586/// (`min(checkpoint_hi_inclusive)`). `None` if any pipeline in the
587/// cohort has no committed watermark.
588fn cohort_committed(db: &Db, cohort: &[&str]) -> anyhow::Result<Option<u64>> {
589    let framework = db.framework();
590    let mut min_hi: Option<u64> = None;
591    for name in cohort {
592        let key = PipelineTaskKey::new(*name);
593        let Some(watermark) = framework
594            .watermarks
595            .get(&key)
596            .with_context(|| format!("reading watermark for {name}"))?
597        else {
598            return Ok(None);
599        };
600        min_hi = Some(match min_hi {
601            Some(hi) => hi.min(watermark.checkpoint_hi_inclusive),
602            None => watermark.checkpoint_hi_inclusive,
603        });
604    }
605    Ok(min_hi)
606}
607
608/// The lowest checkpoint tip indexing would resume from across a
609/// cohort: `min(checkpoint_hi_inclusive) + 1`. `None` if any pipeline
610/// in the cohort has no committed watermark.
611fn cohort_resume(db: &Db, cohort: &[&str]) -> anyhow::Result<Option<u64>> {
612    Ok(cohort_committed(db, cohort)?.map(|hi| hi + 1))
613}
614
615/// The chain id the database is bound to, read from the first pipeline
616/// that has one recorded. All pipelines pin the same chain, so any one
617/// is representative.
618fn stored_chain_id(db: &Db) -> anyhow::Result<Option<ChainId>> {
619    let framework = db.framework();
620    for name in LIVE_COHORT.iter().chain(HISTORY_COHORT) {
621        let key = PipelineTaskKey::new(*name);
622        if let Some(id) = framework
623            .chain_ids
624            .get(&key)
625            .with_context(|| format!("reading chain id for {name}"))?
626        {
627            return Ok(Some(id));
628        }
629    }
630    Ok(None)
631}
632
633/// Bulk-load the live cohort from the perpetual store up to
634/// `target_checkpoint`, blocking until the restore completes.
635async fn restore_live(
636    db: Db,
637    schema: Arc<RpcStoreSchema>,
638    perpetual: Arc<AuthorityPerpetualTables>,
639    target_checkpoint: u64,
640    chain_id: ChainId,
641    registry: &Registry,
642) -> anyhow::Result<()> {
643    let source = PerpetualStoreRestoreSource::new(perpetual, target_checkpoint, chain_id);
644    let metrics = RestoreMetrics::new(Some(METRICS_PREFIX), registry);
645    let mut service = restore_indexes(
646        db,
647        schema,
648        source,
649        RestoreDriverConfig::default(),
650        RestoreLayer::indexes_only(),
651        metrics,
652    )
653    .context("starting the live-cohort restore")?;
654    service
655        .join()
656        .await
657        .context("restoring the live cohort from the perpetual store")?;
658    Ok(())
659}
660
661/// Seed the history cohort to `L - 1` so the backfill resumes at the
662/// lowest available checkpoint `L`. The seed watermark's `tx_hi`,
663/// epoch, and timestamp come from checkpoint `L - 1`'s summary, so the
664/// seeded pruning floor lines up with the first checkpoint the backfill
665/// will index.
666fn seed_history(
667    db: &Db,
668    schema: &RpcStoreSchema,
669    perpetual: &AuthorityPerpetualTables,
670    checkpoint_store: &CheckpointStore,
671    lowest_available: u64,
672    chain_id: ChainId,
673) -> anyhow::Result<()> {
674    debug_assert!(lowest_available > 0, "seed_history requires L > 0");
675    let anchor = lowest_available - 1;
676    let checkpoint = checkpoint_store
677        .get_checkpoint_by_sequence_number(anchor)
678        .context("reading the history seed-anchor checkpoint")?
679        .with_context(|| format!("history seed-anchor checkpoint {anchor} is unavailable"))?;
680    let summary = checkpoint.data();
681    let watermark = Watermark {
682        epoch_hi_inclusive: summary.epoch,
683        checkpoint_hi_inclusive: anchor,
684        tx_hi: summary.network_total_transactions,
685        timestamp_ms_hi_inclusive: summary.timestamp_ms,
686    };
687    seed_history_cohort(
688        db,
689        schema,
690        watermark,
691        chain_id,
692        Some(perpetual as &dyn ObjectStore),
693    )
694    .context("seeding the history cohort")
695}
696
#[cfg(test)]
mod tests {
    use super::*;

    /// One `decide` input: `(live_resume, history_resume, chain_matches,
    /// lowest_available)`.
    type Case = (Option<u64>, u64, Option<bool>, u64);

    const RESTORE_IN_PLACE: Bootstrap = Bootstrap::Restore { clear: false };
    const WIPE_AND_RESTORE: Bootstrap = Bootstrap::Restore { clear: true };

    /// Check that `decide` answers `expected` for every case given.
    fn assert_decides(expected: Bootstrap, cases: &[Case]) {
        for &(live, history, chain, lowest) in cases {
            assert_eq!(
                decide(live, history, chain, lowest),
                expected,
                "decide({live:?}, {history}, {chain:?}, {lowest})"
            );
        }
    }

    // With nothing pruned (`L = 0`) an unseeded history cohort simply
    // backfills from genesis, so a complete live cohort suffices to
    // resume.
    #[test]
    fn resumes_from_genesis_when_nothing_pruned() {
        assert_decides(
            Bootstrap::Resume,
            &[
                (Some(10), 0, Some(true), 0),
                // A never-seeded history cohort (resume 0) is fine at L = 0.
                (Some(10), 0, None, 0),
            ],
        );
    }

    // Neither cohort resumes below the available floor.
    #[test]
    fn resumes_when_in_range() {
        assert_decides(
            Bootstrap::Resume,
            &[
                (Some(100), 100, Some(true), 100),
                (Some(200), 100, Some(true), 100),
            ],
        );
    }

    // An unfinished live restore is continued in place; its partial
    // progress is not cleared.
    #[test]
    fn restores_without_clearing_when_live_uninitialized() {
        assert_decides(
            RESTORE_IN_PLACE,
            &[(None, 0, None, 0), (None, 50, Some(true), 100)],
        );
    }

    // A live cohort pointing at checkpoints the perpetual store has
    // already pruned is wiped and rebuilt.
    #[test]
    fn clears_and_restores_when_live_out_of_range() {
        assert_decides(WIPE_AND_RESTORE, &[(Some(50), 200, Some(true), 100)]);
    }

    // Data bound to another chain is wiped unconditionally.
    #[test]
    fn clears_and_restores_on_chain_mismatch() {
        assert_decides(
            WIPE_AND_RESTORE,
            &[
                (Some(200), 200, Some(false), 100),
                // The mismatch wins even over an otherwise-resumable state.
                (Some(200), 200, Some(false), 0),
            ],
        );
    }

    // Live in range, history missing or below the floor: only history
    // is re-seeded.
    #[test]
    fn seeds_history_when_history_behind_floor() {
        assert_decides(
            Bootstrap::SeedHistory,
            &[
                // Never seeded (resume 0) while L > 0.
                (Some(200), 0, Some(true), 100),
                // Seeded, but below the (advanced) floor.
                (Some(200), 50, Some(true), 100),
            ],
        );
        // Sitting exactly on the floor still resumes.
        assert_decides(Bootstrap::Resume, &[(Some(200), 100, Some(true), 100)]);
    }
}