1pub mod balance;
21pub mod checkpoint_broadcast;
22pub mod checkpoint_contents;
23pub mod checkpoint_seq_by_digest;
24pub mod checkpoint_summary;
25pub mod effects;
26pub mod epochs;
27pub mod event_bitmap;
28pub mod events;
29pub mod object_by_owner;
30pub mod object_by_type;
31pub mod object_version_by_checkpoint;
32pub mod objects;
33pub mod package_versions;
34pub mod pruner;
35pub mod restore;
36pub mod transaction_bitmap;
37pub mod transactions;
38pub mod tx_metadata_by_seq;
39pub mod tx_seq_by_digest;
40
41use std::collections::BTreeMap;
42use std::collections::HashSet;
43use std::collections::btree_map::Entry;
44use std::path::Path;
45use std::sync::Arc;
46
47use anyhow::Context as _;
48use prometheus::Registry;
49use sui_consistent_store::Db;
50use sui_consistent_store::DbOptions;
51use sui_consistent_store::PipelineTaskKey;
52use sui_consistent_store::Synchronizer;
53use sui_consistent_store::restore_state;
54use sui_indexer_alt_framework as framework;
55use sui_indexer_alt_framework::IndexerArgs;
56use sui_indexer_alt_framework::ingestion::BoxedStreamingClient;
57use sui_indexer_alt_framework::ingestion::IngestionConfig;
58use sui_indexer_alt_framework::ingestion::IngestionService;
59use sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClient;
60use sui_indexer_alt_framework::pipeline::CommitterConfig;
61use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
62use sui_indexer_alt_framework::pipeline::sequential::{self};
63use sui_indexer_alt_framework::service::Service;
64use sui_types::base_types::ObjectID;
65use sui_types::digests::ObjectDigest;
66use sui_types::effects::TransactionEffectsAPI;
67use sui_types::full_checkpoint_content::Checkpoint;
68use sui_types::object::Object;
69
70use crate::RpcStoreSchema;
71use crate::config::PipelineLayer;
72use crate::config::PrunerConfig;
73use crate::indexer::pruner::PrunerMetrics;
74
75pub const METRICS_PREFIX: &str = "rpc_store_indexer";
82
83pub type Schema = RpcStoreSchema;
86
87pub type Store = sui_consistent_store::Store<Schema>;
89
90pub fn first_tx_seq(checkpoint: &Checkpoint) -> u64 {
97 checkpoint.summary.network_total_transactions - checkpoint.transactions.len() as u64
98}
99
100pub fn tx_seq_at(checkpoint: &Checkpoint, i: usize) -> u64 {
103 first_tx_seq(checkpoint) + i as u64
104}
105
106pub fn checkpoint_input_objects(
117 checkpoint: &Checkpoint,
118) -> anyhow::Result<BTreeMap<ObjectID, (&Object, ObjectDigest)>> {
119 let mut from_this_checkpoint = HashSet::new();
120 let mut input_objects = BTreeMap::new();
121 for tx in &checkpoint.transactions {
122 let input_objects_map: BTreeMap<_, _> = tx
123 .input_objects(&checkpoint.object_set)
124 .map(|obj| ((obj.id(), obj.version()), obj))
125 .collect();
126
127 for change in tx.effects.object_changes() {
128 let id = change.id;
129
130 let Some(version) = change.input_version else {
131 continue;
132 };
133
134 if from_this_checkpoint.contains(&id) {
135 continue;
136 }
137
138 let Entry::Vacant(entry) = input_objects.entry(id) else {
139 continue;
140 };
141
142 let input_object = *input_objects_map
143 .get(&(id, version))
144 .with_context(|| format!("{id} at {version} in effects, not in input_objects"))?;
145
146 let digest = change.input_digest.unwrap_or_else(|| input_object.digest());
149 entry.insert((input_object, digest));
150 }
151
152 for change in tx.effects.object_changes() {
153 if change.output_version.is_some() {
154 from_this_checkpoint.insert(change.id);
155 }
156 }
157 }
158 Ok(input_objects)
159}
160
161pub fn checkpoint_output_objects(
170 checkpoint: &Checkpoint,
171) -> anyhow::Result<BTreeMap<ObjectID, (&Object, ObjectDigest)>> {
172 let mut output_objects = BTreeMap::new();
173 for tx in &checkpoint.transactions {
174 let output_objects_map: BTreeMap<_, _> = tx
175 .output_objects(&checkpoint.object_set)
176 .map(|obj| ((obj.id(), obj.version()), obj))
177 .collect();
178
179 for change in tx.effects.object_changes() {
180 let id = change.id;
181
182 output_objects.remove(&id);
184
185 let (Some(version), Some(digest)) = (change.output_version, change.output_digest)
186 else {
187 continue;
188 };
189
190 let output_object = *output_objects_map
191 .get(&(id, version))
192 .with_context(|| format!("{id} at {version} in effects, not in output_objects"))?;
193
194 output_objects.insert(id, (output_object, digest));
195 }
196 }
197 Ok(output_objects)
198}
199
200pub struct Indexer {
224 indexer: framework::Indexer<Store>,
225
226 sync: Synchronizer,
230
231 pruner: Option<(PrunerConfig, Arc<PrunerMetrics>)>,
236}
237
238impl Indexer {
239 #[allow(clippy::too_many_arguments)]
256 pub async fn new(
257 path: impl AsRef<Path>,
258 indexer_args: IndexerArgs,
259 ingestion_client: IngestionClient,
260 streaming_client: Option<BoxedStreamingClient>,
261 consistency_config: crate::config::ConsistencyConfig,
262 pruner_config: Option<PrunerConfig>,
263 ingestion_config: IngestionConfig,
264 db_options: DbOptions,
265 registry: &Registry,
266 ) -> anyhow::Result<Self> {
267 let (db, schema) = Db::open::<RpcStoreSchema>(path, db_options)
268 .context("Failed to open sui-rpc-store database")?;
269 let store = sui_consistent_store::Store::new(db, Arc::new(schema));
270 Self::from_store(
271 store,
272 indexer_args,
273 ingestion_client,
274 streaming_client,
275 consistency_config,
276 pruner_config,
277 ingestion_config,
278 registry,
279 )
280 .await
281 }
282
283 #[allow(clippy::too_many_arguments)]
290 pub async fn from_store(
291 store: Store,
292 indexer_args: IndexerArgs,
293 ingestion_client: IngestionClient,
294 streaming_client: Option<BoxedStreamingClient>,
295 consistency_config: crate::config::ConsistencyConfig,
296 pruner_config: Option<PrunerConfig>,
297 ingestion_config: IngestionConfig,
298 registry: &Registry,
299 ) -> anyhow::Result<Self> {
300 let metrics_prefix = Some(METRICS_PREFIX);
301
302 store
307 .schema()
308 .refresh_pruning_atomics()
309 .context("Failed to refresh pruning watermarks")?;
310
311 let sync = Synchronizer::new(
312 store.db().clone(),
313 consistency_config.buffer_size,
314 indexer_args.first_checkpoint,
315 );
316
317 let ingestion_metrics = ingestion_client.metrics().clone();
318 let ingestion_service = IngestionService::with_clients(
319 ingestion_client,
320 streaming_client,
321 ingestion_config,
322 ingestion_metrics,
323 );
324
325 let indexer = framework::Indexer::with_ingestion_service(
326 store,
327 indexer_args,
328 ingestion_service,
329 metrics_prefix,
330 registry,
331 )
332 .await
333 .context("Failed to construct framework indexer")?;
334
335 let pruner = pruner_config.map(|config| (config, PrunerMetrics::new(None, registry)));
338
339 Ok(Self {
340 indexer,
341 sync,
342 pruner,
343 })
344 }
345
346 pub fn store(&self) -> &Store {
350 self.indexer.store()
351 }
352
353 pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
359 self.indexer.pipelines()
360 }
361
362 pub async fn add_pipelines(
372 &mut self,
373 layer: PipelineLayer,
374 committer: CommitterConfig,
375 ) -> anyhow::Result<()> {
376 let PipelineLayer {
377 epochs,
378 checkpoint_summary,
379 checkpoint_contents,
380 checkpoint_seq_by_digest,
381 transactions,
382 tx_seq_by_digest,
383 tx_metadata_by_seq,
384 effects,
385 events,
386 objects,
387 object_version_by_checkpoint,
388 object_by_owner,
389 object_by_type,
390 balance,
391 package_versions,
392 transaction_bitmap,
393 event_bitmap,
394 } = layer;
395
396 macro_rules! add {
397 ($handler:expr, $cfg:expr) => {
398 if let Some(layer) = $cfg {
399 self.sequential_pipeline(
400 $handler,
401 SequentialConfig {
402 committer: layer.finish(committer.clone()),
403 max_batch_checkpoints: Some(1),
408 ..Default::default()
409 },
410 )
411 .await?
412 }
413 };
414 }
415
416 add!(self::epochs::Epochs, epochs);
418 add!(
419 self::checkpoint_summary::CheckpointSummary,
420 checkpoint_summary
421 );
422 add!(
423 self::checkpoint_contents::CheckpointContents,
424 checkpoint_contents
425 );
426 add!(
427 self::checkpoint_seq_by_digest::CheckpointSeqByDigest,
428 checkpoint_seq_by_digest
429 );
430 add!(self::transactions::Transactions, transactions);
431 add!(self::tx_seq_by_digest::TxSeqByDigest, tx_seq_by_digest);
432 add!(
433 self::tx_metadata_by_seq::TxMetadataBySeq,
434 tx_metadata_by_seq
435 );
436 add!(self::effects::Effects, effects);
437 add!(self::events::Events, events);
438 add!(self::objects::Objects, objects);
439 let ovbc_anchor = self::object_version_by_checkpoint::restored_anchor(self.store().db())?;
444 add!(
445 self::object_version_by_checkpoint::ObjectVersionByCheckpoint::with_anchor(ovbc_anchor),
446 object_version_by_checkpoint
447 );
448
449 add!(self::object_by_owner::ObjectByOwner, object_by_owner);
451 add!(self::object_by_type::ObjectByType, object_by_type);
452 add!(self::balance::Balance, balance);
453 add!(self::package_versions::PackageVersions, package_versions);
454 add!(
455 self::transaction_bitmap::TransactionBitmap,
456 transaction_bitmap
457 );
458 add!(self::event_bitmap::EventBitmap, event_bitmap);
459
460 Ok(())
461 }
462
463 pub async fn add_checkpoint_broadcast(
478 &mut self,
479 sender: tokio::sync::broadcast::Sender<Arc<Checkpoint>>,
480 committer: CommitterConfig,
481 ) -> anyhow::Result<()> {
482 self.sequential_pipeline(
483 self::checkpoint_broadcast::CheckpointBroadcast::new(sender),
484 SequentialConfig {
485 committer,
486 max_batch_checkpoints: Some(1),
487 ..Default::default()
488 },
489 )
490 .await
491 }
492
493 async fn sequential_pipeline<H>(
508 &mut self,
509 handler: H,
510 config: SequentialConfig,
511 ) -> anyhow::Result<()>
512 where
513 H: sequential::Handler<Store = Store> + Send + Sync + 'static,
514 {
515 let restore_state = self
516 .store()
517 .db()
518 .framework()
519 .restore
520 .get(&PipelineTaskKey::new(H::NAME))
521 .with_context(|| format!("Reading restore state for pipeline {:?}", H::NAME))?;
522
523 if let Some(state) = restore_state.as_ref().and_then(|s| s.state.as_ref()) {
524 match state {
525 restore_state::State::InProgress(_) => {
526 anyhow::bail!("Restoration in progress for pipeline {:?}", H::NAME);
527 }
528 restore_state::State::Complete(_) => {
529 }
531 }
532 }
533
534 self.sync
535 .register_pipeline(H::NAME)
536 .with_context(|| format!("Failed to add pipeline {:?} to synchronizer", H::NAME))?;
537
538 self.indexer
539 .sequential_pipeline(handler, config)
540 .await
541 .with_context(|| format!("Failed to add pipeline {:?} to indexer", H::NAME))?;
542
543 Ok(())
544 }
545
546 pub async fn run(self) -> anyhow::Result<Service> {
550 let Self {
551 indexer,
552 sync,
553 pruner: pruner_setup,
554 } = self;
555
556 let db = indexer.store().db().clone();
559
560 let mut sync_join_set = indexer
561 .store()
562 .install_sync(sync)
563 .context("Failed to install synchronizer onto store")?;
564
565 let s_sync = Service::new().spawn(async move {
571 while let Some(res) = sync_join_set.join_next().await {
572 res.context("Synchronizer task panicked")??;
573 }
574 Ok(())
575 });
576
577 let s_indexer = indexer.run().await?;
578 let mut service = s_indexer.attach(s_sync);
579
580 if let Some((config, metrics)) = pruner_setup {
584 let s_pruner = pruner::start_pruner(db, config, metrics)
585 .context("Failed to start the rpc-store pruner")?;
586 service = service.attach(s_pruner);
587 }
588
589 Ok(service)
590 }
591}
592
#[cfg(test)]
mod tests {
    use async_trait::async_trait;
    use sui_indexer_alt_framework::ingestion::ingestion_client::CheckpointError;
    use sui_indexer_alt_framework::ingestion::ingestion_client::CheckpointResult;
    use sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClientTrait;
    use sui_indexer_alt_framework::metrics::IngestionMetrics;
    use sui_types::digests::ChainIdentifier;

