sui_analytics_indexer/
indexer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Analytics indexer builder.
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Context;
10use anyhow::Result;
11use object_store::ClientOptions;
12use object_store::aws::AmazonS3Builder;
13use object_store::azure::MicrosoftAzureBuilder;
14use object_store::gcp::GoogleCloudStorageBuilder;
15use object_store::local::LocalFileSystem;
16use reqwest::header::HeaderMap;
17use reqwest::header::HeaderName;
18use reqwest::header::HeaderValue;
19use tokio_util::sync::CancellationToken;
20use tracing::info;
21
22use sui_indexer_alt_framework::Indexer;
23use sui_indexer_alt_framework::IndexerArgs;
24use sui_indexer_alt_framework::ingestion::{ClientArgs, IngestionConfig};
25use sui_indexer_alt_framework::pipeline::CommitterConfig;
26use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
27use sui_indexer_alt_framework::service::Service;
28
29use crate::config::IndexerConfig;
30use crate::config::OutputStoreConfig;
31use crate::metrics::Metrics;
32use crate::package_store::PackageCache;
33use crate::progress_monitoring::spawn_snowflake_monitors;
34use crate::store::AnalyticsStore;
35
36/// Build and run an analytics indexer, returning a Service handle.
37///
38/// The returned Service integrates store shutdown - when the service shuts down
39/// gracefully, it will wait for all pending uploads to complete.
40pub async fn build_analytics_indexer(
41    config: IndexerConfig,
42    indexer_args: IndexerArgs,
43    client_args: ClientArgs,
44    metrics: Metrics,
45    registry: prometheus::Registry,
46) -> Result<Service> {
47    // Validate config (checks for duplicate pipelines, batch_size requirements, etc.)
48    config.validate()?;
49
50    let object_store = create_object_store(&config.output_store)?;
51    let store = AnalyticsStore::new(object_store.clone(), config.clone(), metrics.clone());
52
53    // Find checkpoint range (snaps to file boundaries in migration mode)
54    let (adjusted_first_checkpoint, adjusted_last_checkpoint) = store
55        .find_checkpoint_range(indexer_args.first_checkpoint, indexer_args.last_checkpoint)
56        .await?;
57
58    let work_dir = if let Some(ref work_dir) = config.work_dir {
59        tempfile::Builder::new()
60            .prefix("sui-analytics-indexer-")
61            .tempdir_in(work_dir)?
62            .keep()
63    } else {
64        tempfile::Builder::new()
65            .prefix("sui-analytics-indexer-")
66            .tempdir()?
67            .keep()
68    };
69
70    let rpc_url = client_args
71        .ingestion
72        .rpc_api_url
73        .as_ref()
74        .map(|u| u.to_string())
75        .unwrap_or_default();
76    let package_cache_path = work_dir.join("package_cache");
77    let package_cache = Arc::new(PackageCache::new(&package_cache_path, &rpc_url));
78
79    let adjusted_indexer_args = IndexerArgs {
80        first_checkpoint: adjusted_first_checkpoint,
81        last_checkpoint: adjusted_last_checkpoint,
82        pipeline: indexer_args.pipeline,
83        task: indexer_args.task,
84    };
85
86    let ingestion_config = config.ingestion.clone().finish(IngestionConfig::default());
87
88    let mut indexer = Indexer::new(
89        store.clone(),
90        adjusted_indexer_args,
91        client_args,
92        ingestion_config,
93        None,
94        &registry,
95    )
96    .await?;
97
98    let base_committer = config.committer.clone().finish(CommitterConfig::default());
99    let base_sequential = SequentialConfig {
100        committer: base_committer,
101        ..Default::default()
102    };
103
104    for pipeline_config in config.pipeline_configs() {
105        info!("Registering pipeline: {}", pipeline_config.pipeline);
106        let sequential = pipeline_config
107            .sequential
108            .clone()
109            .finish(base_sequential.clone());
110        pipeline_config
111            .pipeline
112            .register(
113                &mut indexer,
114                pipeline_config,
115                package_cache.clone(),
116                metrics.clone(),
117                sequential,
118            )
119            .await?;
120    }
121
122    // Spawn Snowflake monitors (if configured)
123    let cancel = CancellationToken::new();
124    let sf_handles = spawn_snowflake_monitors(&config, metrics, cancel.clone())?;
125
126    // Run the indexer and register shutdown signals
127    let service = indexer.run().await?;
128    Ok(service
129        .with_shutdown_signal(async move {
130            store.shutdown().await;
131        })
132        .with_shutdown_signal(async move {
133            cancel.cancel();
134            for handle in sf_handles {
135                let _ = handle.await;
136            }
137        }))
138}
139
140fn create_object_store(config: &OutputStoreConfig) -> Result<Arc<dyn object_store::ObjectStore>> {
141    match config {
142        OutputStoreConfig::Gcs {
143            bucket,
144            service_account_path,
145            custom_headers,
146            request_timeout_secs,
147        } => {
148            let mut client_options =
149                ClientOptions::default().with_timeout(Duration::from_secs(*request_timeout_secs));
150
151            // Apply custom headers (e.g., for requester-pays buckets)
152            if let Some(headers_map) = custom_headers {
153                let mut headers = HeaderMap::new();
154                for (key, value) in headers_map {
155                    headers.insert(
156                        HeaderName::try_from(key.as_str())?,
157                        HeaderValue::from_str(value)?,
158                    );
159                }
160                client_options = client_options.with_default_headers(headers);
161            }
162
163            GoogleCloudStorageBuilder::new()
164                .with_client_options(client_options)
165                .with_bucket_name(bucket)
166                .with_service_account_path(service_account_path.to_string_lossy())
167                .build()
168                .map(|s| Arc::new(s) as Arc<dyn object_store::ObjectStore>)
169                .context("Failed to create GCS store")
170        }
171        OutputStoreConfig::S3 {
172            bucket,
173            region,
174            access_key_id,
175            secret_access_key,
176            endpoint,
177            request_timeout_secs,
178        } => {
179            let client_options =
180                ClientOptions::default().with_timeout(Duration::from_secs(*request_timeout_secs));
181            let mut builder = AmazonS3Builder::new()
182                .with_client_options(client_options)
183                .with_bucket_name(bucket)
184                .with_region(region);
185            if let Some(key) = access_key_id {
186                builder = builder.with_access_key_id(key);
187            }
188            if let Some(secret) = secret_access_key {
189                builder = builder.with_secret_access_key(secret);
190            }
191            if let Some(ep) = endpoint {
192                builder = builder.with_endpoint(ep);
193            }
194            builder
195                .build()
196                .map(|s| Arc::new(s) as Arc<dyn object_store::ObjectStore>)
197                .context("Failed to create S3 store")
198        }
199        OutputStoreConfig::Azure {
200            container,
201            account,
202            access_key,
203            request_timeout_secs,
204        } => {
205            let client_options =
206                ClientOptions::default().with_timeout(Duration::from_secs(*request_timeout_secs));
207            MicrosoftAzureBuilder::new()
208                .with_client_options(client_options)
209                .with_container_name(container)
210                .with_account(account)
211                .with_access_key(access_key)
212                .build()
213                .map(|s| Arc::new(s) as Arc<dyn object_store::ObjectStore>)
214                .context("Failed to create Azure store")
215        }
216        OutputStoreConfig::File { path } => LocalFileSystem::new_with_prefix(path)
217            .map(|s| Arc::new(s) as Arc<dyn object_store::ObjectStore>)
218            .context("Failed to create file store"),
219        OutputStoreConfig::Custom(store) => Ok(store.clone()),
220    }
221}