sui_indexer/
indexer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::env;
5
6use anyhow::Result;
7use futures::future::try_join_all;
8use mysten_metrics::spawn_monitored_task;
9use prometheus::Registry;
10use sui_data_ingestion_core::{
11    DataIngestionMetrics, IndexerExecutor, ReaderOptions, ShimIndexerProgressStore, WorkerPool,
12};
13use tokio::sync::oneshot;
14use tokio_util::sync::CancellationToken;
15use tracing::info;
16
17use crate::build_json_rpc_server;
18use crate::config::{IngestionConfig, JsonRpcConfig, RetentionConfig, SnapshotLagConfig};
19use crate::database::ConnectionPool;
20use crate::errors::IndexerError;
21use crate::handlers::checkpoint_handler::new_handlers;
22use crate::handlers::objects_snapshot_handler::start_objects_snapshot_handler;
23use crate::handlers::pruner::Pruner;
24use crate::indexer_reader::IndexerReader;
25use crate::metrics::IndexerMetrics;
26use crate::store::{IndexerStore, PgIndexerStore};
27
28pub struct Indexer;
29
30impl Indexer {
31    pub async fn start_writer(
32        config: IngestionConfig,
33        store: PgIndexerStore,
34        metrics: IndexerMetrics,
35        snapshot_config: SnapshotLagConfig,
36        retention_config: Option<RetentionConfig>,
37        cancel: CancellationToken,
38    ) -> Result<(), IndexerError> {
39        info!(
40            "Sui Indexer Writer (version {:?}) started...",
41            env!("CARGO_PKG_VERSION")
42        );
43        info!("Sui Indexer Writer config: {config:?}",);
44
45        let extra_reader_options = ReaderOptions {
46            batch_size: config.checkpoint_download_queue_size,
47            timeout_secs: config.checkpoint_download_timeout,
48            data_limit: config.checkpoint_download_queue_size_bytes,
49            gc_checkpoint_files: config.gc_checkpoint_files,
50            ..Default::default()
51        };
52
53        // Start objects snapshot processor, which is a separate pipeline with its ingestion pipeline.
54        let (object_snapshot_worker, object_snapshot_watermark) = start_objects_snapshot_handler(
55            store.clone(),
56            metrics.clone(),
57            snapshot_config,
58            cancel.clone(),
59            config.start_checkpoint,
60            config.end_checkpoint,
61        )
62        .await?;
63
64        if let Some(retention_config) = retention_config {
65            let pruner = Pruner::new(store.clone(), retention_config, metrics.clone())?;
66            let cancel_clone = cancel.clone();
67            spawn_monitored_task!(pruner.start(cancel_clone));
68        }
69
70        // If we already have chain identifier indexed (i.e. the first checkpoint has been indexed),
71        // then we persist protocol configs for protocol versions not yet in the db.
72        // Otherwise, we would do the persisting in `commit_checkpoint` while the first cp is
73        // being indexed.
74        if let Some(chain_id) = IndexerStore::get_chain_identifier(&store).await? {
75            store
76                .persist_protocol_configs_and_feature_flags(chain_id)
77                .await?;
78        }
79
80        let mut exit_senders = vec![];
81        let mut executors = vec![];
82
83        let (worker, primary_watermark) = new_handlers(
84            store,
85            metrics,
86            cancel.clone(),
87            config.start_checkpoint,
88            config.end_checkpoint,
89        )
90        .await?;
91        // Ingestion task watermarks are snapshotted once on indexer startup based on the
92        // corresponding watermark table before being handed off to the ingestion task.
93        let progress_store = ShimIndexerProgressStore::new(
94            vec![
95                ("primary".to_string(), primary_watermark),
96                ("object_snapshot".to_string(), object_snapshot_watermark),
97            ]
98            .into_iter()
99            .collect(),
100        );
101        let mut executor = IndexerExecutor::new(
102            progress_store.clone(),
103            2,
104            DataIngestionMetrics::new(&Registry::new()),
105        );
106
107        let worker_pool = WorkerPool::new(
108            worker,
109            "primary".to_string(),
110            config.checkpoint_download_queue_size,
111        );
112        executor.register(worker_pool).await?;
113        let (exit_sender, exit_receiver) = oneshot::channel();
114        executors.push((executor, exit_receiver));
115        exit_senders.push(exit_sender);
116
117        // in a non-colocated setup, start a separate indexer for processing object snapshots
118        if config.sources.data_ingestion_path.is_none() {
119            let executor = IndexerExecutor::new(
120                progress_store,
121                1,
122                DataIngestionMetrics::new(&Registry::new()),
123            );
124            let (exit_sender, exit_receiver) = oneshot::channel();
125            exit_senders.push(exit_sender);
126            executors.push((executor, exit_receiver));
127        }
128
129        let worker_pool = WorkerPool::new(
130            object_snapshot_worker,
131            "object_snapshot".to_string(),
132            config.checkpoint_download_queue_size,
133        );
134        let executor = executors.last_mut().expect("executors is not empty");
135        executor.0.register(worker_pool).await?;
136
137        // Spawn a task that links the cancellation token to the exit sender
138        spawn_monitored_task!(async move {
139            cancel.cancelled().await;
140            for exit_sender in exit_senders {
141                let _ = exit_sender.send(());
142            }
143        });
144
145        info!("Starting data ingestion executor...");
146        let futures = executors.into_iter().map(|(executor, exit_receiver)| {
147            executor.run(
148                config
149                    .sources
150                    .data_ingestion_path
151                    .clone()
152                    .unwrap_or(tempfile::tempdir().unwrap().keep()),
153                config
154                    .sources
155                    .remote_store_url
156                    .as_ref()
157                    .map(|url| url.as_str().to_owned()),
158                vec![],
159                extra_reader_options.clone(),
160                exit_receiver,
161            )
162        });
163        try_join_all(futures).await?;
164        Ok(())
165    }
166
167    pub async fn start_reader(
168        config: &JsonRpcConfig,
169        registry: &Registry,
170        pool: ConnectionPool,
171        cancel: CancellationToken,
172    ) -> Result<(), IndexerError> {
173        info!(
174            "Sui Indexer Reader (version {:?}) started...",
175            env!("CARGO_PKG_VERSION")
176        );
177        let indexer_reader = IndexerReader::new(pool);
178        let handle = build_json_rpc_server(registry, indexer_reader, config, cancel)
179            .await
180            .expect("Json rpc server should not run into errors upon start.");
181        tokio::spawn(async move { handle.stopped().await })
182            .await
183            .expect("Rpc server task failed");
184
185        Ok(())
186    }
187}