sui_rpc_store/indexer/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Indexer pipelines that populate the `sui-rpc-store` schema
5//! from observed [`Checkpoint`]s, plus the orchestrator
6//! ([`Indexer`]) that wires them up against a shared
7//! [`Synchronizer`].
8//!
9//! Each pipeline submodule implements the
10//! `Processor` + `sequential::Handler` pair the
11//! `sui-indexer-alt-framework` drives: `process` turns a checkpoint
12//! into a `Vec<Value>` (with the heavy lifting done in the
13//! processor-pool, off the commit hot path), `batch` folds many
14//! values into a single `Batch`, and `commit` stages the batch's
15//! writes against a [`sui_consistent_store::Connection`] from
16//! [`sui_consistent_store::Store`].
17//!
18//! Every pipeline targets the same backing [`RpcStoreSchema`].
19
20pub mod balance;
21pub mod checkpoint_broadcast;
22pub mod checkpoint_contents;
23pub mod checkpoint_seq_by_digest;
24pub mod checkpoint_summary;
25pub mod effects;
26pub mod epochs;
27pub mod event_bitmap;
28pub mod events;
29pub mod object_by_owner;
30pub mod object_by_type;
31pub mod object_version_by_checkpoint;
32pub mod objects;
33pub mod package_versions;
34pub mod pruner;
35pub mod restore;
36pub mod transaction_bitmap;
37pub mod transactions;
38pub mod tx_metadata_by_seq;
39pub mod tx_seq_by_digest;
40
41use std::collections::BTreeMap;
42use std::collections::HashSet;
43use std::collections::btree_map::Entry;
44use std::path::Path;
45use std::sync::Arc;
46
47use anyhow::Context as _;
48use prometheus::Registry;
49use sui_consistent_store::Db;
50use sui_consistent_store::DbOptions;
51use sui_consistent_store::PipelineTaskKey;
52use sui_consistent_store::Synchronizer;
53use sui_consistent_store::restore_state;
54use sui_indexer_alt_framework as framework;
55use sui_indexer_alt_framework::IndexerArgs;
56use sui_indexer_alt_framework::ingestion::BoxedStreamingClient;
57use sui_indexer_alt_framework::ingestion::IngestionConfig;
58use sui_indexer_alt_framework::ingestion::IngestionService;
59use sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClient;
60use sui_indexer_alt_framework::pipeline::CommitterConfig;
61use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
62use sui_indexer_alt_framework::pipeline::sequential::{self};
63use sui_indexer_alt_framework::service::Service;
64use sui_types::base_types::ObjectID;
65use sui_types::digests::ObjectDigest;
66use sui_types::effects::TransactionEffectsAPI;
67use sui_types::full_checkpoint_content::Checkpoint;
68use sui_types::object::Object;
69
70use crate::RpcStoreSchema;
71use crate::config::PipelineLayer;
72use crate::config::PrunerConfig;
73use crate::indexer::pruner::PrunerMetrics;
74
75/// Metrics prefix used for both the framework indexer and the
76/// underlying ingestion service. Surfaced as a constant so the
77/// prefix is consistent across the metrics built in [`Indexer::new`]
78/// and the ones the standalone-binary entry point builds when it
79/// constructs the [`IngestionClient`] / [`IngestionService`] from
80/// `ClientArgs`.
81pub const METRICS_PREFIX: &str = "rpc_store_indexer";
82
83/// The schema parameter the framework's `Store` / pipelines bind
84/// to.
85pub type Schema = RpcStoreSchema;
86
87/// The store type pipelines commit through.
88pub type Store = sui_consistent_store::Store<Schema>;
89
90/// The sequence number of the first transaction in `checkpoint`.
91///
92/// `network_total_transactions` is the cumulative network-wide tx
93/// count *after* this checkpoint executes, so subtracting the
94/// number of transactions the checkpoint contains gives the
95/// `tx_seq` of its first entry.
96pub fn first_tx_seq(checkpoint: &Checkpoint) -> u64 {
97    checkpoint.summary.network_total_transactions - checkpoint.transactions.len() as u64
98}
99
100/// The `tx_seq` of the transaction at index `i` within
101/// `checkpoint`.
102pub fn tx_seq_at(checkpoint: &Checkpoint, i: usize) -> u64 {
103    first_tx_seq(checkpoint) + i as u64
104}
105
106/// First-seen input version of every object that existed before
107/// the checkpoint and was used as an input to some transaction in
108/// it. Mirrors the helper of the same name in
109/// `sui-indexer-alt-consistent-store::handlers`.
110///
111/// Objects created or unwrapped within the checkpoint are
112/// excluded. Used by the diff-based indexes
113/// ([`object_by_owner`] etc.) to
114/// remove the rows that the *prior* state contributed before
115/// re-inserting the rows that the *posterior* state contributes.
116pub fn checkpoint_input_objects(
117    checkpoint: &Checkpoint,
118) -> anyhow::Result<BTreeMap<ObjectID, (&Object, ObjectDigest)>> {
119    let mut from_this_checkpoint = HashSet::new();
120    let mut input_objects = BTreeMap::new();
121    for tx in &checkpoint.transactions {
122        let input_objects_map: BTreeMap<_, _> = tx
123            .input_objects(&checkpoint.object_set)
124            .map(|obj| ((obj.id(), obj.version()), obj))
125            .collect();
126
127        for change in tx.effects.object_changes() {
128            let id = change.id;
129
130            let Some(version) = change.input_version else {
131                continue;
132            };
133
134            if from_this_checkpoint.contains(&id) {
135                continue;
136            }
137
138            let Entry::Vacant(entry) = input_objects.entry(id) else {
139                continue;
140            };
141
142            let input_object = *input_objects_map
143                .get(&(id, version))
144                .with_context(|| format!("{id} at {version} in effects, not in input_objects"))?;
145
146            // Input digests are only populated in Effects V2. For Effects V1, we need to
147            // compute the digest from the input object's contents.
148            let digest = change.input_digest.unwrap_or_else(|| input_object.digest());
149            entry.insert((input_object, digest));
150        }
151
152        for change in tx.effects.object_changes() {
153            if change.output_version.is_some() {
154                from_this_checkpoint.insert(change.id);
155            }
156        }
157    }
158    Ok(input_objects)
159}
160
161/// Last-seen output version of every object that was created or
162/// modified by some transaction in the checkpoint and is still
163/// live at the end. Mirrors the helper of the same name in
164/// `sui-indexer-alt-consistent-store::handlers`.
165///
166/// Used to populate the checkpoint-pinned
167/// [`object_version_by_checkpoint`] index and the diff-based indexes
168/// once the prior state has been retracted.
169pub fn checkpoint_output_objects(
170    checkpoint: &Checkpoint,
171) -> anyhow::Result<BTreeMap<ObjectID, (&Object, ObjectDigest)>> {
172    let mut output_objects = BTreeMap::new();
173    for tx in &checkpoint.transactions {
174        let output_objects_map: BTreeMap<_, _> = tx
175            .output_objects(&checkpoint.object_set)
176            .map(|obj| ((obj.id(), obj.version()), obj))
177            .collect();
178
179        for change in tx.effects.object_changes() {
180            let id = change.id;
181
182            // Clear the previous entry, in case it was created within this checkpoint.
183            output_objects.remove(&id);
184
185            let (Some(version), Some(digest)) = (change.output_version, change.output_digest)
186            else {
187                continue;
188            };
189
190            let output_object = *output_objects_map
191                .get(&(id, version))
192                .with_context(|| format!("{id} at {version} in effects, not in output_objects"))?;
193
194            output_objects.insert(id, (output_object, digest));
195        }
196    }
197    Ok(output_objects)
198}
199
200/// Top-level orchestrator. Wraps a [`framework::Indexer`] over the
201/// [`Store`] for [`RpcStoreSchema`] together with a
202/// [`Synchronizer`] coordinating cross-pipeline snapshots, and
203/// exposes the per-pipeline registration shape this crate needs.
204///
205/// Construct one of two ways:
206///
207/// - [`Indexer::new`] opens the [`Db`] / [`Store`] internally —
208///   typical for the standalone binary path.
209/// - [`Indexer::from_store`] takes an already-opened [`Store`] —
210///   typical for the embedded-fullnode path where the fullnode
211///   shares the underlying database with this indexer for direct
212///   reads (and possibly for its own raw-chain-data writes).
213///
214/// Pipelines are registered through [`Self::add_pipelines`], which
215/// honours the per-pipeline enable/disable knobs encoded in a
216/// [`PipelineLayer`]. Disabled pipelines are skipped entirely —
217/// the [`Synchronizer`] only barriers across pipelines that were
218/// actually registered, so leaving the raw-chain-data pipelines
219/// off does not stall snapshots.
220///
221/// After pipelines are registered, [`Self::run`] installs the
222/// synchronizer onto the store and starts the framework indexer.
223pub struct Indexer {
224    indexer: framework::Indexer<Store>,
225
226    /// Synchronizer coordinating per-pipeline writes against
227    /// cross-pipeline snapshots. Owned here until [`Self::run`]
228    /// hands it to [`sui_consistent_store::Store::install_sync`].
229    sync: Synchronizer,
230
231    /// Pruning policy and its metrics, present when pruning is
232    /// enabled. [`Self::run`] starts the background pruner from
233    /// these and attaches it to the composed service. `None` leaves
234    /// pruning off (the embedded-fullnode and test defaults).
235    pruner: Option<(PrunerConfig, Arc<PrunerMetrics>)>,
236}
237
238impl Indexer {
239    /// Open the database at `path` with [`RpcStoreSchema`] and
240    /// construct an [`Indexer`] backed by it.
241    ///
242    /// `ingestion_client` is the pull-side checkpoint source; the
243    /// optional `streaming_client` is the live-tail source. Callers
244    /// (standalone binary, embedded fullnode) build the
245    /// [`IngestionClient`] via [`IngestionClient::new`] (driven by
246    /// `ClientArgs`) or [`IngestionClient::from_trait`] (wrapping
247    /// a custom [`IngestionClientTrait`]), depending on where
248    /// checkpoints come from. The [`IngestionMetrics`] handle the
249    /// service shares with the client is reused from
250    /// [`IngestionClient::metrics`], avoiding double-registration
251    /// against `registry`.
252    ///
253    /// [`IngestionClientTrait`]: sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClientTrait
254    /// [`IngestionMetrics`]: sui_indexer_alt_framework::metrics::IngestionMetrics
255    #[allow(clippy::too_many_arguments)]
256    pub async fn new(
257        path: impl AsRef<Path>,
258        indexer_args: IndexerArgs,
259        ingestion_client: IngestionClient,
260        streaming_client: Option<BoxedStreamingClient>,
261        consistency_config: crate::config::ConsistencyConfig,
262        pruner_config: Option<PrunerConfig>,
263        ingestion_config: IngestionConfig,
264        db_options: DbOptions,
265        registry: &Registry,
266    ) -> anyhow::Result<Self> {
267        let (db, schema) = Db::open::<RpcStoreSchema>(path, db_options)
268            .context("Failed to open sui-rpc-store database")?;
269        let store = sui_consistent_store::Store::new(db, Arc::new(schema));
270        Self::from_store(
271            store,
272            indexer_args,
273            ingestion_client,
274            streaming_client,
275            consistency_config,
276            pruner_config,
277            ingestion_config,
278            registry,
279        )
280        .await
281    }
282
283    /// Variant of [`Self::new`] that takes an already-opened
284    /// [`Store`]. Useful when the caller wants to share the
285    /// underlying [`Db`] with other code in the same process (e.g.
286    /// a fullnode that reads through [`RpcStoreSchema`] directly,
287    /// or writes to the raw-chain-data CFs through a separate
288    /// path).
289    #[allow(clippy::too_many_arguments)]
290    pub async fn from_store(
291        store: Store,
292        indexer_args: IndexerArgs,
293        ingestion_client: IngestionClient,
294        streaming_client: Option<BoxedStreamingClient>,
295        consistency_config: crate::config::ConsistencyConfig,
296        pruner_config: Option<PrunerConfig>,
297        ingestion_config: IngestionConfig,
298        registry: &Registry,
299    ) -> anyhow::Result<Self> {
300        let metrics_prefix = Some(METRICS_PREFIX);
301
302        // Load the persisted pruning watermarks into the
303        // in-memory bitmap floor so the bitmap CFs' compaction
304        // filters resume against the on-disk watermark instead of
305        // the process-default zero (which would prune nothing).
306        store
307            .schema()
308            .refresh_pruning_atomics()
309            .context("Failed to refresh pruning watermarks")?;
310
311        let sync = Synchronizer::new(
312            store.db().clone(),
313            consistency_config.buffer_size,
314            indexer_args.first_checkpoint,
315        );
316
317        let ingestion_metrics = ingestion_client.metrics().clone();
318        let ingestion_service = IngestionService::with_clients(
319            ingestion_client,
320            streaming_client,
321            ingestion_config,
322            ingestion_metrics,
323        );
324
325        let indexer = framework::Indexer::with_ingestion_service(
326            store,
327            indexer_args,
328            ingestion_service,
329            metrics_prefix,
330            registry,
331        )
332        .await
333        .context("Failed to construct framework indexer")?;
334
335        // Register the pruner's metrics under its own prefix when
336        // pruning is enabled; `run` starts the task from this.
337        let pruner = pruner_config.map(|config| (config, PrunerMetrics::new(None, registry)));
338
339        Ok(Self {
340            indexer,
341            sync,
342            pruner,
343        })
344    }
345
346    /// Borrow the wrapped framework indexer's store. Useful for
347    /// embedded callers that want a read handle pointed at the
348    /// same [`RpcStoreSchema`] this orchestrator is writing to.
349    pub fn store(&self) -> &Store {
350        self.indexer.store()
351    }
352
353    /// Iterate over the names of every pipeline that has been
354    /// registered with this indexer and is enabled (i.e. not
355    /// filtered out by `IndexerArgs::pipeline`). Useful for
356    /// asserting which pipelines are active before [`Self::run`]
357    /// is called.
358    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
359        self.indexer.pipelines()
360    }
361
362    /// Register every pipeline that is `Some(_)` in `layer`. The
363    /// supplied [`CommitterConfig`] acts as the shared base; each
364    /// pipeline's [`CommitterLayer`] overrides individual fields.
365    ///
366    /// Skipped (`None`) pipelines are not registered with the
367    /// [`Synchronizer`] either, so its snapshot barrier still
368    /// proceeds without them.
369    ///
370    /// [`CommitterLayer`]: crate::config::CommitterLayer
371    pub async fn add_pipelines(
372        &mut self,
373        layer: PipelineLayer,
374        committer: CommitterConfig,
375    ) -> anyhow::Result<()> {
376        let PipelineLayer {
377            epochs,
378            checkpoint_summary,
379            checkpoint_contents,
380            checkpoint_seq_by_digest,
381            transactions,
382            tx_seq_by_digest,
383            tx_metadata_by_seq,
384            effects,
385            events,
386            objects,
387            object_version_by_checkpoint,
388            object_by_owner,
389            object_by_type,
390            balance,
391            package_versions,
392            transaction_bitmap,
393            event_bitmap,
394        } = layer;
395
396        macro_rules! add {
397            ($handler:expr, $cfg:expr) => {
398                if let Some(layer) = $cfg {
399                    self.sequential_pipeline(
400                        $handler,
401                        SequentialConfig {
402                            committer: layer.finish(committer.clone()),
403                            // The synchronizer requires one
404                            // checkpoint per write batch; folding
405                            // multiple checkpoints into one batch
406                            // trips its out-of-order check.
407                            max_batch_checkpoints: Some(1),
408                            ..Default::default()
409                        },
410                    )
411                    .await?
412                }
413            };
414        }
415
416        // Raw chain data.
417        add!(self::epochs::Epochs, epochs);
418        add!(
419            self::checkpoint_summary::CheckpointSummary,
420            checkpoint_summary
421        );
422        add!(
423            self::checkpoint_contents::CheckpointContents,
424            checkpoint_contents
425        );
426        add!(
427            self::checkpoint_seq_by_digest::CheckpointSeqByDigest,
428            checkpoint_seq_by_digest
429        );
430        add!(self::transactions::Transactions, transactions);
431        add!(self::tx_seq_by_digest::TxSeqByDigest, tx_seq_by_digest);
432        add!(
433            self::tx_metadata_by_seq::TxMetadataBySeq,
434            tx_metadata_by_seq
435        );
436        add!(self::effects::Effects, effects);
437        add!(self::events::Events, events);
438        add!(self::objects::Objects, objects);
439        // `object_version_by_checkpoint` needs its restore anchor `T` so
440        // its processor scopes floor candidates to the backfill window
441        // `[L, T]`. The restore (if any) has already run by the time
442        // pipelines are registered, so read it once here.
443        let ovbc_anchor = self::object_version_by_checkpoint::restored_anchor(self.store().db())?;
444        add!(
445            self::object_version_by_checkpoint::ObjectVersionByCheckpoint::with_anchor(ovbc_anchor),
446            object_version_by_checkpoint
447        );
448
449        // Indexes.
450        add!(self::object_by_owner::ObjectByOwner, object_by_owner);
451        add!(self::object_by_type::ObjectByType, object_by_type);
452        add!(self::balance::Balance, balance);
453        add!(self::package_versions::PackageVersions, package_versions);
454        add!(
455            self::transaction_bitmap::TransactionBitmap,
456            transaction_bitmap
457        );
458        add!(self::event_bitmap::EventBitmap, event_bitmap);
459
460        Ok(())
461    }
462
463    /// Register the checkpoint-broadcast pipeline, which re-publishes
464    /// each committed checkpoint to `sender` in checkpoint order (see
465    /// [`checkpoint_broadcast`]). Call alongside [`Self::add_pipelines`]
466    /// and before [`Self::run`].
467    ///
468    /// Kept separate from [`Self::add_pipelines`] because it carries a
469    /// runtime `broadcast::Sender` rather than a [`PipelineLayer`]
470    /// toggle — only the standalone `sui-rpc-node`, which hosts the
471    /// subscription service, registers it; the embedded fullnode feeds
472    /// its subscription service from the checkpoint executor instead.
473    ///
474    /// As with every synchronizer-coordinated pipeline, registers with
475    /// `max_batch_checkpoints = 1` so each `commit` (and thus each
476    /// broadcast) is exactly one checkpoint.
477    pub async fn add_checkpoint_broadcast(
478        &mut self,
479        sender: tokio::sync::broadcast::Sender<Arc<Checkpoint>>,
480        committer: CommitterConfig,
481    ) -> anyhow::Result<()> {
482        self.sequential_pipeline(
483            self::checkpoint_broadcast::CheckpointBroadcast::new(sender),
484            SequentialConfig {
485                committer,
486                max_batch_checkpoints: Some(1),
487                ..Default::default()
488            },
489        )
490        .await
491    }
492
493    /// Register a single sequential pipeline. The pipeline is
494    /// announced to the synchronizer before being handed to the
495    /// framework indexer so that, by the time the first batch
496    /// flows through, the synchronizer task is already waiting on
497    /// the pipeline's queue.
498    ///
499    /// Refuses to register a pipeline whose persisted [`RestoreState`]
500    /// is still `InProgress`: tip-mode indexing of a pipeline that
501    /// has not finished restoring would commit checkpoints atop a
502    /// partial bulk-load, producing an inconsistent CF. Pipelines
503    /// with no restore row (never restored) and pipelines marked
504    /// `Complete` are allowed.
505    ///
506    /// [`RestoreState`]: sui_consistent_store::RestoreState
507    async fn sequential_pipeline<H>(
508        &mut self,
509        handler: H,
510        config: SequentialConfig,
511    ) -> anyhow::Result<()>
512    where
513        H: sequential::Handler<Store = Store> + Send + Sync + 'static,
514    {
515        let restore_state = self
516            .store()
517            .db()
518            .framework()
519            .restore
520            .get(&PipelineTaskKey::new(H::NAME))
521            .with_context(|| format!("Reading restore state for pipeline {:?}", H::NAME))?;
522
523        if let Some(state) = restore_state.as_ref().and_then(|s| s.state.as_ref()) {
524            match state {
525                restore_state::State::InProgress(_) => {
526                    anyhow::bail!("Restoration in progress for pipeline {:?}", H::NAME);
527                }
528                restore_state::State::Complete(_) => {
529                    // Restore finished — tip indexing may proceed.
530                }
531            }
532        }
533
534        self.sync
535            .register_pipeline(H::NAME)
536            .with_context(|| format!("Failed to add pipeline {:?} to synchronizer", H::NAME))?;
537
538        self.indexer
539            .sequential_pipeline(handler, config)
540            .await
541            .with_context(|| format!("Failed to add pipeline {:?} to indexer", H::NAME))?;
542
543        Ok(())
544    }
545
546    /// Install the synchronizer onto the store and start the
547    /// framework indexer. Returns a composed [`Service`] handle
548    /// that drives both for the lifetime of the indexer.
549    pub async fn run(self) -> anyhow::Result<Service> {
550        let Self {
551            indexer,
552            sync,
553            pruner: pruner_setup,
554        } = self;
555
556        // Capture a `Db` handle for the pruner before `indexer.run`
557        // consumes the framework indexer (and with it the store).
558        let db = indexer.store().db().clone();
559
560        let mut sync_join_set = indexer
561            .store()
562            .install_sync(sync)
563            .context("Failed to install synchronizer onto store")?;
564
565        // Wrap the synchronizer's JoinSet in a `Service` task so it
566        // composes with the framework indexer's service via
567        // `attach`. Per-pipeline tasks exit naturally once their
568        // mpsc senders (held in the store's `Queue`) are dropped,
569        // which happens on the framework indexer's shutdown.
570        let s_sync = Service::new().spawn(async move {
571            while let Some(res) = sync_join_set.join_next().await {
572                res.context("Synchronizer task panicked")??;
573            }
574            Ok(())
575        });
576
577        let s_indexer = indexer.run().await?;
578        let mut service = s_indexer.attach(s_sync);
579
580        // Attach the background pruner as a secondary task when
581        // configured: it advances the retention floor and deletes
582        // history without extending the indexer's lifetime.
583        if let Some((config, metrics)) = pruner_setup {
584            let s_pruner = pruner::start_pruner(db, config, metrics)
585                .context("Failed to start the rpc-store pruner")?;
586            service = service.attach(s_pruner);
587        }
588
589        Ok(service)
590    }
591}
592
593#[cfg(test)]
594mod tests {
595    use async_trait::async_trait;
596    use sui_indexer_alt_framework::ingestion::ingestion_client::CheckpointError;
597    use sui_indexer_alt_framework::ingestion::ingestion_client::CheckpointResult;
598    use sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClientTrait;
599    use sui_indexer_alt_framework::metrics::IngestionMetrics;
600    use sui_types::digests::ChainIdentifier;
601
602    use super::*;
603
604    /// Stub [`IngestionClientTrait`] for orchestrator wiring
605    /// tests. Reports a fixed chain id and latest checkpoint and
606    /// fails any actual fetch; suitable for tests that only need
607    /// `Indexer::from_store` to construct (which probes
608    /// `latest_checkpoint_number` once) and never run the
609    /// ingestion loop.
610    struct StubIngestionClient;
611
612    #[async_trait]
613    impl IngestionClientTrait for StubIngestionClient {
614        async fn chain_id(&self) -> anyhow::Result<ChainIdentifier> {
615            Ok(ChainIdentifier::from(
616                sui_types::digests::CheckpointDigest::new([0u8; 32]),
617            ))
618        }
619
620        async fn checkpoint(&self, _checkpoint: u64) -> CheckpointResult {
621            Err(CheckpointError::NotFound)
622        }
623
624        async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
625            Ok(0)
626        }
627    }
628
629    async fn build_indexer(layer: PipelineLayer) -> Indexer {
630        let dir = tempfile::tempdir().unwrap();
631        let registry = Registry::new();
632        let ingestion_metrics = IngestionMetrics::new(Some(METRICS_PREFIX), &registry);
633        let ingestion_client =
634            IngestionClient::from_trait(Arc::new(StubIngestionClient), ingestion_metrics);
635        let mut indexer = Indexer::new(
636            dir.path().join("db"),
637            IndexerArgs::default(),
638            ingestion_client,
639            None,
640            crate::config::ConsistencyConfig::default(),
641            None,
642            IngestionConfig::default(),
643            DbOptions::default(),
644            &registry,
645        )
646        .await
647        .expect("Indexer::new");
648
649        indexer
650            .add_pipelines(layer, CommitterConfig::default())
651            .await
652            .expect("add_pipelines");
653
654        // Keep the tempdir alive for the duration of the test by
655        // leaking it — the Indexer holds the DB open, and we want
656        // the path to survive until the Indexer is dropped.
657        std::mem::forget(dir);
658        indexer
659    }
660
661    /// `embedded` registers exactly the ten embedded-cohort
662    /// pipelines (three live + seven history) and none of the
663    /// deactivated raw-chain-data ones, so the synchronizer's
664    /// snapshot cohort covers exactly those. Pinned to the
665    /// [`LIVE_COHORT`] / [`HISTORY_COHORT`] constants (via the real
666    /// `Processor::NAME`s the indexer registers) so the layer and the
667    /// restore/seed cohorts cannot drift apart.
668    ///
669    /// [`LIVE_COHORT`]: crate::indexer::restore::LIVE_COHORT
670    /// [`HISTORY_COHORT`]: crate::indexer::restore::HISTORY_COHORT
671    #[tokio::test]
672    async fn embedded_registers_only_cohort_pipelines() {
673        let indexer = build_indexer(PipelineLayer::embedded()).await;
674        let names: std::collections::BTreeSet<_> = indexer.pipelines().collect();
675        let expected: std::collections::BTreeSet<_> = crate::indexer::restore::LIVE_COHORT
676            .iter()
677            .chain(crate::indexer::restore::HISTORY_COHORT)
678            .copied()
679            .collect();
680        assert_eq!(names, expected);
681    }
682
683    /// `all` registers every pipeline (raw chain data + indexes).
684    #[tokio::test]
685    async fn all_registers_every_pipeline() {
686        let indexer = build_indexer(PipelineLayer::all()).await;
687        let names: std::collections::BTreeSet<_> = indexer.pipelines().collect();
688        assert_eq!(
689            names,
690            std::collections::BTreeSet::from([
691                // Raw chain data.
692                "epochs",
693                "checkpoint_summary",
694                "checkpoint_contents",
695                "checkpoint_seq_by_digest",
696                "transactions",
697                "tx_seq_by_digest",
698                "tx_metadata_by_seq",
699                "effects",
700                "events",
701                "objects",
702                "object_version_by_checkpoint",
703                // Indexes.
704                "object_by_owner",
705                "object_by_type",
706                "balance",
707                "package_versions",
708                "transaction_bitmap",
709                "event_bitmap",
710            ])
711        );
712    }
713
714    /// Open the rpc-store DB at `path` and write a single
715    /// pre-existing `RestoreState` entry for `pipeline` directly
716    /// to the framework's `__restore` CF. Returns the [`Store`]
717    /// the orchestrator should pick up. Used by the restore-guard
718    /// tests below to seed an `InProgress` or `Complete` state
719    /// before [`Indexer::from_store`] runs.
720    fn open_with_seeded_restore(
721        path: &std::path::Path,
722        pipeline: &str,
723        state: sui_consistent_store::RestoreState,
724    ) -> Store {
725        let (db, schema) = Db::open::<RpcStoreSchema>(path, DbOptions::default()).unwrap();
726        let framework = sui_consistent_store::FrameworkSchema::new(db.clone());
727        let mut batch = db.batch();
728        batch
729            .put(&framework.restore, &PipelineTaskKey::new(pipeline), &state)
730            .unwrap();
731        batch.commit().unwrap();
732        sui_consistent_store::Store::new(db, Arc::new(schema))
733    }
734
735    async fn build_indexer_with_store(store: Store) -> anyhow::Result<Indexer> {
736        let registry = Registry::new();
737        let ingestion_metrics = IngestionMetrics::new(Some(METRICS_PREFIX), &registry);
738        let ingestion_client =
739            IngestionClient::from_trait(Arc::new(StubIngestionClient), ingestion_metrics);
740        Indexer::from_store(
741            store,
742            IndexerArgs::default(),
743            ingestion_client,
744            None,
745            crate::config::ConsistencyConfig::default(),
746            None,
747            IngestionConfig::default(),
748            &registry,
749        )
750        .await
751    }
752
753    #[tokio::test]
754    async fn add_pipelines_refuses_pipeline_with_in_progress_restore() {
755        let dir = tempfile::tempdir().unwrap();
756        let in_progress = sui_consistent_store::RestoreState {
757            state: Some(restore_state::State::InProgress(
758                restore_state::InProgress::default(),
759            )),
760        };
761        let store = open_with_seeded_restore(&dir.path().join("db"), "balance", in_progress);
762
763        let mut indexer = build_indexer_with_store(store).await.unwrap();
764        let err = indexer
765            .add_pipelines(
766                PipelineLayer {
767                    balance: Some(crate::config::CommitterLayer::default()),
768                    ..PipelineLayer::default()
769                },
770                CommitterConfig::default(),
771            )
772            .await
773            .unwrap_err();
774        assert!(
775            format!("{err:#}").contains("Restoration in progress for pipeline"),
776            "expected restore-in-progress error, got: {err:#}",
777        );
778    }
779
780    #[tokio::test]
781    async fn add_pipelines_allows_pipeline_with_completed_restore() {
782        let dir = tempfile::tempdir().unwrap();
783        let complete = sui_consistent_store::RestoreState {
784            state: Some(restore_state::State::Complete(restore_state::Complete {
785                restored_at: 42,
786            })),
787        };
788        let store = open_with_seeded_restore(&dir.path().join("db"), "balance", complete);
789
790        let mut indexer = build_indexer_with_store(store).await.unwrap();
791        indexer
792            .add_pipelines(
793                PipelineLayer {
794                    balance: Some(crate::config::CommitterLayer::default()),
795                    ..PipelineLayer::default()
796                },
797                CommitterConfig::default(),
798            )
799            .await
800            .unwrap();
801        let names: std::collections::BTreeSet<_> = indexer.pipelines().collect();
802        assert_eq!(names, std::collections::BTreeSet::from(["balance"]));
803    }
804}