// sui_analytics_indexer/config.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Configuration types for the analytics indexer.
5
6use std::collections::HashMap;
7use std::path::PathBuf;
8
9use serde::Deserialize;
10use serde::Serialize;
11use sui_indexer_alt_framework::config::ConcurrencyConfig;
12use sui_indexer_alt_framework::ingestion::{IngestConcurrencyConfig, IngestionConfig};
13use sui_indexer_alt_framework::pipeline::CommitterConfig;
14use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
15
16use crate::pipeline::Pipeline;
17
/// Serde default for `file_format` fields: Parquet.
fn default_file_format() -> FileFormat {
    FileFormat::Parquet
}
21
/// Serde default for the `request_timeout_secs` fields of
/// [`OutputStoreConfig`]: 30 seconds.
fn default_request_timeout_secs() -> u64 {
    30
}
25
/// Serde default for `IndexerConfig::max_pending_uploads`: 10 files.
fn default_max_pending_uploads() -> usize {
    10
}
29
/// Serde default for `IndexerConfig::max_concurrent_serialization`: 3 tasks.
fn default_max_concurrent_serialization() -> usize {
    3
}
33
/// Serde default for `IndexerConfig::watermark_update_interval_secs`:
/// 60 seconds.
fn default_watermark_update_interval_secs() -> u64 {
    60
}
37
/// Serde default for `PipelineConfig::force_batch_cut_after_secs`:
/// ten minutes, expressed in seconds.
fn default_force_batch_cut_after_secs() -> u64 {
    10 * 60
}
41
/// Output file format for analytics data.
///
/// Serialized in lowercase, so config files use `csv` / `parquet`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FileFormat {
    /// Comma-separated values (`csv` in config files).
    Csv,
    /// Apache Parquet (`parquet` in config files).
    Parquet,
}
49
/// Object store configuration for analytics output.
///
/// Externally tagged via a lowercase `type` field (e.g. `type: gcs`).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase", tag = "type")]
pub enum OutputStoreConfig {
    /// Google Cloud Storage.
    Gcs {
        bucket: String,
        /// Path to service account JSON file
        service_account_path: PathBuf,
        /// Custom HTTP headers to include in all requests (e.g., for requester-pays buckets)
        #[serde(default)]
        custom_headers: Option<HashMap<String, String>>,
        /// Per-request timeout in seconds (default: 30).
        #[serde(default = "default_request_timeout_secs")]
        request_timeout_secs: u64,
    },
    /// Amazon S3, or an S3-compatible store when `endpoint` is set.
    S3 {
        bucket: String,
        region: String,
        // NOTE(review): what happens when credentials are unset (e.g. falling
        // back to ambient/instance credentials) is decided by the store
        // construction code, which is not visible here — confirm.
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
        /// Custom endpoint URL, e.g. for S3-compatible services.
        endpoint: Option<String>,
        /// Per-request timeout in seconds (default: 30).
        #[serde(default = "default_request_timeout_secs")]
        request_timeout_secs: u64,
    },
    /// Azure Blob Storage.
    Azure {
        container: String,
        account: String,
        access_key: String,
        /// Per-request timeout in seconds (default: 30).
        #[serde(default = "default_request_timeout_secs")]
        request_timeout_secs: u64,
    },
    /// Local filesystem output rooted at `path`.
    File {
        path: PathBuf,
    },
    /// Custom object store for testing. Allows sharing a store instance across runs.
    #[serde(skip)]
    Custom(std::sync::Arc<dyn object_store::ObjectStore>),
}
87
/// Optional overrides for a pipeline's committer settings.
///
/// Fields left as `None` fall back to the base [`CommitterConfig`] when
/// resolved via [`CommitterLayer::finish`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CommitterLayer {
    /// Overrides `CommitterConfig::write_concurrency`.
    pub write_concurrency: Option<usize>,
    /// Overrides `CommitterConfig::collect_interval_ms`.
    pub collect_interval_ms: Option<u64>,
    /// Overrides `CommitterConfig::watermark_interval_ms`.
    pub watermark_interval_ms: Option<u64>,
}
94
95impl CommitterLayer {
96    pub fn finish(self, base: CommitterConfig) -> CommitterConfig {
97        CommitterConfig {
98            write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
99            collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
100            watermark_interval_ms: self
101                .watermark_interval_ms
102                .unwrap_or(base.watermark_interval_ms),
103            watermark_interval_jitter_ms: 0,
104        }
105    }
106}
107
/// Optional overrides for a sequential pipeline's settings.
///
/// Fields left as `None` fall back to the base [`SequentialConfig`] when
/// resolved via [`SequentialLayer::finish`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SequentialLayer {
    /// Nested overrides for the committer settings.
    pub committer: Option<CommitterLayer>,
    /// Overrides `SequentialConfig::checkpoint_lag`.
    pub checkpoint_lag: Option<u64>,
    /// Overrides `SequentialConfig::fanout`.
    pub fanout: Option<ConcurrencyConfig>,
    /// Overrides `SequentialConfig::min_eager_rows`.
    pub min_eager_rows: Option<usize>,
    /// Overrides `SequentialConfig::max_batch_checkpoints`.
    pub max_batch_checkpoints: Option<usize>,
    /// Overrides `SequentialConfig::processor_channel_size`.
    pub processor_channel_size: Option<usize>,
}
117
/// Optional overrides for the framework's ingestion settings.
///
/// Fields left as `None` fall back to the base [`IngestionConfig`] when
/// resolved via [`IngestionLayer::finish`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct IngestionLayer {
    /// Overrides `IngestionConfig::checkpoint_buffer_size`.
    pub checkpoint_buffer_size: Option<usize>,
    /// Overrides `IngestionConfig::ingest_concurrency`.
    pub ingest_concurrency: Option<IngestConcurrencyConfig>,
    /// Overrides `IngestionConfig::retry_interval_ms`.
    pub retry_interval_ms: Option<u64>,
    /// Overrides `IngestionConfig::streaming_backoff_initial_batch_size`.
    pub streaming_backoff_initial_batch_size: Option<usize>,
    /// Overrides `IngestionConfig::streaming_backoff_max_batch_size`.
    pub streaming_backoff_max_batch_size: Option<usize>,
    /// Overrides `IngestionConfig::streaming_connection_timeout_ms`.
    pub streaming_connection_timeout_ms: Option<u64>,
    /// Overrides `IngestionConfig::streaming_statement_timeout_ms`.
    pub streaming_statement_timeout_ms: Option<u64>,
}
128
129impl IngestionLayer {
130    pub fn finish(self, base: IngestionConfig) -> IngestionConfig {
131        IngestionConfig {
132            checkpoint_buffer_size: self
133                .checkpoint_buffer_size
134                .unwrap_or(base.checkpoint_buffer_size),
135            ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
136            retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
137            streaming_backoff_initial_batch_size: self
138                .streaming_backoff_initial_batch_size
139                .unwrap_or(base.streaming_backoff_initial_batch_size),
140            streaming_backoff_max_batch_size: self
141                .streaming_backoff_max_batch_size
142                .unwrap_or(base.streaming_backoff_max_batch_size),
143            streaming_connection_timeout_ms: self
144                .streaming_connection_timeout_ms
145                .unwrap_or(base.streaming_connection_timeout_ms),
146            streaming_statement_timeout_ms: self
147                .streaming_statement_timeout_ms
148                .unwrap_or(base.streaming_statement_timeout_ms),
149        }
150    }
151}
152
153impl SequentialLayer {
154    pub fn finish(self, base: SequentialConfig) -> SequentialConfig {
155        SequentialConfig {
156            committer: if let Some(c) = self.committer {
157                c.finish(base.committer)
158            } else {
159                base.committer
160            },
161            checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
162            fanout: self.fanout.or(base.fanout),
163            min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
164            max_batch_checkpoints: self.max_batch_checkpoints.or(base.max_batch_checkpoints),
165            processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
166        }
167    }
168}
169
/// Main configuration for an analytics indexer job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexerConfig {
    /// Output object store configuration
    pub output_store: OutputStoreConfig,
    /// Optional working directory for temporary files (defaults to system temp dir)
    pub work_dir: Option<PathBuf>,
    // Snowflake ("sf") connection settings. NOTE(review): presumably consumed
    // when a pipeline enables `report_sf_max_table_checkpoint`; the code that
    // reads these is not visible in this file — confirm.
    pub sf_account_identifier: Option<String>,
    pub sf_warehouse: Option<String>,
    pub sf_database: Option<String>,
    pub sf_schema: Option<String>,
    pub sf_username: Option<String>,
    pub sf_role: Option<String>,
    pub sf_password_file: Option<String>,

    /// Migration mode identifier. When set, the indexer operates in migration mode:
    /// - Overwrites existing files matching target checkpoint ranges
    /// - Uses conditional PUT with etag to prevent concurrent modification
    /// - Uses per-file metadata to track migration progress separately from main pipeline
    #[serde(default)]
    pub migration_id: Option<String>,

    /// File format for output files (csv or parquet).
    #[serde(default = "default_file_format")]
    pub file_format: FileFormat,

    /// Per-pipeline configuration (serialized under the key `pipelines`).
    #[serde(rename = "pipelines")]
    pub pipeline_configs: Vec<PipelineConfig>,

    /// Overrides for the framework ingestion settings.
    #[serde(default)]
    pub ingestion: IngestionLayer,

    /// Overrides for the framework committer settings.
    #[serde(default)]
    pub committer: CommitterLayer,

    /// Maximum serialized files waiting in upload queue per pipeline.
    /// When the queue is full, serialization blocks until uploads complete.
    #[serde(default = "default_max_pending_uploads")]
    pub max_pending_uploads: usize,

    /// Maximum concurrent serialization tasks per pipeline.
    /// Limits CPU usage from parallel parquet/csv encoding.
    #[serde(default = "default_max_concurrent_serialization")]
    pub max_concurrent_serialization: usize,

    /// Minimum interval between watermark writes to object store (seconds).
    /// Watermarks are updated after file uploads; this rate-limits those writes.
    /// Default: 60 seconds.
    #[serde(default = "default_watermark_update_interval_secs")]
    pub watermark_update_interval_secs: u64,
}
221
222impl IndexerConfig {
223    /// Validate the indexer configuration.
224    ///
225    /// Checks for:
226    /// - Duplicate pipeline types (each pipeline can only be configured once)
227    /// - Individual pipeline config validity (e.g., batch_size required in live mode)
228    pub fn validate(&self) -> anyhow::Result<()> {
229        // Check for duplicate pipeline types
230        let mut seen = std::collections::HashSet::new();
231        for config in &self.pipeline_configs {
232            let name = config.pipeline.name();
233            if !seen.insert(name) {
234                anyhow::bail!(
235                    "Duplicate pipeline type '{}' in config. Each pipeline type can only be configured once.",
236                    name
237                );
238            }
239        }
240
241        // Validate individual pipeline configs
242        let is_migration_mode = self.migration_id.is_some();
243        for config in &self.pipeline_configs {
244            config.validate(is_migration_mode)?;
245        }
246
247        Ok(())
248    }
249
250    pub fn pipeline_configs(&self) -> &[PipelineConfig] {
251        &self.pipeline_configs
252    }
253
254    pub fn get_pipeline_config(&self, name: &str) -> Option<&PipelineConfig> {
255        self.pipeline_configs
256            .iter()
257            .find(|p| p.pipeline.name() == name)
258    }
259}
260
/// Batch size configuration for when to write files.
///
/// Serialized in snake_case (`checkpoints` / `rows`). Zero thresholds are
/// rejected by [`BatchSizeConfig::validate`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum BatchSizeConfig {
    /// Write a file after accumulating this many checkpoints.
    Checkpoints(usize),
    /// Write a file after accumulating this many rows.
    Rows(usize),
}
270
271impl BatchSizeConfig {
272    /// Validate the batch size configuration.
273    pub fn validate(&self) -> anyhow::Result<()> {
274        match self {
275            BatchSizeConfig::Checkpoints(0) => {
276                anyhow::bail!("batch_size.checkpoints must be > 0")
277            }
278            BatchSizeConfig::Rows(0) => {
279                anyhow::bail!("batch_size.rows must be > 0")
280            }
281            _ => Ok(()),
282        }
283    }
284}
285
/// Configuration for a single analytics task/pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineConfig {
    /// Type of data to write i.e. checkpoint, object, transaction, etc
    pub pipeline: Pipeline,
    /// File format to use (csv or parquet)
    #[serde(default = "default_file_format")]
    pub file_format: FileFormat,
    // NOTE(review): presumably restricts output to data touching this package;
    // the consuming code is not visible in this file — confirm.
    pub package_id_filter: Option<String>,
    /// Snowflake table to monitor
    pub sf_table_id: Option<String>,
    /// Snowflake column containing checkpoint numbers
    pub sf_checkpoint_col_id: Option<String>,
    /// Whether to report max checkpoint from Snowflake table
    #[serde(default)]
    pub report_sf_max_table_checkpoint: bool,
    /// Batch size configuration - determines when to write files.
    /// Required for live mode (when top-level migration_id is None).
    /// Ignored in migration mode (file boundaries come from existing files).
    #[serde(default)]
    pub batch_size: Option<BatchSizeConfig>,
    /// Override the output path prefix. Defaults to the pipeline name.
    #[serde(default)]
    pub output_prefix: Option<String>,
    /// Force a batch cut after this many seconds, even if size thresholds aren't met.
    /// Default: 600 (10 minutes).
    #[serde(default = "default_force_batch_cut_after_secs")]
    pub force_batch_cut_after_secs: u64,
    /// Per-pipeline overrides for the sequential pipeline settings.
    #[serde(default)]
    pub sequential: SequentialLayer,
}
317
318impl PipelineConfig {
319    /// Validate the configuration.
320    ///
321    /// Returns an error if batch_size is required but not set, or if batch_size is invalid.
322    /// In migration mode, batch_size is not required since file boundaries
323    /// come from existing files.
324    pub fn validate(&self, is_migration_mode: bool) -> anyhow::Result<()> {
325        if !is_migration_mode {
326            match &self.batch_size {
327                None => anyhow::bail!(
328                    "batch_size is required for pipeline '{}' (not in migration mode)",
329                    self.pipeline
330                ),
331                Some(batch_size) => batch_size.validate()?,
332            }
333        }
334        Ok(())
335    }
336
337    /// Get the output path prefix for this pipeline.
338    ///
339    /// Returns the configured `output_prefix` if set, otherwise the pipeline's default path.
340    pub fn output_prefix(&self) -> &str {
341        self.output_prefix
342            .as_deref()
343            .unwrap_or_else(|| self.pipeline.default_path())
344    }
345}