1use std::collections::HashMap;
7use std::path::PathBuf;
8
9use serde::Deserialize;
10use serde::Serialize;
11use sui_indexer_alt_framework::config::ConcurrencyConfig;
12use sui_indexer_alt_framework::ingestion::{IngestConcurrencyConfig, IngestionConfig};
13use sui_indexer_alt_framework::pipeline::CommitterConfig;
14use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
15
16use crate::pipeline::Pipeline;
17
18fn default_file_format() -> FileFormat {
19 FileFormat::Parquet
20}
21
/// Serde default: object-store HTTP requests time out after 30 seconds.
fn default_request_timeout_secs() -> u64 {
    30
}
25
/// Serde default: allow up to 10 uploads to be in flight at once.
fn default_max_pending_uploads() -> usize {
    10
}
29
/// Serde default: serialize at most 3 batches concurrently.
fn default_max_concurrent_serialization() -> usize {
    3
}
33
/// Serde default: refresh the watermark once per minute.
fn default_watermark_update_interval_secs() -> u64 {
    60
}
37
/// Serde default: force a batch cut after 10 minutes (600 s) regardless of size.
fn default_force_batch_cut_after_secs() -> u64 {
    600
}
41
/// On-disk encoding used for exported data files.
///
/// Serialized in lowercase (`"csv"` / `"parquet"`) in config files.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FileFormat {
    /// Comma-separated values.
    Csv,
    /// Apache Parquet columnar format (the default — see `default_file_format`).
    Parquet,
}
49
/// Destination for exported files. Internally tagged in config files via a
/// lowercase `type` field (`gcs`, `s3`, `azure`, `file`).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase", tag = "type")]
pub enum OutputStoreConfig {
    /// Google Cloud Storage.
    Gcs {
        bucket: String,
        /// Path to the service-account JSON credentials file.
        service_account_path: PathBuf,
        /// Extra HTTP headers attached to each request (optional).
        #[serde(default)]
        custom_headers: Option<HashMap<String, String>>,
        #[serde(default = "default_request_timeout_secs")]
        request_timeout_secs: u64,
    },
    /// Amazon S3 or an S3-compatible store.
    S3 {
        bucket: String,
        region: String,
        /// Credentials are optional; when absent the store presumably falls back
        /// to the ambient credential chain — confirm against the store builder.
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
        /// Custom endpoint for S3-compatible services (e.g. MinIO).
        endpoint: Option<String>,
        #[serde(default = "default_request_timeout_secs")]
        request_timeout_secs: u64,
    },
    /// Azure Blob Storage.
    Azure {
        container: String,
        account: String,
        access_key: String,
        #[serde(default = "default_request_timeout_secs")]
        request_timeout_secs: u64,
    },
    /// Local filesystem output (useful for testing).
    File {
        path: PathBuf,
    },
    /// Pre-built store injected programmatically; never (de)serialized.
    #[serde(skip)]
    Custom(std::sync::Arc<dyn object_store::ObjectStore>),
}
87
/// Optional overrides for `CommitterConfig`; `None` fields fall back to the
/// base config in `CommitterLayer::finish`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CommitterLayer {
    pub write_concurrency: Option<usize>,
    pub collect_interval_ms: Option<u64>,
    pub watermark_interval_ms: Option<u64>,
}
94
95impl CommitterLayer {
96 pub fn finish(self, base: CommitterConfig) -> CommitterConfig {
97 CommitterConfig {
98 write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
99 collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
100 watermark_interval_ms: self
101 .watermark_interval_ms
102 .unwrap_or(base.watermark_interval_ms),
103 watermark_interval_jitter_ms: 0,
104 }
105 }
106}
107
/// Optional overrides for `SequentialConfig`; `None` fields fall back to the
/// base config in `SequentialLayer::finish`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SequentialLayer {
    /// Nested committer overrides, merged field-by-field with the base committer.
    pub committer: Option<CommitterLayer>,
    pub checkpoint_lag: Option<u64>,
    pub fanout: Option<ConcurrencyConfig>,
    pub min_eager_rows: Option<usize>,
    pub max_batch_checkpoints: Option<usize>,
    pub processor_channel_size: Option<usize>,
}
117
/// Optional overrides for `IngestionConfig`; `None` fields fall back to the
/// base config in `IngestionLayer::finish`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct IngestionLayer {
    pub checkpoint_buffer_size: Option<usize>,
    pub ingest_concurrency: Option<IngestConcurrencyConfig>,
    pub retry_interval_ms: Option<u64>,
    pub streaming_backoff_initial_batch_size: Option<usize>,
    pub streaming_backoff_max_batch_size: Option<usize>,
    pub streaming_connection_timeout_ms: Option<u64>,
    pub streaming_statement_timeout_ms: Option<u64>,
}
128
129impl IngestionLayer {
130 pub fn finish(self, base: IngestionConfig) -> IngestionConfig {
131 IngestionConfig {
132 checkpoint_buffer_size: self
133 .checkpoint_buffer_size
134 .unwrap_or(base.checkpoint_buffer_size),
135 ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
136 retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
137 streaming_backoff_initial_batch_size: self
138 .streaming_backoff_initial_batch_size
139 .unwrap_or(base.streaming_backoff_initial_batch_size),
140 streaming_backoff_max_batch_size: self
141 .streaming_backoff_max_batch_size
142 .unwrap_or(base.streaming_backoff_max_batch_size),
143 streaming_connection_timeout_ms: self
144 .streaming_connection_timeout_ms
145 .unwrap_or(base.streaming_connection_timeout_ms),
146 streaming_statement_timeout_ms: self
147 .streaming_statement_timeout_ms
148 .unwrap_or(base.streaming_statement_timeout_ms),
149 }
150 }
151}
152
153impl SequentialLayer {
154 pub fn finish(self, base: SequentialConfig) -> SequentialConfig {
155 SequentialConfig {
156 committer: if let Some(c) = self.committer {
157 c.finish(base.committer)
158 } else {
159 base.committer
160 },
161 checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
162 fanout: self.fanout.or(base.fanout),
163 min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
164 max_batch_checkpoints: self.max_batch_checkpoints.or(base.max_batch_checkpoints),
165 processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
166 }
167 }
168}
169
/// Top-level configuration for the indexer: output destination, optional
/// Snowflake (`sf_*`) connection details, per-pipeline settings, and tuning
/// knobs for ingestion/commit/upload concurrency.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexerConfig {
    /// Where exported files are written.
    pub output_store: OutputStoreConfig,
    /// Local scratch directory (optional).
    pub work_dir: Option<PathBuf>,
    // Snowflake connection parameters; all optional, presumably only required
    // when Snowflake reporting/migration features are used — confirm at call sites.
    pub sf_account_identifier: Option<String>,
    pub sf_warehouse: Option<String>,
    pub sf_database: Option<String>,
    pub sf_schema: Option<String>,
    pub sf_username: Option<String>,
    pub sf_role: Option<String>,
    pub sf_password_file: Option<String>,

    /// When set, the indexer runs in "migration mode" (see `validate`), which
    /// relaxes the per-pipeline `batch_size` requirement.
    #[serde(default)]
    pub migration_id: Option<String>,

    /// Default file format for pipelines (Parquet unless overridden).
    #[serde(default = "default_file_format")]
    pub file_format: FileFormat,

    /// Per-pipeline configuration; appears as `pipelines` in config files.
    #[serde(rename = "pipelines")]
    pub pipeline_configs: Vec<PipelineConfig>,

    /// Ingestion overrides layered onto the framework's base config.
    #[serde(default)]
    pub ingestion: IngestionLayer,

    /// Committer overrides layered onto the framework's base config.
    #[serde(default)]
    pub committer: CommitterLayer,

    /// Maximum uploads in flight at once (default 10).
    #[serde(default = "default_max_pending_uploads")]
    pub max_pending_uploads: usize,

    /// Maximum batches serialized concurrently (default 3).
    #[serde(default = "default_max_concurrent_serialization")]
    pub max_concurrent_serialization: usize,

    /// How often the watermark is refreshed, in seconds (default 60).
    #[serde(default = "default_watermark_update_interval_secs")]
    pub watermark_update_interval_secs: u64,
}
221
222impl IndexerConfig {
223 pub fn validate(&self) -> anyhow::Result<()> {
229 let mut seen = std::collections::HashSet::new();
231 for config in &self.pipeline_configs {
232 let name = config.pipeline.name();
233 if !seen.insert(name) {
234 anyhow::bail!(
235 "Duplicate pipeline type '{}' in config. Each pipeline type can only be configured once.",
236 name
237 );
238 }
239 }
240
241 let is_migration_mode = self.migration_id.is_some();
243 for config in &self.pipeline_configs {
244 config.validate(is_migration_mode)?;
245 }
246
247 Ok(())
248 }
249
250 pub fn pipeline_configs(&self) -> &[PipelineConfig] {
251 &self.pipeline_configs
252 }
253
254 pub fn get_pipeline_config(&self, name: &str) -> Option<&PipelineConfig> {
255 self.pipeline_configs
256 .iter()
257 .find(|p| p.pipeline.name() == name)
258 }
259}
260
/// Threshold at which a batch is cut, serialized in snake_case
/// (`checkpoints` / `rows`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum BatchSizeConfig {
    /// Cut a batch after this many checkpoints.
    Checkpoints(usize),
    /// Cut a batch after this many rows.
    Rows(usize),
}
270
271impl BatchSizeConfig {
272 pub fn validate(&self) -> anyhow::Result<()> {
274 match self {
275 BatchSizeConfig::Checkpoints(0) => {
276 anyhow::bail!("batch_size.checkpoints must be > 0")
277 }
278 BatchSizeConfig::Rows(0) => {
279 anyhow::bail!("batch_size.rows must be > 0")
280 }
281 _ => Ok(()),
282 }
283 }
284}
285
/// Configuration for a single pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineConfig {
    /// Which pipeline this entry configures.
    pub pipeline: Pipeline,
    /// Output format for this pipeline (defaults to Parquet).
    #[serde(default = "default_file_format")]
    pub file_format: FileFormat,
    /// Optional package-id filter — semantics defined by the pipeline itself.
    pub package_id_filter: Option<String>,
    // Snowflake table identifiers used for watermark reporting (optional).
    pub sf_table_id: Option<String>,
    pub sf_checkpoint_col_id: Option<String>,
    /// Whether to report the max table checkpoint to Snowflake (default false).
    #[serde(default)]
    pub report_sf_max_table_checkpoint: bool,
    /// Batch-cut threshold; required unless running in migration mode
    /// (see `validate`).
    #[serde(default)]
    pub batch_size: Option<BatchSizeConfig>,
    /// Output path prefix; falls back to the pipeline's default path.
    #[serde(default)]
    pub output_prefix: Option<String>,
    /// Force a batch cut after this many seconds even if thresholds aren't met
    /// (default 600).
    #[serde(default = "default_force_batch_cut_after_secs")]
    pub force_batch_cut_after_secs: u64,
    /// Sequential-pipeline overrides layered onto the framework's base config.
    #[serde(default)]
    pub sequential: SequentialLayer,
}
317
318impl PipelineConfig {
319 pub fn validate(&self, is_migration_mode: bool) -> anyhow::Result<()> {
325 if !is_migration_mode {
326 match &self.batch_size {
327 None => anyhow::bail!(
328 "batch_size is required for pipeline '{}' (not in migration mode)",
329 self.pipeline
330 ),
331 Some(batch_size) => batch_size.validate()?,
332 }
333 }
334 Ok(())
335 }
336
337 pub fn output_prefix(&self) -> &str {
341 self.output_prefix
342 .as_deref()
343 .unwrap_or_else(|| self.pipeline.default_path())
344 }
345}