use bootstrap::bootstrap;
use config::{IndexerConfig, PipelineLayer};
use handlers::{
coin_balance_buckets::CoinBalanceBuckets,
cp_sequence_numbers::CpSequenceNumbers,
ev_emit_mod::EvEmitMod,
ev_struct_inst::EvStructInst,
kv_checkpoints::KvCheckpoints,
kv_epoch_ends::KvEpochEnds,
kv_epoch_starts::KvEpochStarts,
kv_feature_flags::KvFeatureFlags,
kv_objects::KvObjects,
kv_protocol_configs::KvProtocolConfigs,
kv_transactions::KvTransactions,
obj_info::ObjInfo,
obj_info_temp::ObjInfoTemp,
obj_versions::{ObjVersions, ObjVersionsSentinelBackfill},
sum_displays::SumDisplays,
sum_packages::SumPackages,
tx_affected_addresses::TxAffectedAddresses,
tx_affected_objects::TxAffectedObjects,
tx_balance_changes::TxBalanceChanges,
tx_calls::TxCalls,
tx_digests::TxDigests,
tx_kinds::TxKinds,
};
use prometheus::Registry;
use sui_indexer_alt_framework::{
db::DbArgs,
ingestion::{ClientArgs, IngestionConfig},
pipeline::{
concurrent::{ConcurrentConfig, PrunerConfig},
sequential::SequentialConfig,
CommitterConfig,
},
Indexer, IndexerArgs,
};
use sui_indexer_alt_schema::MIGRATIONS;
use tokio_util::sync::CancellationToken;
use url::Url;
// NOTE(review): the contents of `args` are not visible from this file -- presumably the
// command-line argument definitions for the indexer binary; confirm.
pub mod args;
// Compiled only when the `benchmark` Cargo feature is enabled.
#[cfg(feature = "benchmark")]
pub mod benchmark;
// Provides the `bootstrap` function that `setup_indexer` awaits to obtain genesis data.
pub(crate) mod bootstrap;
// Provides `IndexerConfig` and `PipelineLayer`, the layered configuration that `setup_indexer`
// resolves before registering pipelines.
pub mod config;
// NOTE(review): not referenced from this file -- presumably pruning support for the pipelines
// registered with the `consistency` pruner config; confirm.
pub(crate) mod consistent_pruning;
// Pipeline handler types, grouped into submodules, that `setup_indexer` registers.
pub(crate) mod handlers;
pub async fn setup_indexer(
database_url: Url,
db_args: DbArgs,
indexer_args: IndexerArgs,
client_args: ClientArgs,
indexer_config: IndexerConfig,
with_genesis: bool,
registry: &Registry,
cancel: CancellationToken,
) -> anyhow::Result<Indexer> {
let IndexerConfig {
ingestion,
consistency,
committer,
pruner,
pipeline,
extra: _,
} = indexer_config.finish();
let PipelineLayer {
sum_displays,
sum_packages,
coin_balance_buckets,
cp_sequence_numbers,
ev_emit_mod,
ev_struct_inst,
kv_checkpoints,
kv_epoch_ends,
kv_epoch_starts,
kv_feature_flags,
kv_objects,
kv_protocol_configs,
kv_transactions,
obj_info,
obj_info_temp,
obj_versions,
obj_versions_sentinel_backfill,
tx_affected_addresses,
tx_affected_objects,
tx_balance_changes,
tx_calls,
tx_digests,
tx_kinds,
extra: _,
} = pipeline.finish();
let ingestion = ingestion.finish(IngestionConfig::default());
let consistency = consistency.finish(PrunerConfig::default());
let committer = committer.finish(CommitterConfig::default());
let pruner = pruner.finish(PrunerConfig::default());
let retry_interval = ingestion.retry_interval();
let mut indexer = Indexer::new(
database_url,
db_args,
indexer_args,
client_args,
ingestion,
Some(&MIGRATIONS),
registry,
cancel.clone(),
)
.await?;
macro_rules! add_consistent {
($handler:expr, $config:expr) => {
if let Some(layer) = $config {
indexer
.concurrent_pipeline(
$handler,
ConcurrentConfig {
committer: layer.finish(committer.clone()),
pruner: Some(consistency.clone()),
},
)
.await?
}
};
}
macro_rules! add_concurrent {
($handler:expr, $config:expr) => {
if let Some(layer) = $config {
indexer
.concurrent_pipeline(
$handler,
layer.finish(ConcurrentConfig {
committer: committer.clone(),
pruner: Some(pruner.clone()),
}),
)
.await?
}
};
}
macro_rules! add_sequential {
($handler:expr, $config:expr) => {
if let Some(layer) = $config {
indexer
.sequential_pipeline(
$handler,
layer.finish(SequentialConfig {
committer: committer.clone(),
..Default::default()
}),
)
.await?
}
};
}
if with_genesis {
let genesis = bootstrap(&indexer, retry_interval, cancel.clone()).await?;
add_concurrent!(KvFeatureFlags(genesis.clone()), kv_feature_flags);
add_concurrent!(KvProtocolConfigs(genesis.clone()), kv_protocol_configs);
}
add_consistent!(CoinBalanceBuckets::default(), coin_balance_buckets);
add_consistent!(ObjInfo::default(), obj_info);
add_consistent!(ObjInfoTemp::default(), obj_info_temp);
add_sequential!(SumDisplays, sum_displays);
add_sequential!(SumPackages, sum_packages);
add_concurrent!(CpSequenceNumbers, cp_sequence_numbers);
add_concurrent!(EvEmitMod, ev_emit_mod);
add_concurrent!(EvStructInst, ev_struct_inst);
add_concurrent!(KvCheckpoints, kv_checkpoints);
add_concurrent!(KvEpochEnds, kv_epoch_ends);
add_concurrent!(KvEpochStarts, kv_epoch_starts);
add_concurrent!(KvObjects, kv_objects);
add_concurrent!(KvTransactions, kv_transactions);
add_concurrent!(ObjVersions, obj_versions);
add_concurrent!(ObjVersionsSentinelBackfill, obj_versions_sentinel_backfill);
add_concurrent!(TxAffectedAddresses, tx_affected_addresses);
add_concurrent!(TxAffectedObjects, tx_affected_objects);
add_concurrent!(TxBalanceChanges, tx_balance_changes);
add_concurrent!(TxCalls, tx_calls);
add_concurrent!(TxDigests, tx_digests);
add_concurrent!(TxKinds, tx_kinds);
Ok(indexer)
}