1use crate::db::ConnectionPoolConfig;
5use crate::{backfill::BackfillTaskKind, handlers::pruner::PrunableTable};
6use clap::{Args, Parser, Subcommand};
7use serde::{Deserialize, Serialize};
8use std::{collections::HashMap, net::SocketAddr, path::PathBuf};
9use strum::IntoEnumIterator;
10use sui_name_service::NameServiceConfig;
11use sui_types::base_types::{ObjectID, SuiAddress};
12use url::Url;
13
/// Retention, in epochs, that `RetentionConfig::retention_policies` applies to
/// `PrunableTable::ObjectsHistory` when the config carries no explicit override for it.
const OBJECTS_HISTORY_EPOCHS_TO_KEEP: u64 = 2;
17
18#[derive(Parser, Clone, Debug)]
19#[clap(
20 name = "Sui indexer",
21 about = "An off-fullnode service serving data from Sui protocol"
22)]
23pub struct IndexerConfig {
24 #[clap(long, alias = "db-url")]
25 pub database_url: Url,
26
27 #[clap(flatten)]
28 pub connection_pool_config: ConnectionPoolConfig,
29
30 #[clap(long, default_value = "0.0.0.0:9184")]
31 pub metrics_address: SocketAddr,
32
33 #[command(subcommand)]
34 pub command: Command,
35}
36
37#[derive(Args, Debug, Clone)]
38pub struct NameServiceOptions {
39 #[arg(default_value_t = NameServiceConfig::default().package_address)]
40 #[arg(long = "name-service-package-address")]
41 pub package_address: SuiAddress,
42 #[arg(default_value_t = NameServiceConfig::default().registry_id)]
43 #[arg(long = "name-service-registry-id")]
44 pub registry_id: ObjectID,
45 #[arg(default_value_t = NameServiceConfig::default().reverse_registry_id)]
46 #[arg(long = "name-service-reverse-registry-id")]
47 pub reverse_registry_id: ObjectID,
48}
49
50impl NameServiceOptions {
51 pub fn to_config(&self) -> NameServiceConfig {
52 let Self {
53 package_address,
54 registry_id,
55 reverse_registry_id,
56 } = self.clone();
57 NameServiceConfig {
58 package_address,
59 registry_id,
60 reverse_registry_id,
61 }
62 }
63}
64
65impl Default for NameServiceOptions {
66 fn default() -> Self {
67 let NameServiceConfig {
68 package_address,
69 registry_id,
70 reverse_registry_id,
71 } = NameServiceConfig::default();
72 Self {
73 package_address,
74 registry_id,
75 reverse_registry_id,
76 }
77 }
78}
79
80#[derive(Args, Debug, Clone)]
81pub struct JsonRpcConfig {
82 #[command(flatten)]
83 pub name_service_options: NameServiceOptions,
84
85 #[clap(long, default_value = "0.0.0.0:9000")]
86 pub rpc_address: SocketAddr,
87
88 #[clap(long)]
89 pub rpc_client_url: String,
90}
91
// Where checkpoint data is ingested from. The `#[group]` attribute makes at least one of
// the three flags mandatory while still allowing several to be supplied together.
// (Plain `//` comments: `///` would be picked up by clap as help text.)
#[derive(Args, Debug, Default, Clone)]
#[group(required = true, multiple = true)]
pub struct IngestionSources {
    // `--data-ingestion-path`: a filesystem path.
    #[arg(long)]
    pub data_ingestion_path: Option<PathBuf>,

    // `--remote-store-url`: URL of a remote store.
    #[arg(long)]
    pub remote_store_url: Option<Url>,

