sui_analytics_indexer/
indexer.rs1use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Context;
10use anyhow::Result;
11use object_store::ClientOptions;
12use object_store::aws::AmazonS3Builder;
13use object_store::azure::MicrosoftAzureBuilder;
14use object_store::gcp::GoogleCloudStorageBuilder;
15use object_store::local::LocalFileSystem;
16use reqwest::header::HeaderMap;
17use reqwest::header::HeaderName;
18use reqwest::header::HeaderValue;
19use tokio_util::sync::CancellationToken;
20use tracing::info;
21
22use sui_indexer_alt_framework::Indexer;
23use sui_indexer_alt_framework::IndexerArgs;
24use sui_indexer_alt_framework::ingestion::{ClientArgs, IngestionConfig};
25use sui_indexer_alt_framework::pipeline::CommitterConfig;
26use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
27use sui_indexer_alt_framework::service::Service;
28
29use crate::config::IndexerConfig;
30use crate::config::OutputStoreConfig;
31use crate::metrics::Metrics;
32use crate::package_store::PackageCache;
33use crate::progress_monitoring::spawn_snowflake_monitors;
34use crate::store::AnalyticsStore;
35
36pub async fn build_analytics_indexer(
41 config: IndexerConfig,
42 indexer_args: IndexerArgs,
43 client_args: ClientArgs,
44 metrics: Metrics,
45 registry: prometheus::Registry,
46) -> Result<Service> {
47 config.validate()?;
49
50 let object_store = create_object_store(&config.output_store)?;
51 let store = AnalyticsStore::new(object_store.clone(), config.clone(), metrics.clone());
52
53 let (adjusted_first_checkpoint, adjusted_last_checkpoint) = store
55 .find_checkpoint_range(indexer_args.first_checkpoint, indexer_args.last_checkpoint)
56 .await?;
57
58 let work_dir = if let Some(ref work_dir) = config.work_dir {
59 tempfile::Builder::new()
60 .prefix("sui-analytics-indexer-")
61 .tempdir_in(work_dir)?
62 .keep()
63 } else {
64 tempfile::Builder::new()
65 .prefix("sui-analytics-indexer-")
66 .tempdir()?
67 .keep()
68 };
69
70 let rpc_url = client_args
71 .ingestion
72 .rpc_api_url
73 .as_ref()
74 .map(|u| u.to_string())
75 .unwrap_or_default();
76 let package_cache_path = work_dir.join("package_cache");
77 let package_cache = Arc::new(PackageCache::new(&package_cache_path, &rpc_url));
78
79 let adjusted_indexer_args = IndexerArgs {
80 first_checkpoint: adjusted_first_checkpoint,
81 last_checkpoint: adjusted_last_checkpoint,
82 pipeline: indexer_args.pipeline,
83 task: indexer_args.task,
84 };
85
86 let ingestion_config = config.ingestion.clone().finish(IngestionConfig::default());
87
88 let mut indexer = Indexer::new(
89 store.clone(),
90 adjusted_indexer_args,
91 client_args,
92 ingestion_config,
93 None,
94 ®istry,
95 )
96 .await?;
97
98 let base_committer = config.committer.clone().finish(CommitterConfig::default());
99 let base_sequential = SequentialConfig {
100 committer: base_committer,
101 ..Default::default()
102 };
103
104 for pipeline_config in config.pipeline_configs() {
105 info!("Registering pipeline: {}", pipeline_config.pipeline);
106 let sequential = pipeline_config
107 .sequential
108 .clone()
109 .finish(base_sequential.clone());
110 pipeline_config
111 .pipeline
112 .register(
113 &mut indexer,
114 pipeline_config,
115 package_cache.clone(),
116 metrics.clone(),
117 sequential,
118 )
119 .await?;
120 }
121
122 let cancel = CancellationToken::new();
124 let sf_handles = spawn_snowflake_monitors(&config, metrics, cancel.clone())?;
125
126 let service = indexer.run().await?;
128 Ok(service
129 .with_shutdown_signal(async move {
130 store.shutdown().await;
131 })
132 .with_shutdown_signal(async move {
133 cancel.cancel();
134 for handle in sf_handles {
135 let _ = handle.await;
136 }
137 }))
138}
139
140fn create_object_store(config: &OutputStoreConfig) -> Result<Arc<dyn object_store::ObjectStore>> {
141 match config {
142 OutputStoreConfig::Gcs {
143 bucket,
144 service_account_path,
145 custom_headers,
146 request_timeout_secs,
147 } => {
148 let mut client_options =
149 ClientOptions::default().with_timeout(Duration::from_secs(*request_timeout_secs));
150
151 if let Some(headers_map) = custom_headers {
153 let mut headers = HeaderMap::new();
154 for (key, value) in headers_map {
155 headers.insert(
156 HeaderName::try_from(key.as_str())?,
157 HeaderValue::from_str(value)?,
158 );
159 }
160 client_options = client_options.with_default_headers(headers);
161 }
162
163 GoogleCloudStorageBuilder::new()
164 .with_client_options(client_options)
165 .with_bucket_name(bucket)
166 .with_service_account_path(service_account_path.to_string_lossy())
167 .build()
168 .map(|s| Arc::new(s) as Arc<dyn object_store::ObjectStore>)
169 .context("Failed to create GCS store")
170 }
171 OutputStoreConfig::S3 {
172 bucket,
173 region,
174 access_key_id,
175 secret_access_key,
176 endpoint,
177 request_timeout_secs,
178 } => {
179 let client_options =
180 ClientOptions::default().with_timeout(Duration::from_secs(*request_timeout_secs));
181 let mut builder = AmazonS3Builder::new()
182 .with_client_options(client_options)
183 .with_bucket_name(bucket)
184 .with_region(region);
185 if let Some(key) = access_key_id {
186 builder = builder.with_access_key_id(key);
187 }
188 if let Some(secret) = secret_access_key {
189 builder = builder.with_secret_access_key(secret);
190 }
191 if let Some(ep) = endpoint {
192 builder = builder.with_endpoint(ep);
193 }
194 builder
195 .build()
196 .map(|s| Arc::new(s) as Arc<dyn object_store::ObjectStore>)
197 .context("Failed to create S3 store")
198 }
199 OutputStoreConfig::Azure {
200 container,
201 account,
202 access_key,
203 request_timeout_secs,
204 } => {
205 let client_options =
206 ClientOptions::default().with_timeout(Duration::from_secs(*request_timeout_secs));
207 MicrosoftAzureBuilder::new()
208 .with_client_options(client_options)
209 .with_container_name(container)
210 .with_account(account)
211 .with_access_key(access_key)
212 .build()
213 .map(|s| Arc::new(s) as Arc<dyn object_store::ObjectStore>)
214 .context("Failed to create Azure store")
215 }
216 OutputStoreConfig::File { path } => LocalFileSystem::new_with_prefix(path)
217 .map(|s| Arc::new(s) as Arc<dyn object_store::ObjectStore>)
218 .context("Failed to create file store"),
219 OutputStoreConfig::Custom(store) => Ok(store.clone()),
220 }
221}