sui_indexer_alt/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4pub use crate::bootstrap::BootstrapGenesis;
5use anyhow::Context;
6use bootstrap::bootstrap;
7use config::{IndexerConfig, PipelineLayer};
8use handlers::{
9    coin_balance_buckets::CoinBalanceBuckets, cp_sequence_numbers::CpSequenceNumbers,
10    ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints,
11    kv_epoch_ends::KvEpochEnds, kv_epoch_starts::KvEpochStarts, kv_feature_flags::KvFeatureFlags,
12    kv_objects::KvObjects, kv_packages::KvPackages, kv_protocol_configs::KvProtocolConfigs,
13    kv_transactions::KvTransactions, obj_info::ObjInfo, obj_versions::ObjVersions,
14    sum_displays::SumDisplays, tx_affected_addresses::TxAffectedAddresses,
15    tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges,
16    tx_calls::TxCalls, tx_digests::TxDigests, tx_kinds::TxKinds,
17};
18use prometheus::Registry;
19use sui_indexer_alt_framework::{
20    Indexer, IndexerArgs,
21    ingestion::{ClientArgs, IngestionConfig},
22    pipeline::{
23        CommitterConfig,
24        concurrent::{ConcurrentConfig, PrunerConfig},
25        sequential::SequentialConfig,
26    },
27    postgres::{Db, DbArgs},
28};
29use sui_indexer_alt_metrics::db::DbConnectionStatsCollector;
30use sui_indexer_alt_schema::MIGRATIONS;
31use url::Url;
32
33pub mod args;
34#[cfg(feature = "benchmark")]
35pub mod benchmark;
36pub(crate) mod bootstrap;
37pub mod config;
38pub(crate) mod handlers;
39
40pub async fn setup_indexer(
41    database_url: Url,
42    db_args: DbArgs,
43    indexer_args: IndexerArgs,
44    client_args: ClientArgs,
45    indexer_config: IndexerConfig,
46    bootstrap_genesis: Option<BootstrapGenesis>,
47    registry: &Registry,
48) -> anyhow::Result<Indexer<Db>> {
49    let IndexerConfig {
50        ingestion,
51        committer,
52        pruner,
53        pipeline,
54    } = indexer_config;
55
56    let PipelineLayer {
57        sum_displays,
58        coin_balance_buckets,
59        cp_sequence_numbers,
60        ev_emit_mod,
61        ev_struct_inst,
62        kv_checkpoints,
63        kv_epoch_ends,
64        kv_epoch_starts,
65        kv_feature_flags,
66        kv_objects,
67        kv_packages,
68        kv_protocol_configs,
69        kv_transactions,
70        obj_info,
71        obj_versions,
72        tx_affected_addresses,
73        tx_affected_objects,
74        tx_balance_changes,
75        tx_calls,
76        tx_digests,
77        tx_kinds,
78    } = pipeline;
79
80    let ingestion = ingestion.finish(IngestionConfig::default())?;
81    let committer = committer.finish(CommitterConfig::default())?;
82    let pruner = pruner.finish(PrunerConfig::default())?;
83
84    let retry_interval = ingestion.retry_interval();
85
86    // Prepare the store for the indexer
87    let store = Db::for_write(database_url, db_args)
88        .await
89        .context("Failed to connect to database")?;
90
91    // we want to merge &MIGRATIONS with the migrations from the store
92    store
93        .run_migrations(Some(&MIGRATIONS))
94        .await
95        .context("Failed to run pending migrations")?;
96
97    registry.register(Box::new(DbConnectionStatsCollector::new(
98        Some("indexer_db"),
99        store.clone(),
100    )))?;
101
102    let metrics_prefix = None;
103    let mut indexer = Indexer::new(
104        store,
105        indexer_args,
106        client_args,
107        ingestion,
108        metrics_prefix,
109        registry,
110    )
111    .await?;
112
113    // These macros are responsible for registering pipelines with the indexer. It is responsible
114    // for:
115    //
116    //  - Checking whether the pipeline is enabled in the file-based configuration.
117    //  - Checking for unexpected parameters in the config.
118    //  - Combining shared and per-pipeline configurations.
119    //  - Registering the pipeline with the indexer.
120    //
121    // There are two kinds of pipelines, each with their own macro: `add_concurrent` and
122    // `add_sequential`. They map directly to `Indexer::concurrent_pipeline` and
123    // `Indexer::sequential_pipeline` respectively.
124
125    macro_rules! add_concurrent {
126        ($handler:expr, $config:expr) => {
127            if let Some(layer) = $config {
128                indexer
129                    .concurrent_pipeline(
130                        $handler,
131                        layer.finish(ConcurrentConfig {
132                            committer: committer.clone(),
133                            pruner: Some(pruner.clone()),
134                        })?,
135                    )
136                    .await?
137            }
138        };
139    }
140
141    macro_rules! add_sequential {
142        ($handler:expr, $config:expr) => {
143            if let Some(layer) = $config {
144                indexer
145                    .sequential_pipeline(
146                        $handler,
147                        layer.finish(SequentialConfig {
148                            committer: committer.clone(),
149                            ..Default::default()
150                        })?,
151                    )
152                    .await?
153            }
154        };
155    }
156
157    let genesis = bootstrap(&indexer, retry_interval, bootstrap_genesis).await?;
158
159    // Pipelines that rely on genesis information
160    add_concurrent!(KvFeatureFlags(genesis.clone()), kv_feature_flags);
161    add_concurrent!(KvProtocolConfigs(genesis.clone()), kv_protocol_configs);
162
163    // Summary tables (without write-ahead log)
164    add_sequential!(SumDisplays, sum_displays);
165
166    // Concurrent pipelines with retention
167    add_concurrent!(CoinBalanceBuckets, coin_balance_buckets);
168    add_concurrent!(ObjInfo, obj_info);
169
170    // Unpruned concurrent pipelines
171    add_concurrent!(CpSequenceNumbers, cp_sequence_numbers);
172    add_concurrent!(EvEmitMod, ev_emit_mod);
173    add_concurrent!(EvStructInst, ev_struct_inst);
174    add_concurrent!(KvCheckpoints, kv_checkpoints);
175    add_concurrent!(KvEpochEnds, kv_epoch_ends);
176    add_concurrent!(KvEpochStarts, kv_epoch_starts);
177    add_concurrent!(KvObjects, kv_objects);
178    add_concurrent!(KvPackages, kv_packages);
179    add_concurrent!(KvTransactions, kv_transactions);
180    add_concurrent!(ObjVersions, obj_versions);
181    add_concurrent!(TxAffectedAddresses, tx_affected_addresses);
182    add_concurrent!(TxAffectedObjects, tx_affected_objects);
183    add_concurrent!(TxBalanceChanges, tx_balance_changes);
184    add_concurrent!(TxCalls, tx_calls);
185    add_concurrent!(TxDigests, tx_digests);
186    add_concurrent!(TxKinds, tx_kinds);
187
188    Ok(indexer)
189}