    // `--rpc-client-url`: URL of an RPC endpoint.
    #[arg(long)]
    pub rpc_client_url: Option<Url>,
}
104
105#[derive(Args, Debug, Clone)]
106pub struct IngestionConfig {
107 #[clap(flatten)]
108 pub sources: IngestionSources,
109
110 #[arg(
111 long,
112 default_value_t = Self::DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE,
113 env = "DOWNLOAD_QUEUE_SIZE",
114 )]
115 pub checkpoint_download_queue_size: usize,
116
117 #[arg(long, env = "START_CHECKPOINT")]
120 pub start_checkpoint: Option<u64>,
121
122 #[arg(long, env = "END_CHECKPOINT")]
125 pub end_checkpoint: Option<u64>,
126
127 #[arg(
128 long,
129 default_value_t = Self::DEFAULT_CHECKPOINT_DOWNLOAD_TIMEOUT,
130 env = "INGESTION_READER_TIMEOUT_SECS",
131 )]
132 pub checkpoint_download_timeout: u64,
133
134 #[arg(
137 long,
138 default_value_t = Self::DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE_BYTES,
139 env = "CHECKPOINT_PROCESSING_BATCH_DATA_LIMIT",
140 )]
141 pub checkpoint_download_queue_size_bytes: usize,
142
143 #[arg(
146 long,
147 default_value_t = true,
148 default_missing_value = "true",
149 action = clap::ArgAction::Set,
150 num_args = 0..=1,
151 require_equals = false,
152 )]
153 pub gc_checkpoint_files: bool,
154}
155
/// Default values shared by the clap attributes on `IngestionConfig` and its `Default` impl.
impl IngestionConfig {
    /// Default for `checkpoint_download_queue_size`.
    const DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE: usize = 200;
    /// Default for `checkpoint_download_queue_size_bytes`.
    const DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE_BYTES: usize = 20_000_000;
    /// Default for `checkpoint_download_timeout` (presumably seconds — confirm).
    const DEFAULT_CHECKPOINT_DOWNLOAD_TIMEOUT: u64 = 20;
}
161
162impl Default for IngestionConfig {
163 fn default() -> Self {
164 Self {
165 sources: Default::default(),
166 start_checkpoint: None,
167 end_checkpoint: None,
168 checkpoint_download_queue_size: Self::DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE,
169 checkpoint_download_timeout: Self::DEFAULT_CHECKPOINT_DOWNLOAD_TIMEOUT,
170 checkpoint_download_queue_size_bytes:
171 Self::DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE_BYTES,
172 gc_checkpoint_files: true,
173 }
174 }
175}
176
177#[derive(Args, Debug, Clone)]
178pub struct BackFillConfig {
179 #[arg(
181 long,
182 default_value_t = Self::DEFAULT_MAX_CONCURRENCY,
183 )]
184 pub max_concurrency: usize,
185 #[arg(
187 long,
188 default_value_t = Self::DEFAULT_CHUNK_SIZE,
189 )]
190 pub chunk_size: usize,
191}
192
/// Default values referenced by the clap attributes on `BackFillConfig`.
impl BackFillConfig {
    /// Default for `max_concurrency`.
    const DEFAULT_MAX_CONCURRENCY: usize = 10;
    /// Default for `chunk_size`.
    const DEFAULT_CHUNK_SIZE: usize = 1000;
}
197
198#[allow(clippy::large_enum_variant)]
199#[derive(Subcommand, Clone, Debug)]
200pub enum Command {
201 Indexer {
202 #[command(flatten)]
203 ingestion_config: IngestionConfig,
204 #[command(flatten)]
205 snapshot_config: SnapshotLagConfig,
206 #[command(flatten)]
207 pruning_options: PruningOptions,
208 #[command(flatten)]
209 upload_options: UploadOptions,
210 },
211 JsonRpcService(JsonRpcConfig),
212 ResetDatabase {
213 #[clap(long)]
214 force: bool,
215 #[clap(long, default_value_t = false)]
218 skip_migrations: bool,
219 },
220 RunMigrations,
222 RunBackFill {
230 start: usize,
234 end: usize,
236 #[clap(subcommand)]
237 runner_kind: BackfillTaskKind,
238 #[command(flatten)]
239 backfill_config: BackFillConfig,
240 },
241}
242
// Command-line options for pruning.
// (Plain `//` comments: `///` would be picked up by clap as help text.)
#[derive(Args, Default, Debug, Clone)]
pub struct PruningOptions {
    // Path to a TOML file that `load_from_file` parses into a `RetentionConfig`.
    // When absent, `load_from_file` returns `None`.
    #[arg(long)]
    pub pruning_config_path: Option<PathBuf>,
}
249
/// Retention settings, expressed in epochs, as (de)serialized via serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionConfig {
    /// Default retention applied by `retention_policies` to tables without an override
    /// (`PrunableTable::ObjectsHistory` has its own default instead).
    pub epochs_to_keep: u64,
    /// Per-table retention overrides; an empty map when the key is missing from the input.
    #[serde(default)]
    pub overrides: HashMap<PrunableTable, u64>,
}
261
262impl PruningOptions {
263 pub fn load_from_file(&self) -> Option<RetentionConfig> {
265 let config_path = self.pruning_config_path.as_ref()?;
266
267 let contents = std::fs::read_to_string(config_path)
268 .expect("Failed to read default retention policy and overrides from file");
269 let retention_with_overrides = toml::de::from_str::<RetentionConfig>(&contents)
270 .expect("Failed to parse into RetentionConfig struct");
271
272 let default_retention = retention_with_overrides.epochs_to_keep;
273
274 assert!(
275 default_retention > 0,
276 "Default retention must be greater than 0"
277 );
278 assert!(
279 retention_with_overrides
280 .overrides
281 .values()
282 .all(|&policy| policy > 0),
283 "All retention overrides must be greater than 0"
284 );
285
286 Some(retention_with_overrides)
287 }
288}
289
290impl RetentionConfig {
291 pub fn new(epochs_to_keep: u64, overrides: HashMap<PrunableTable, u64>) -> Self {
295 Self {
296 epochs_to_keep,
297 overrides,
298 }
299 }
300
301 pub fn new_with_default_retention_only_for_testing(epochs_to_keep: u64) -> Self {
302 let mut overrides = HashMap::new();
303 overrides.insert(
304 PrunableTable::ObjectsHistory,
305 OBJECTS_HISTORY_EPOCHS_TO_KEEP,
306 );
307
308 Self::new(epochs_to_keep, HashMap::new())
309 }
310
311 pub fn retention_policies(self) -> HashMap<PrunableTable, u64> {
316 let RetentionConfig {
317 epochs_to_keep,
318 mut overrides,
319 } = self;
320
321 for table in PrunableTable::iter() {
322 let default_retention = match table {
323 PrunableTable::ObjectsHistory => OBJECTS_HISTORY_EPOCHS_TO_KEEP,
324 _ => epochs_to_keep,
325 };
326
327 overrides.entry(table).or_insert(default_retention);
328 }
329
330 overrides
331 }
332}
333
// Flags for the objects snapshot.
// (Plain `//` comments: `///` would be picked up by clap as help text.)
#[derive(Args, Debug, Clone)]
pub struct SnapshotLagConfig {
    // `--objects-snapshot-min-checkpoint-lag`, also settable through the
    // `OBJECTS_SNAPSHOT_MIN_CHECKPOINT_LAG` environment variable.
    #[arg(
        long = "objects-snapshot-min-checkpoint-lag",
        default_value_t = Self::DEFAULT_MIN_LAG,
        env = "OBJECTS_SNAPSHOT_MIN_CHECKPOINT_LAG",
    )]
    pub snapshot_min_lag: usize,

    // `--objects-snapshot-sleep-duration` (the default's name suggests seconds — confirm).
    #[arg(
        long = "objects-snapshot-sleep-duration",
        default_value_t = Self::DEFAULT_SLEEP_DURATION_SEC,
    )]
    pub sleep_duration: u64,
}
349
/// Default values shared by the clap attributes on `SnapshotLagConfig` and its `Default` impl.
impl SnapshotLagConfig {
    /// Default for `snapshot_min_lag`.
    const DEFAULT_MIN_LAG: usize = 300;
    /// Default for `sleep_duration`.
    const DEFAULT_SLEEP_DURATION_SEC: u64 = 5;
}
354
355impl Default for SnapshotLagConfig {
356 fn default() -> Self {
357 SnapshotLagConfig {
358 snapshot_min_lag: Self::DEFAULT_MIN_LAG,
359 sleep_duration: Self::DEFAULT_SLEEP_DURATION_SEC,
360 }
361 }
362}
363
// Optional upload settings; each can come from a flag or from its environment variable.
// (Plain `//` comments: `///` would be picked up by clap as help text.)
#[derive(Args, Debug, Clone, Default)]
pub struct UploadOptions {
    // `--gcs-display-bucket` / `GCS_DISPLAY_BUCKET`.
    #[arg(long, env = "GCS_DISPLAY_BUCKET")]
    pub gcs_display_bucket: Option<String>,
    // `--gcs-cred-path` / `GCS_CRED_PATH`.
    #[arg(long, env = "GCS_CRED_PATH")]
    pub gcs_cred_path: Option<String>,
}
371
372#[derive(Args, Debug, Clone)]
373pub struct BenchmarkConfig {
374 #[arg(
375 long,
376 default_value_t = 200,
377 help = "Number of transactions in a checkpoint."
378 )]
379 pub checkpoint_size: u64,
380 #[arg(
381 long,
382 default_value_t = 2000,
383 help = "Total number of synthetic checkpoints to generate."
384 )]
385 pub num_checkpoints: u64,
386 #[arg(
387 long,
388 default_value_t = 1,
389 help = "Customize the first checkpoint sequence number to be committed, must be non-zero."
390 )]
391 pub starting_checkpoint: u64,
392 #[arg(
393 long,
394 default_value_t = false,
395 help = "Whether to reset the database before running."
396 )]
397 pub reset_db: bool,
398 #[arg(
399 long,
400 help = "Path to workload directory. If not provided, a temporary directory will be created.\
401 If provided, synthetic workload generator will either load data from it if it exists or generate new data.\
402 This avoids repeat generation of the same data."
403 )]
404 pub workload_dir: Option<PathBuf>,
405}
406
#[cfg(test)]
mod test {
    use super::*;
    use std::io::Write;
    use tap::Pipe;
    use tempfile::NamedTempFile;

