// sui_indexer/config.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::db::ConnectionPoolConfig;
use crate::{backfill::BackfillTaskKind, handlers::pruner::PrunableTable};
use clap::{Args, Parser, Subcommand};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, net::SocketAddr, path::PathBuf};
use strum::IntoEnumIterator;
use sui_name_service::NameServiceConfig;
use sui_types::base_types::{ObjectID, SuiAddress};
use url::Url;
/// The primary purpose of objects_history is to serve consistency queries.
/// A short retention is sufficient.
const OBJECTS_HISTORY_EPOCHS_TO_KEEP: u64 = 2;

#[derive(Parser, Clone, Debug)]
#[clap(
    name = "Sui indexer",
    about = "An off-fullnode service serving data from the Sui protocol"
)]
pub struct IndexerConfig {
    #[clap(long, alias = "db-url")]
    pub database_url: Url,

    #[clap(flatten)]
    pub connection_pool_config: ConnectionPoolConfig,

    #[clap(long, default_value = "0.0.0.0:9184")]
    pub metrics_address: SocketAddr,

    #[command(subcommand)]
    pub command: Command,
}

#[derive(Args, Debug, Clone)]
pub struct NameServiceOptions {
    #[arg(default_value_t = NameServiceConfig::default().package_address)]
    #[arg(long = "name-service-package-address")]
    pub package_address: SuiAddress,
    #[arg(default_value_t = NameServiceConfig::default().registry_id)]
    #[arg(long = "name-service-registry-id")]
    pub registry_id: ObjectID,
    #[arg(default_value_t = NameServiceConfig::default().reverse_registry_id)]
    #[arg(long = "name-service-reverse-registry-id")]
    pub reverse_registry_id: ObjectID,
}

impl NameServiceOptions {
    pub fn to_config(&self) -> NameServiceConfig {
        let Self {
            package_address,
            registry_id,
            reverse_registry_id,
        } = self.clone();
        NameServiceConfig {
            package_address,
            registry_id,
            reverse_registry_id,
        }
    }
}

impl Default for NameServiceOptions {
    fn default() -> Self {
        let NameServiceConfig {
            package_address,
            registry_id,
            reverse_registry_id,
        } = NameServiceConfig::default();
        Self {
            package_address,
            registry_id,
            reverse_registry_id,
        }
    }
}

#[derive(Args, Debug, Clone)]
pub struct JsonRpcConfig {
    #[command(flatten)]
    pub name_service_options: NameServiceOptions,

    #[clap(long, default_value = "0.0.0.0:9000")]
    pub rpc_address: SocketAddr,

    #[clap(long)]
    pub rpc_client_url: String,
}

#[derive(Args, Debug, Default, Clone)]
#[group(required = true, multiple = true)]
pub struct IngestionSources {
    #[arg(long)]
    pub data_ingestion_path: Option<PathBuf>,

    #[arg(long)]
    pub remote_store_url: Option<Url>,

    #[arg(long)]
    pub rpc_client_url: Option<Url>,
}

#[derive(Args, Debug, Clone)]
pub struct IngestionConfig {
    #[clap(flatten)]
    pub sources: IngestionSources,

    #[arg(
        long,
        default_value_t = Self::DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE,
        env = "DOWNLOAD_QUEUE_SIZE",
    )]
    pub checkpoint_download_queue_size: usize,

    /// Start checkpoint to ingest from. This is optional; if not provided, ingestion starts
    /// from the next checkpoint after the latest committed checkpoint.
    #[arg(long, env = "START_CHECKPOINT")]
    pub start_checkpoint: Option<u64>,

    /// End checkpoint to ingest until. This is optional; if not provided, ingestion continues
    /// until u64::MAX.
    #[arg(long, env = "END_CHECKPOINT")]
    pub end_checkpoint: Option<u64>,

    #[arg(
        long,
        default_value_t = Self::DEFAULT_CHECKPOINT_DOWNLOAD_TIMEOUT,
        env = "INGESTION_READER_TIMEOUT_SECS",
    )]
    pub checkpoint_download_timeout: u64,

    /// Limit indexing parallelism on big checkpoints to avoid OOMing by limiting the total size of
    /// the checkpoint download queue.
    #[arg(
        long,
        default_value_t = Self::DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE_BYTES,
        env = "CHECKPOINT_PROCESSING_BATCH_DATA_LIMIT",
    )]
    pub checkpoint_download_queue_size_bytes: usize,

    /// Whether to delete processed checkpoint files from the local directory
    /// when running a fullnode-colocated indexer.
    #[arg(
        long,
        default_value_t = true,
        default_missing_value = "true",
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        require_equals = false,
    )]
    pub gc_checkpoint_files: bool,
}

impl IngestionConfig {
    const DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE: usize = 200;
    const DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE_BYTES: usize = 20_000_000;
    const DEFAULT_CHECKPOINT_DOWNLOAD_TIMEOUT: u64 = 20;
}

impl Default for IngestionConfig {
    fn default() -> Self {
        Self {
            sources: Default::default(),
            start_checkpoint: None,
            end_checkpoint: None,
            checkpoint_download_queue_size: Self::DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE,
            checkpoint_download_timeout: Self::DEFAULT_CHECKPOINT_DOWNLOAD_TIMEOUT,
            checkpoint_download_queue_size_bytes:
                Self::DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE_BYTES,
            gc_checkpoint_files: true,
        }
    }
}

#[derive(Args, Debug, Clone)]
pub struct BackFillConfig {
    /// Maximum number of concurrent tasks to run.
    #[arg(
        long,
        default_value_t = Self::DEFAULT_MAX_CONCURRENCY,
    )]
    pub max_concurrency: usize,
    /// Number of checkpoints to backfill in a single SQL command.
    #[arg(
        long,
        default_value_t = Self::DEFAULT_CHUNK_SIZE,
    )]
    pub chunk_size: usize,
}

impl BackFillConfig {
    const DEFAULT_MAX_CONCURRENCY: usize = 10;
    const DEFAULT_CHUNK_SIZE: usize = 1000;
}

#[allow(clippy::large_enum_variant)]
#[derive(Subcommand, Clone, Debug)]
pub enum Command {
    Indexer {
        #[command(flatten)]
        ingestion_config: IngestionConfig,
        #[command(flatten)]
        snapshot_config: SnapshotLagConfig,
        #[command(flatten)]
        pruning_options: PruningOptions,
        #[command(flatten)]
        upload_options: UploadOptions,
    },
    JsonRpcService(JsonRpcConfig),
    ResetDatabase {
        #[clap(long)]
        force: bool,
        /// If true, only drop all tables but do not run the migrations.
        /// That is, no tables will exist in the DB after the reset.
        #[clap(long, default_value_t = false)]
        skip_migrations: bool,
    },
    /// Run through the migration scripts.
    RunMigrations,
    /// Backfill DB tables for some ID range [\start, \end].
    /// The tool automatically slices the range into smaller ranges, and for each one it first
    /// makes a read query to the DB to fetch any data needed for the backfill, which can then be
    /// processed and written back to the DB.
    /// To add a new backfill, add a new module and implement the `BackfillTask` trait.
    /// full_objects_history.rs provides an example of a SQL-only backfill.
    /// system_state_summary_json.rs provides an example of a SQL + processing backfill.
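    ///
    /// A hypothetical invocation (the binary name is a placeholder; the available runner kinds
    /// are the `BackfillTaskKind` subcommands defined in the `backfill` module):
    /// `sui-indexer --database-url <DB_URL> run-back-fill <START> <END> <RUNNER_KIND>`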
    RunBackFill {
        /// Start of the range to backfill, inclusive.
        /// It can be a checkpoint number, an epoch, or any other identifier that can be used to
        /// slice the backfill range.
        start: usize,
        /// End of the range to backfill, inclusive.
        end: usize,
        #[clap(subcommand)]
        runner_kind: BackfillTaskKind,
        #[command(flatten)]
        backfill_config: BackFillConfig,
    },
}

#[derive(Args, Default, Debug, Clone)]
pub struct PruningOptions {
    /// Path to TOML file containing configuration for retention policies.
    #[arg(long)]
    pub pruning_config_path: Option<PathBuf>,
}

/// Represents the default retention policy and overrides for prunable tables. Instantiated only if
/// a pruning config file is provided via `PruningOptions` on indexer start.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionConfig {
    /// Default retention policy for all tables.
    pub epochs_to_keep: u64,
    /// A map of tables to their respective retention policies that will override the default.
    /// Prunable tables not named here will use the default retention policy.
    #[serde(default)]
    pub overrides: HashMap<PrunableTable, u64>,
}

impl PruningOptions {
    /// Load default retention policy and overrides from file.
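    ///
    /// The file is TOML shaped like the example below (mirroring the tests in this module):
    /// keys under `[overrides]` must match `PrunableTable` variant names, and all values are
    /// epochs to keep.
    ///
    /// ```toml
    /// epochs_to_keep = 5
    ///
    /// [overrides]
    /// objects_history = 10
    /// transactions = 20
    /// ```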
    pub fn load_from_file(&self) -> Option<RetentionConfig> {
        let config_path = self.pruning_config_path.as_ref()?;

        let contents = std::fs::read_to_string(config_path)
            .expect("Failed to read default retention policy and overrides from file");
        let retention_with_overrides = toml::de::from_str::<RetentionConfig>(&contents)
            .expect("Failed to parse into RetentionConfig struct");

        let default_retention = retention_with_overrides.epochs_to_keep;

        assert!(
            default_retention > 0,
            "Default retention must be greater than 0"
        );
        assert!(
            retention_with_overrides
                .overrides
                .values()
                .all(|&policy| policy > 0),
            "All retention overrides must be greater than 0"
        );

        Some(retention_with_overrides)
    }
}

impl RetentionConfig {
    /// Create a new `RetentionConfig` with the specified default retention and overrides. Call
    /// `retention_policies()` on the instance to produce the full mapping, which applies the
    /// default retention policy to all tables that do not have an override specified.
    pub fn new(epochs_to_keep: u64, overrides: HashMap<PrunableTable, u64>) -> Self {
        Self {
            epochs_to_keep,
            overrides,
        }
    }

    pub fn new_with_default_retention_only_for_testing(epochs_to_keep: u64) -> Self {
        let mut overrides = HashMap::new();
        overrides.insert(
            PrunableTable::ObjectsHistory,
            OBJECTS_HISTORY_EPOCHS_TO_KEEP,
        );

        Self::new(epochs_to_keep, overrides)
    }

    /// Consumes this struct to produce a full mapping of every prunable table and its retention
    /// policy. By default, every prunable table will have the default retention policy from
    /// `epochs_to_keep`. Some tables like `objects_history` will observe a different default
    /// retention policy. These default values are overridden by any entries in `overrides`.
    pub fn retention_policies(self) -> HashMap<PrunableTable, u64> {
        let RetentionConfig {
            epochs_to_keep,
            mut overrides,
        } = self;

        for table in PrunableTable::iter() {
            let default_retention = match table {
                PrunableTable::ObjectsHistory => OBJECTS_HISTORY_EPOCHS_TO_KEEP,
                _ => epochs_to_keep,
            };

            overrides.entry(table).or_insert(default_retention);
        }

        overrides
    }
}

#[derive(Args, Debug, Clone)]
pub struct SnapshotLagConfig {
    #[arg(
        long = "objects-snapshot-min-checkpoint-lag",
        default_value_t = Self::DEFAULT_MIN_LAG,
        env = "OBJECTS_SNAPSHOT_MIN_CHECKPOINT_LAG",
    )]
    pub snapshot_min_lag: usize,

    #[arg(
        long = "objects-snapshot-sleep-duration",
        default_value_t = Self::DEFAULT_SLEEP_DURATION_SEC,
    )]
    pub sleep_duration: u64,
}

impl SnapshotLagConfig {
    const DEFAULT_MIN_LAG: usize = 300;
    const DEFAULT_SLEEP_DURATION_SEC: u64 = 5;
}

impl Default for SnapshotLagConfig {
    fn default() -> Self {
        SnapshotLagConfig {
            snapshot_min_lag: Self::DEFAULT_MIN_LAG,
            sleep_duration: Self::DEFAULT_SLEEP_DURATION_SEC,
        }
    }
}

#[derive(Args, Debug, Clone, Default)]
pub struct UploadOptions {
    #[arg(long, env = "GCS_DISPLAY_BUCKET")]
    pub gcs_display_bucket: Option<String>,
    #[arg(long, env = "GCS_CRED_PATH")]
    pub gcs_cred_path: Option<String>,
}

#[derive(Args, Debug, Clone)]
pub struct BenchmarkConfig {
    #[arg(
        long,
        default_value_t = 200,
        help = "Number of transactions in a checkpoint."
    )]
    pub checkpoint_size: u64,
    #[arg(
        long,
        default_value_t = 2000,
        help = "Total number of synthetic checkpoints to generate."
    )]
    pub num_checkpoints: u64,
    #[arg(
        long,
        default_value_t = 1,
        help = "Customize the first checkpoint sequence number to be committed; must be non-zero."
    )]
    pub starting_checkpoint: u64,
    #[arg(
        long,
        default_value_t = false,
        help = "Whether to reset the database before running."
    )]
    pub reset_db: bool,
    #[arg(
        long,
        help = "Path to the workload directory. If not provided, a temporary directory will be created. \
        If provided, the synthetic workload generator will either load data from it if it exists or generate new data. \
        This avoids repeated generation of the same data."
    )]
    pub workload_dir: Option<PathBuf>,
}

#[cfg(test)]
mod test {
    use super::*;
    use std::io::Write;
    use tap::Pipe;
    use tempfile::NamedTempFile;

    fn parse_args<'a, T>(args: impl IntoIterator<Item = &'a str>) -> Result<T, clap::error::Error>
    where
        T: clap::Args + clap::FromArgMatches,
    {
        clap::Command::new("test")
            .no_binary_name(true)
            .pipe(T::augment_args)
            .try_get_matches_from(args)
            .and_then(|matches| T::from_arg_matches(&matches))
    }

    #[test]
    fn name_service() {
        parse_args::<NameServiceOptions>(["--name-service-registry-id=0x1"]).unwrap();
        parse_args::<NameServiceOptions>([
            "--name-service-package-address",
            "0x0000000000000000000000000000000000000000000000000000000000000001",
        ])
        .unwrap();
        parse_args::<NameServiceOptions>(["--name-service-reverse-registry-id=0x1"]).unwrap();
        parse_args::<NameServiceOptions>([
            "--name-service-registry-id=0x1",
            "--name-service-package-address",
            "0x0000000000000000000000000000000000000000000000000000000000000002",
            "--name-service-reverse-registry-id=0x3",
        ])
        .unwrap();
        parse_args::<NameServiceOptions>([]).unwrap();
    }
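
    // A minimal sketch verifying that the clap defaults round-trip through `to_config` into
    // `NameServiceConfig` unchanged; it relies only on types already used in this file.
    #[test]
    fn name_service_to_config_defaults() {
        let config = NameServiceOptions::default().to_config();
        let expected = NameServiceConfig::default();
        assert_eq!(config.package_address, expected.package_address);
        assert_eq!(config.registry_id, expected.registry_id);
        assert_eq!(config.reverse_registry_id, expected.reverse_registry_id);
    }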

    #[test]
    fn ingestion_sources() {
        parse_args::<IngestionSources>(["--data-ingestion-path=/tmp/foo"]).unwrap();
        parse_args::<IngestionSources>(["--remote-store-url=http://example.com"]).unwrap();
        parse_args::<IngestionSources>(["--rpc-client-url=http://example.com"]).unwrap();

        parse_args::<IngestionSources>([
            "--data-ingestion-path=/tmp/foo",
            "--remote-store-url=http://example.com",
            "--rpc-client-url=http://example.com",
        ])
        .unwrap();

        // At least one must be present
        parse_args::<IngestionSources>([]).unwrap_err();
    }
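
    // A small sketch checking that `IngestionConfig::default()` mirrors the associated
    // constants above (accessible here because this test module is a child of the module
    // that defines them).
    #[test]
    fn ingestion_config_defaults() {
        let config = IngestionConfig::default();
        assert_eq!(
            config.checkpoint_download_queue_size,
            IngestionConfig::DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE
        );
        assert_eq!(
            config.checkpoint_download_timeout,
            IngestionConfig::DEFAULT_CHECKPOINT_DOWNLOAD_TIMEOUT
        );
        assert_eq!(
            config.checkpoint_download_queue_size_bytes,
            IngestionConfig::DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE_BYTES
        );
        assert!(config.gc_checkpoint_files);
        assert!(config.start_checkpoint.is_none());
        assert!(config.end_checkpoint.is_none());
    }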

    #[test]
    fn json_rpc_config() {
        parse_args::<JsonRpcConfig>(["--rpc-client-url=http://example.com"]).unwrap();

        // Can include name service options and bind address
        parse_args::<JsonRpcConfig>([
            "--rpc-address=127.0.0.1:8080",
            "--name-service-registry-id=0x1",
            "--rpc-client-url=http://example.com",
        ])
        .unwrap();

        // fullnode rpc url must be present
        parse_args::<JsonRpcConfig>([]).unwrap_err();
    }

    #[test]
    fn pruning_options_with_objects_history_override() {
        let mut temp_file = NamedTempFile::new().unwrap();
        let toml_content = r#"
        epochs_to_keep = 5

        [overrides]
        objects_history = 10
        transactions = 20
        "#;
        temp_file.write_all(toml_content.as_bytes()).unwrap();
        let temp_path: PathBuf = temp_file.path().to_path_buf();
        let pruning_options = PruningOptions {
            pruning_config_path: Some(temp_path.clone()),
        };
        let retention_config = pruning_options.load_from_file().unwrap();

        // Assert the parsed values
        assert_eq!(retention_config.epochs_to_keep, 5);
        assert_eq!(
            retention_config
                .overrides
                .get(&PrunableTable::ObjectsHistory)
                .copied(),
            Some(10)
        );
        assert_eq!(
            retention_config
                .overrides
                .get(&PrunableTable::Transactions)
                .copied(),
            Some(20)
        );
        assert_eq!(retention_config.overrides.len(), 2);

        let retention_policies = retention_config.retention_policies();

        for table in PrunableTable::iter() {
            let Some(retention) = retention_policies.get(&table).copied() else {
                panic!("Expected a retention policy for table {:?}", table);
            };

            match table {
                PrunableTable::ObjectsHistory => assert_eq!(retention, 10),
                PrunableTable::Transactions => assert_eq!(retention, 20),
                _ => assert_eq!(retention, 5),
            };
        }
    }

    #[test]
    fn pruning_options_no_objects_history_override() {
        let mut temp_file = NamedTempFile::new().unwrap();
        let toml_content = r#"
        epochs_to_keep = 5

        [overrides]
        tx_affected_addresses = 10
        transactions = 20
        "#;
        temp_file.write_all(toml_content.as_bytes()).unwrap();
        let temp_path: PathBuf = temp_file.path().to_path_buf();
        let pruning_options = PruningOptions {
            pruning_config_path: Some(temp_path.clone()),
        };
        let retention_config = pruning_options.load_from_file().unwrap();

        // Assert the parsed values
        assert_eq!(retention_config.epochs_to_keep, 5);
        assert_eq!(
            retention_config
                .overrides
                .get(&PrunableTable::TxAffectedAddresses)
                .copied(),
            Some(10)
        );
        assert_eq!(
            retention_config
                .overrides
                .get(&PrunableTable::Transactions)
                .copied(),
            Some(20)
        );
        assert_eq!(retention_config.overrides.len(), 2);

        let retention_policies = retention_config.retention_policies();

        for table in PrunableTable::iter() {
            let Some(retention) = retention_policies.get(&table).copied() else {
                panic!("Expected a retention policy for table {:?}", table);
            };

            match table {
                PrunableTable::ObjectsHistory => {
                    assert_eq!(retention, OBJECTS_HISTORY_EPOCHS_TO_KEEP)
                }
                PrunableTable::TxAffectedAddresses => assert_eq!(retention, 10),
                PrunableTable::Transactions => assert_eq!(retention, 20),
                _ => assert_eq!(retention, 5),
            };
        }
    }
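
    // A sketch of the merge performed by `retention_policies` with no overrides at all:
    // every table falls back to the default retention, except `objects_history`, which
    // uses OBJECTS_HISTORY_EPOCHS_TO_KEEP.
    #[test]
    fn retention_policies_default_only() {
        let retention_policies = RetentionConfig::new(5, HashMap::new()).retention_policies();

        for table in PrunableTable::iter() {
            let expected = match table {
                PrunableTable::ObjectsHistory => OBJECTS_HISTORY_EPOCHS_TO_KEEP,
                _ => 5,
            };
            assert_eq!(retention_policies.get(&table).copied(), Some(expected));
        }
    }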

    #[test]
    fn test_invalid_pruning_config_file() {
        let toml_str = r#"
        epochs_to_keep = 5

        [overrides]
        objects_history = 10
        transactions = 20
        invalid_table = 30
        "#;

        let result = toml::from_str::<RetentionConfig>(toml_str);
        assert!(result.is_err(), "Expected an error, but parsing succeeded");

        if let Err(e) = result {
            assert!(
                e.to_string().contains("unknown variant `invalid_table`"),
                "Error message doesn't mention the invalid table"
            );
        }
    }
}