    use super::*;

    /// Ingestion client that never serves a checkpoint. It reports an all-zero chain
    /// identifier and a latest checkpoint of zero, which is enough to construct an indexer
    /// and inspect its registered pipelines.
    struct StubIngestionClient;

    #[async_trait]
    impl IngestionClientTrait for StubIngestionClient {
        async fn chain_id(&self) -> anyhow::Result<ChainIdentifier> {
            Ok(ChainIdentifier::from(
                sui_types::digests::CheckpointDigest::new([0u8; 32]),
            ))
        }

        async fn checkpoint(&self, _checkpoint: u64) -> CheckpointResult {
            Err(CheckpointError::NotFound)
        }

        async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
            Ok(0)
        }
    }

    /// Builds an indexer over a fresh database in a temporary directory and registers the
    /// pipelines enabled in `layer`, using default settings everywhere else.
    async fn build_indexer(layer: PipelineLayer) -> Indexer {
        let dir = tempfile::tempdir().unwrap();
        let registry = Registry::new();
        let ingestion_metrics = IngestionMetrics::new(Some(METRICS_PREFIX), &registry);
        let ingestion_client =
            IngestionClient::from_trait(Arc::new(StubIngestionClient), ingestion_metrics);
        let mut indexer = Indexer::new(
            dir.path().join("db"),
            IndexerArgs::default(),
            ingestion_client,
            None,
            crate::config::ConsistencyConfig::default(),
            None,
            IngestionConfig::default(),
            DbOptions::default(),
            &registry,
        )
        .await
        .expect("Indexer::new");

        indexer
            .add_pipelines(layer, CommitterConfig::default())
            .await
            .expect("add_pipelines");

        // Skip the temporary directory's destructor, presumably so the database directory
        // outlives this function while the returned indexer still uses it.
        std::mem::forget(dir);
        indexer
    }

    /// The embedded layer registers exactly the union of the live and history cohorts.
    #[tokio::test]
    async fn embedded_registers_only_cohort_pipelines() {
        let indexer = build_indexer(PipelineLayer::embedded()).await;
        let names: std::collections::BTreeSet<_> = indexer.pipelines().collect();
        let expected: std::collections::BTreeSet<_> = crate::indexer::restore::LIVE_COHORT
            .iter()
            .chain(crate::indexer::restore::HISTORY_COHORT)
            .copied()
            .collect();
        assert_eq!(names, expected);
    }

    /// The `all` layer registers every pipeline listed below.
    #[tokio::test]
    async fn all_registers_every_pipeline() {
        let indexer = build_indexer(PipelineLayer::all()).await;
        let names: std::collections::BTreeSet<_> = indexer.pipelines().collect();
        assert_eq!(
            names,
            std::collections::BTreeSet::from([
                "epochs",
                "checkpoint_summary",
                "checkpoint_contents",
                "checkpoint_seq_by_digest",
                "transactions",
                "tx_seq_by_digest",
                "tx_metadata_by_seq",
                "effects",
                "events",
                "objects",
                "object_version_by_checkpoint",
                "object_by_owner",
                "object_by_type",
                "balance",
                "package_versions",
                "transaction_bitmap",
                "event_bitmap",
            ])
        );
    }

    /// Opens a database at `path` and writes `state` as the restore state of `pipeline`
    /// before handing back a store over that database.
    fn open_with_seeded_restore(
        path: &std::path::Path,
        pipeline: &str,
        state: sui_consistent_store::RestoreState,
    ) -> Store {
        let (db, schema) = Db::open::<RpcStoreSchema>(path, DbOptions::default()).unwrap();
        let framework = sui_consistent_store::FrameworkSchema::new(db.clone());
        let mut batch = db.batch();
        batch
            .put(&framework.restore, &PipelineTaskKey::new(pipeline), &state)
            .unwrap();
        batch.commit().unwrap();
        sui_consistent_store::Store::new(db, Arc::new(schema))
    }

    /// Builds an indexer over an existing `store` with default settings and no pipelines.
    async fn build_indexer_with_store(store: Store) -> anyhow::Result<Indexer> {
        let registry = Registry::new();
        let ingestion_metrics = IngestionMetrics::new(Some(METRICS_PREFIX), &registry);
        let ingestion_client =
            IngestionClient::from_trait(Arc::new(StubIngestionClient), ingestion_metrics);
        Indexer::from_store(
            store,
            IndexerArgs::default(),
            ingestion_client,
            None,
            crate::config::ConsistencyConfig::default(),
            None,
            IngestionConfig::default(),
            &registry,
        )
        .await
    }

    /// A pipeline whose restore state is still in progress must be rejected.
    #[tokio::test]
    async fn add_pipelines_refuses_pipeline_with_in_progress_restore() {
        let dir = tempfile::tempdir().unwrap();
        let in_progress = sui_consistent_store::RestoreState {
            state: Some(restore_state::State::InProgress(
                restore_state::InProgress::default(),
            )),
        };
        let store = open_with_seeded_restore(&dir.path().join("db"), "balance", in_progress);

        let mut indexer = build_indexer_with_store(store).await.unwrap();
        let err = indexer
            .add_pipelines(
                PipelineLayer {
                    balance: Some(crate::config::CommitterLayer::default()),
                    ..PipelineLayer::default()
                },
                CommitterConfig::default(),
            )
            .await
            .unwrap_err();
        assert!(
            format!("{err:#}").contains("Restoration in progress for pipeline"),
            "expected restore-in-progress error, got: {err:#}",
        );
    }

    /// A pipeline whose restore has completed registers normally.
    #[tokio::test]
    async fn add_pipelines_allows_pipeline_with_completed_restore() {
        let dir = tempfile::tempdir().unwrap();
        let complete = sui_consistent_store::RestoreState {
            state: Some(restore_state::State::Complete(restore_state::Complete {
                restored_at: 42,
            })),
        };
        let store = open_with_seeded_restore(&dir.path().join("db"), "balance", complete);

        let mut indexer = build_indexer_with_store(store).await.unwrap();
        indexer
            .add_pipelines(
                PipelineLayer {
                    balance: Some(crate::config::CommitterLayer::default()),
                    ..PipelineLayer::default()
                },
                CommitterConfig::default(),
            )
            .await
            .unwrap();
        let names: std::collections::BTreeSet<_> = indexer.pipelines().collect();
        assert_eq!(names, std::collections::BTreeSet::from(["balance"]));
    }
}