1pub use crate::bootstrap::BootstrapGenesis;
5use anyhow::Context;
6use bootstrap::bootstrap;
7use config::{IndexerConfig, PipelineLayer};
8use handlers::{
9 coin_balance_buckets::CoinBalanceBuckets, cp_sequence_numbers::CpSequenceNumbers,
10 ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints,
11 kv_epoch_ends::KvEpochEnds, kv_epoch_starts::KvEpochStarts, kv_feature_flags::KvFeatureFlags,
12 kv_objects::KvObjects, kv_packages::KvPackages, kv_protocol_configs::KvProtocolConfigs,
13 kv_transactions::KvTransactions, obj_info::ObjInfo, obj_versions::ObjVersions,
14 sum_displays::SumDisplays, tx_affected_addresses::TxAffectedAddresses,
15 tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges,
16 tx_calls::TxCalls, tx_digests::TxDigests, tx_kinds::TxKinds,
17};
18use prometheus::Registry;
19use sui_indexer_alt_framework::{
20 Indexer, IndexerArgs,
21 ingestion::{ClientArgs, IngestionConfig},
22 pipeline::{
23 CommitterConfig,
24 concurrent::{ConcurrentConfig, PrunerConfig},
25 sequential::SequentialConfig,
26 },
27 postgres::{Db, DbArgs},
28};
29use sui_indexer_alt_metrics::db::DbConnectionStatsCollector;
30use sui_indexer_alt_schema::MIGRATIONS;
31use url::Url;
32
33pub mod args;
34#[cfg(feature = "benchmark")]
35pub mod benchmark;
36pub(crate) mod bootstrap;
37pub mod config;
38pub(crate) mod handlers;
39
40pub async fn setup_indexer(
41 database_url: Url,
42 db_args: DbArgs,
43 indexer_args: IndexerArgs,
44 client_args: ClientArgs,
45 indexer_config: IndexerConfig,
46 bootstrap_genesis: Option<BootstrapGenesis>,
47 registry: &Registry,
48) -> anyhow::Result<Indexer<Db>> {
49 let IndexerConfig {
50 ingestion,
51 committer,
52 pruner,
53 pipeline,
54 } = indexer_config;
55
56 let PipelineLayer {
57 sum_displays,
58 coin_balance_buckets,
59 cp_sequence_numbers,
60 ev_emit_mod,
61 ev_struct_inst,
62 kv_checkpoints,
63 kv_epoch_ends,
64 kv_epoch_starts,
65 kv_feature_flags,
66 kv_objects,
67 kv_packages,
68 kv_protocol_configs,
69 kv_transactions,
70 obj_info,
71 obj_versions,
72 tx_affected_addresses,
73 tx_affected_objects,
74 tx_balance_changes,
75 tx_calls,
76 tx_digests,
77 tx_kinds,
78 } = pipeline;
79
80 let ingestion = ingestion.finish(IngestionConfig::default())?;
81 let committer = committer.finish(CommitterConfig::default())?;
82 let pruner = pruner.finish(PrunerConfig::default())?;
83
84 let retry_interval = ingestion.retry_interval();
85
86 let store = Db::for_write(database_url, db_args)
88 .await
89 .context("Failed to connect to database")?;
90
91 store
93 .run_migrations(Some(&MIGRATIONS))
94 .await
95 .context("Failed to run pending migrations")?;
96
97 registry.register(Box::new(DbConnectionStatsCollector::new(
98 Some("indexer_db"),
99 store.clone(),
100 )))?;
101
102 let metrics_prefix = None;
103 let mut indexer = Indexer::new(
104 store,
105 indexer_args,
106 client_args,
107 ingestion,
108 metrics_prefix,
109 registry,
110 )
111 .await?;
112
113 macro_rules! add_concurrent {
126 ($handler:expr, $config:expr) => {
127 if let Some(layer) = $config {
128 indexer
129 .concurrent_pipeline(
130 $handler,
131 layer.finish(ConcurrentConfig {
132 committer: committer.clone(),
133 pruner: Some(pruner.clone()),
134 })?,
135 )
136 .await?
137 }
138 };
139 }
140
141 macro_rules! add_sequential {
142 ($handler:expr, $config:expr) => {
143 if let Some(layer) = $config {
144 indexer
145 .sequential_pipeline(
146 $handler,
147 layer.finish(SequentialConfig {
148 committer: committer.clone(),
149 ..Default::default()
150 })?,
151 )
152 .await?
153 }
154 };
155 }
156
157 let genesis = bootstrap(&indexer, retry_interval, bootstrap_genesis).await?;
158
159 add_concurrent!(KvFeatureFlags(genesis.clone()), kv_feature_flags);
161 add_concurrent!(KvProtocolConfigs(genesis.clone()), kv_protocol_configs);
162
163 add_sequential!(SumDisplays, sum_displays);
165
166 add_concurrent!(CoinBalanceBuckets, coin_balance_buckets);
168 add_concurrent!(ObjInfo, obj_info);
169
170 add_concurrent!(CpSequenceNumbers, cp_sequence_numbers);
172 add_concurrent!(EvEmitMod, ev_emit_mod);
173 add_concurrent!(EvStructInst, ev_struct_inst);
174 add_concurrent!(KvCheckpoints, kv_checkpoints);
175 add_concurrent!(KvEpochEnds, kv_epoch_ends);
176 add_concurrent!(KvEpochStarts, kv_epoch_starts);
177 add_concurrent!(KvObjects, kv_objects);
178 add_concurrent!(KvPackages, kv_packages);
179 add_concurrent!(KvTransactions, kv_transactions);
180 add_concurrent!(ObjVersions, obj_versions);
181 add_concurrent!(TxAffectedAddresses, tx_affected_addresses);
182 add_concurrent!(TxAffectedObjects, tx_affected_objects);
183 add_concurrent!(TxBalanceChanges, tx_balance_changes);
184 add_concurrent!(TxCalls, tx_calls);
185 add_concurrent!(TxDigests, tx_digests);
186 add_concurrent!(TxKinds, tx_kinds);
187
188 Ok(indexer)
189}