    /// Parses `args` (no binary name expected) as the flag set of the `clap::Args` type `T`.
    fn parse_args<'a, T>(args: impl IntoIterator<Item = &'a str>) -> Result<T, clap::error::Error>
    where
        T: clap::Args + clap::FromArgMatches,
    {
        let matches = clap::Command::new("test")
            .no_binary_name(true)
            .pipe(T::augment_args)
            .try_get_matches_from(args)?;
        T::from_arg_matches(&matches)
    }

    /// Writes `toml` to a temporary file and loads it through `PruningOptions::load_from_file`.
    fn load_retention_config(toml: &str) -> RetentionConfig {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(toml.as_bytes()).unwrap();

        let options = PruningOptions {
            pruning_config_path: Some(file.path().to_path_buf()),
        };
        options.load_from_file().unwrap()
    }

    /// Looks up the explicit override for `table`, if any.
    fn override_for(config: &RetentionConfig, table: PrunableTable) -> Option<u64> {
        config.overrides.get(&table).copied()
    }

    #[test]
    fn name_service() {
        // Each flag on its own.
        parse_args::<NameServiceOptions>(["--name-service-registry-id=0x1"]).unwrap();
        parse_args::<NameServiceOptions>([
            "--name-service-package-address",
            "0x0000000000000000000000000000000000000000000000000000000000000001",
        ])
        .unwrap();
        parse_args::<NameServiceOptions>(["--name-service-reverse-registry-id=0x1"]).unwrap();

        // All flags together.
        parse_args::<NameServiceOptions>([
            "--name-service-registry-id=0x1",
            "--name-service-package-address",
            "0x0000000000000000000000000000000000000000000000000000000000000002",
            "--name-service-reverse-registry-id=0x3",
        ])
        .unwrap();

        // No flags at all: every field has a default.
        parse_args::<NameServiceOptions>([]).unwrap();
    }

    #[test]
    fn ingestion_sources() {
        // Any single source is enough.
        parse_args::<IngestionSources>(["--data-ingestion-path=/tmp/foo"]).unwrap();
        parse_args::<IngestionSources>(["--remote-store-url=http://example.com"]).unwrap();
        parse_args::<IngestionSources>(["--rpc-client-url=http://example.com"]).unwrap();

        // Sources may be combined.
        parse_args::<IngestionSources>([
            "--data-ingestion-path=/tmp/foo",
            "--remote-store-url=http://example.com",
            "--rpc-client-url=http://example.com",
        ])
        .unwrap();

        // At least one source is required.
        parse_args::<IngestionSources>([]).unwrap_err();
    }

    #[test]
    fn json_rpc_config() {
        parse_args::<JsonRpcConfig>(["--rpc-client-url=http://example.com"]).unwrap();

        parse_args::<JsonRpcConfig>([
            "--rpc-address=127.0.0.1:8080",
            "--name-service-registry-id=0x1",
            "--rpc-client-url=http://example.com",
        ])
        .unwrap();

        // `--rpc-client-url` is mandatory.
        parse_args::<JsonRpcConfig>([]).unwrap_err();
    }

    #[test]
    fn pruning_options_with_objects_history_override() {
        let config = load_retention_config(
            r#"
        epochs_to_keep = 5

        [overrides]
        objects_history = 10
        transactions = 20
        "#,
        );

        assert_eq!(config.epochs_to_keep, 5);
        assert_eq!(override_for(&config, PrunableTable::ObjectsHistory), Some(10));
        assert_eq!(override_for(&config, PrunableTable::Transactions), Some(20));
        assert_eq!(config.overrides.len(), 2);

        let policies = config.retention_policies();

        for table in PrunableTable::iter() {
            let retention = policies
                .get(&table)
                .copied()
                .unwrap_or_else(|| panic!("Expected a retention policy for table {:?}", table));

            let expected = match table {
                PrunableTable::ObjectsHistory => 10,
                PrunableTable::Transactions => 20,
                _ => 5,
            };
            assert_eq!(retention, expected);
        }
    }

    #[test]
    fn pruning_options_no_objects_history_override() {
        let config = load_retention_config(
            r#"
        epochs_to_keep = 5

        [overrides]
        tx_affected_addresses = 10
        transactions = 20
        "#,
        );

        assert_eq!(config.epochs_to_keep, 5);
        assert_eq!(
            override_for(&config, PrunableTable::TxAffectedAddresses),
            Some(10)
        );
        assert_eq!(override_for(&config, PrunableTable::Transactions), Some(20));
        assert_eq!(config.overrides.len(), 2);

        let policies = config.retention_policies();

        for table in PrunableTable::iter() {
            let retention = policies
                .get(&table)
                .copied()
                .unwrap_or_else(|| panic!("Expected a retention policy for table {:?}", table));

            // Without an override, objects history falls back to its dedicated default.
            let expected = match table {
                PrunableTable::ObjectsHistory => OBJECTS_HISTORY_EPOCHS_TO_KEEP,
                PrunableTable::TxAffectedAddresses => 10,
                PrunableTable::Transactions => 20,
                _ => 5,
            };
            assert_eq!(retention, expected);
        }
    }

    #[test]
    fn test_invalid_pruning_config_file() {
        let toml_str = r#"
        epochs_to_keep = 5

        [overrides]
        objects_history = 10
        transactions = 20
        invalid_table = 30
        "#;

        let error = toml::from_str::<RetentionConfig>(toml_str)
            .expect_err("Expected an error, but parsing succeeded");

        assert!(
            error.to_string().contains("unknown variant `invalid_table`"),
            "Error message doesn't mention the invalid table"
        );
    }
}