1use std::env;
5
6use anyhow::Result;
7use futures::future::try_join_all;
8use mysten_metrics::spawn_monitored_task;
9use prometheus::Registry;
10use sui_data_ingestion_core::{
11 DataIngestionMetrics, IndexerExecutor, ReaderOptions, ShimIndexerProgressStore, WorkerPool,
12};
13use tokio::sync::oneshot;
14use tokio_util::sync::CancellationToken;
15use tracing::info;
16
17use crate::build_json_rpc_server;
18use crate::config::{IngestionConfig, JsonRpcConfig, RetentionConfig, SnapshotLagConfig};
19use crate::database::ConnectionPool;
20use crate::errors::IndexerError;
21use crate::handlers::checkpoint_handler::new_handlers;
22use crate::handlers::objects_snapshot_handler::start_objects_snapshot_handler;
23use crate::handlers::pruner::Pruner;
24use crate::indexer_reader::IndexerReader;
25use crate::metrics::IndexerMetrics;
26use crate::store::{IndexerStore, PgIndexerStore};
27
28pub struct Indexer;
29
30impl Indexer {
31 pub async fn start_writer(
32 config: IngestionConfig,
33 store: PgIndexerStore,
34 metrics: IndexerMetrics,
35 snapshot_config: SnapshotLagConfig,
36 retention_config: Option<RetentionConfig>,
37 cancel: CancellationToken,
38 ) -> Result<(), IndexerError> {
39 info!(
40 "Sui Indexer Writer (version {:?}) started...",
41 env!("CARGO_PKG_VERSION")
42 );
43 info!("Sui Indexer Writer config: {config:?}",);
44
45 let extra_reader_options = ReaderOptions {
46 batch_size: config.checkpoint_download_queue_size,
47 timeout_secs: config.checkpoint_download_timeout,
48 data_limit: config.checkpoint_download_queue_size_bytes,
49 gc_checkpoint_files: config.gc_checkpoint_files,
50 ..Default::default()
51 };
52
53 let (object_snapshot_worker, object_snapshot_watermark) = start_objects_snapshot_handler(
55 store.clone(),
56 metrics.clone(),
57 snapshot_config,
58 cancel.clone(),
59 config.start_checkpoint,
60 config.end_checkpoint,
61 )
62 .await?;
63
64 if let Some(retention_config) = retention_config {
65 let pruner = Pruner::new(store.clone(), retention_config, metrics.clone())?;
66 let cancel_clone = cancel.clone();
67 spawn_monitored_task!(pruner.start(cancel_clone));
68 }
69
70 if let Some(chain_id) = IndexerStore::get_chain_identifier(&store).await? {
75 store
76 .persist_protocol_configs_and_feature_flags(chain_id)
77 .await?;
78 }
79
80 let mut exit_senders = vec![];
81 let mut executors = vec![];
82
83 let (worker, primary_watermark) = new_handlers(
84 store,
85 metrics,
86 cancel.clone(),
87 config.start_checkpoint,
88 config.end_checkpoint,
89 )
90 .await?;
91 let progress_store = ShimIndexerProgressStore::new(
94 vec![
95 ("primary".to_string(), primary_watermark),
96 ("object_snapshot".to_string(), object_snapshot_watermark),
97 ]
98 .into_iter()
99 .collect(),
100 );
101 let mut executor = IndexerExecutor::new(
102 progress_store.clone(),
103 2,
104 DataIngestionMetrics::new(&Registry::new()),
105 );
106
107 let worker_pool = WorkerPool::new(
108 worker,
109 "primary".to_string(),
110 config.checkpoint_download_queue_size,
111 );
112 executor.register(worker_pool).await?;
113 let (exit_sender, exit_receiver) = oneshot::channel();
114 executors.push((executor, exit_receiver));
115 exit_senders.push(exit_sender);
116
117 if config.sources.data_ingestion_path.is_none() {
119 let executor = IndexerExecutor::new(
120 progress_store,
121 1,
122 DataIngestionMetrics::new(&Registry::new()),
123 );
124 let (exit_sender, exit_receiver) = oneshot::channel();
125 exit_senders.push(exit_sender);
126 executors.push((executor, exit_receiver));
127 }
128
129 let worker_pool = WorkerPool::new(
130 object_snapshot_worker,
131 "object_snapshot".to_string(),
132 config.checkpoint_download_queue_size,
133 );
134 let executor = executors.last_mut().expect("executors is not empty");
135 executor.0.register(worker_pool).await?;
136
137 spawn_monitored_task!(async move {
139 cancel.cancelled().await;
140 for exit_sender in exit_senders {
141 let _ = exit_sender.send(());
142 }
143 });
144
145 info!("Starting data ingestion executor...");
146 let futures = executors.into_iter().map(|(executor, exit_receiver)| {
147 executor.run(
148 config
149 .sources
150 .data_ingestion_path
151 .clone()
152 .unwrap_or(tempfile::tempdir().unwrap().keep()),
153 config
154 .sources
155 .remote_store_url
156 .as_ref()
157 .map(|url| url.as_str().to_owned()),
158 vec![],
159 extra_reader_options.clone(),
160 exit_receiver,
161 )
162 });
163 try_join_all(futures).await?;
164 Ok(())
165 }
166
167 pub async fn start_reader(
168 config: &JsonRpcConfig,
169 registry: &Registry,
170 pool: ConnectionPool,
171 cancel: CancellationToken,
172 ) -> Result<(), IndexerError> {
173 info!(
174 "Sui Indexer Reader (version {:?}) started...",
175 env!("CARGO_PKG_VERSION")
176 );
177 let indexer_reader = IndexerReader::new(pool);
178 let handle = build_json_rpc_server(registry, indexer_reader, config, cancel)
179 .await
180 .expect("Json rpc server should not run into errors upon start.");
181 tokio::spawn(async move { handle.stopped().await })
182 .await
183 .expect("Rpc server task failed");
184
185 Ok(())
186 }
187}