sui_indexer_alt_framework/
cluster.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::ops::Deref;
5use std::ops::DerefMut;
6use std::sync::Arc;
7
8use anyhow::Context;
9use diesel_migrations::EmbeddedMigrations;
10use prometheus::Registry;
11use sui_futures::service::Service;
12use sui_indexer_alt_metrics::MetricsArgs;
13use sui_indexer_alt_metrics::MetricsService;
14use url::Url;
15
16use crate::Indexer;
17use crate::IndexerArgs;
18use crate::ingestion::ClientArgs;
19use crate::ingestion::IngestionConfig;
20use crate::metrics::IndexerMetrics;
21use crate::metrics::IngestionMetrics;
22use crate::postgres::Db;
23use crate::postgres::DbArgs;
24
/// Bundle of arguments for setting up an indexer cluster (an Indexer and its associated Metrics
/// service). This struct is offered as a convenience for the common case of parsing command-line
/// arguments for a binary running a standalone indexer and its metrics service.
//
// NOTE(review): because `clap::Parser` is derived here, the `///` comments on this struct are
// presumably picked up as CLI help text -- confirm before rewording them. Maintainer-only notes
// are therefore kept in plain `//` comments.
//
// `Default` is derived so that `IndexerClusterBuilder` (which stores an `Args` and itself derives
// `Default`) can start from an empty bundle.
#[derive(clap::Parser, Debug, Default)]
pub struct Args {
    /// What to index and in what time range.
    // Flattened: the options declared by `IndexerArgs` are parsed as part of this struct.
    #[clap(flatten)]
    pub indexer_args: IndexerArgs,

    /// Where to get checkpoint data from.
    // Flattened: the options declared by `ClientArgs` are parsed as part of this struct.
    #[clap(flatten)]
    pub client_args: ClientArgs,

    /// How to expose metrics.
    // Flattened: the options declared by `MetricsArgs` are parsed as part of this struct.
    #[clap(flatten)]
    pub metrics_args: MetricsArgs,
}
42
/// An opinionated [IndexerCluster] that spins up an [Indexer] implementation using Postgres as its
/// store, along with a [MetricsService] and a tracing subscriber (outputting to stderr) to provide
/// observability. It is a useful starting point for an indexer binary.
///
/// Construct one with [IndexerCluster::builder]. The cluster dereferences (mutably and immutably)
/// to its inner [Indexer], so the indexer's own methods can be called directly on the cluster
/// before it is consumed by [IndexerCluster::run].
pub struct IndexerCluster {
    /// The Postgres-backed indexer; this is the target of the `Deref`/`DerefMut` impls.
    indexer: Indexer<Db>,
    /// The metrics service, started together with the indexer by [IndexerCluster::run].
    metrics: MetricsService,
}
50
/// Builder for creating an IndexerCluster with a fluent API
///
/// Every field starts at its `Default` value (see [IndexerClusterBuilder::new]); the `with_*`
/// methods overwrite individual fields and [IndexerClusterBuilder::build] consumes the builder.
#[derive(Default)]
pub struct IndexerClusterBuilder {
    /// Postgres connection URL. `build` returns an error if this is still `None`.
    database_url: Option<Url>,
    /// Database connection parameters, forwarded to the indexer as-is.
    db_args: DbArgs,
    /// Bundled indexer, client, and metrics arguments. Can be replaced wholesale or per component.
    args: Args,
    /// Configuration for the ingestion service, forwarded to the indexer as-is.
    ingestion_config: IngestionConfig,
    /// Optional embedded migrations, forwarded to the indexer as-is.
    migrations: Option<&'static EmbeddedMigrations>,
    /// Optional prefix for metric names, forwarded to the indexer as a `&str`.
    metrics_prefix: Option<String>,
}
61
62impl IndexerClusterBuilder {
63    /// Create a new builder instance
64    pub fn new() -> Self {
65        Self::default()
66    }
67
68    /// Set the PostgreSQL database connection URL (required).
69    ///
70    /// This should be a valid PostgreSQL connection urls, e.g.:
71    /// - `postgres://user:password@host:5432/mydb`
72    pub fn with_database_url(mut self, url: Url) -> Self {
73        self.database_url = Some(url);
74        self
75    }
76
77    /// Configure database connection parameters such as pool size, connection timeout, etc.
78    ///
79    /// Defaults to [`DbArgs::default()`] if not specified, which provides reasonable defaults
80    /// for most use cases.
81    pub fn with_db_args(mut self, args: DbArgs) -> Self {
82        self.db_args = args;
83        self
84    }
85
86    /// Set the main indexer cluster's configuration arguments (required).
87    ///
88    /// This bundles all configuration needed for the indexer:
89    /// - `IndexerArgs`: Controls what to index (checkpoint range, which pipelines to run, watermark behavior)
90    /// - `ClientArgs`: Specifies where to fetch checkpoint data from (remote store, local path, or RPC)
91    /// - `MetricsArgs`: Configures how to expose Prometheus metrics (address to serve on)
92    ///
93    /// This overwrites any previously set args.
94    pub fn with_args(mut self, args: Args) -> Self {
95        self.args = args;
96        self
97    }
98
99    /// Set indexer arguments (what to index and in what time range).
100    /// This overwrites any previously set indexer args.
101    pub fn with_indexer_args(mut self, args: IndexerArgs) -> Self {
102        self.args.indexer_args = args;
103        self
104    }
105
106    /// Set client arguments (where to get checkpoint data from).
107    /// This overwrites any previously set client args.
108    pub fn with_client_args(mut self, args: ClientArgs) -> Self {
109        self.args.client_args = args;
110        self
111    }
112
113    /// Set metrics arguments (how to expose metrics).
114    /// This overwrites any previously set metrics args.
115    pub fn with_metrics_args(mut self, args: MetricsArgs) -> Self {
116        self.args.metrics_args = args;
117        self
118    }
119
120    /// Set the ingestion configuration, which controls how the ingestion service is
121    /// set-up (its concurrency, polling, intervals, etc).
122    pub fn with_ingestion_config(mut self, config: IngestionConfig) -> Self {
123        self.ingestion_config = config;
124        self
125    }
126
127    /// Set database migrations to run.
128    ///
129    /// See the [Diesel migration guide](https://diesel.rs/guides/migration_guide.html) for more information.
130    pub fn with_migrations(mut self, migrations: &'static EmbeddedMigrations) -> Self {
131        self.migrations = Some(migrations);
132        self
133    }
134
135    /// Add a custom prefix to all metrics reported by this indexer instance.
136    pub fn with_metrics_prefix(mut self, label: impl Into<String>) -> Self {
137        self.metrics_prefix = Some(label.into());
138        self
139    }
140
141    /// Build the IndexerCluster instance.
142    ///
143    /// Returns an error if:
144    /// - Required fields are missing
145    /// - Database connection cannot be established
146    /// - Metrics registry creation fails
147    pub async fn build(self) -> anyhow::Result<IndexerCluster> {
148        let database_url = self.database_url.context("database_url is required")?;
149
150        tracing_subscriber::fmt::init();
151
152        let registry = Registry::new();
153        let metrics = MetricsService::new(self.args.metrics_args, registry);
154        let client_args = self.args.client_args;
155
156        let indexer = Indexer::new_from_pg(
157            database_url,
158            self.db_args,
159            self.args.indexer_args,
160            client_args,
161            self.ingestion_config,
162            self.migrations,
163            self.metrics_prefix.as_deref(),
164            metrics.registry(),
165        )
166        .await?;
167
168        Ok(IndexerCluster { indexer, metrics })
169    }
170}
171
172impl IndexerCluster {
173    /// Create a new builder for constructing an IndexerCluster.
174    pub fn builder() -> IndexerClusterBuilder {
175        IndexerClusterBuilder::new()
176    }
177
178    /// Access to the indexer's metrics. This can be cloned before a call to [Self::run], to retain
179    /// shared access to the underlying metrics.
180    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
181        self.indexer.indexer_metrics()
182    }
183
184    /// Access to the ingestion service's metrics. This can be cloned before a call to [Self::run],
185    /// to retain shared access to the underlying metrics.
186    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
187        self.indexer.ingestion_metrics()
188    }
189
190    /// Starts the indexer and metrics service, returning a handle over the service's tasks.
191    /// The service will exit when the indexer has finished processing all the checkpoints it was
192    /// configured to process, or when it is instructed to shut down.
193    pub async fn run(self) -> anyhow::Result<Service> {
194        let s_indexer = self.indexer.run().await?;
195        let s_metrics = self.metrics.run().await?;
196
197        Ok(s_indexer.attach(s_metrics))
198    }
199}
200
201impl Deref for IndexerCluster {
202    type Target = Indexer<Db>;
203
204    fn deref(&self) -> &Self::Target {
205        &self.indexer
206    }
207}
208
209impl DerefMut for IndexerCluster {
210    fn deref_mut(&mut self) -> &mut Self::Target {
211        &mut self.indexer
212    }
213}
214
215#[cfg(test)]
216mod tests {
217    use std::net::IpAddr;
218    use std::net::Ipv4Addr;
219    use std::net::SocketAddr;
220
221    use async_trait::async_trait;
222    use diesel::Insertable;
223    use diesel::QueryDsl;
224    use diesel::Queryable;
225    use diesel_async::RunQueryDsl;
226    use sui_synthetic_ingestion::synthetic_ingestion;
227    use tempfile::tempdir;
228
229    use crate::FieldCount;
230    use crate::ingestion::ClientArgs;
231    use crate::ingestion::ingestion_client::IngestionClientArgs;
232    use crate::pipeline::Processor;
233    use crate::pipeline::concurrent::ConcurrentConfig;
234    use crate::postgres::Connection;
235    use crate::postgres::Db;
236    use crate::postgres::DbArgs;
237    use crate::postgres::temp::TempDb;
238    use crate::postgres::temp::get_available_port;
239    use crate::types::full_checkpoint_content::Checkpoint;
240
241    use super::*;
242
    // Diesel schema for the test-only `tx_counts` table, keyed by `cp_sequence_number`. The table
    // itself is created by hand in `test_indexer_cluster` with matching columns.
    diesel::table! {
        /// Table for storing transaction counts per checkpoint.
        tx_counts (cp_sequence_number) {
            cp_sequence_number -> BigInt,
            count -> BigInt,
        }
    }
250
    /// A row of the [tx_counts] table: the number of transactions in one checkpoint.
    #[derive(Insertable, Queryable, FieldCount)]
    #[diesel(table_name = tx_counts)]
    struct StoredTxCount {
        /// Sequence number of the checkpoint (the table's primary key).
        cp_sequence_number: i64,
        /// Number of transactions in that checkpoint.
        count: i64,
    }
257
    /// Test concurrent pipeline for populating [tx_counts].
    ///
    /// Stateless marker type; its behaviour lives in its `Processor` and `Handler` impls.
    struct TxCounts;
260
261    #[async_trait]
262    impl Processor for TxCounts {
263        const NAME: &'static str = "tx_counts";
264        type Value = StoredTxCount;
265
266        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
267            Ok(vec![StoredTxCount {
268                cp_sequence_number: checkpoint.summary.sequence_number as i64,
269                count: checkpoint.transactions.len() as i64,
270            }])
271        }
272    }
273
274    #[async_trait]
275    impl crate::postgres::handler::Handler for TxCounts {
276        async fn commit<'a>(
277            values: &[Self::Value],
278            conn: &mut Connection<'a>,
279        ) -> anyhow::Result<usize> {
280            Ok(diesel::insert_into(tx_counts::table)
281                .values(values)
282                .on_conflict_do_nothing()
283                .execute(conn)
284                .await?)
285        }
286    }
287
    // End-to-end test: builds an `IndexerCluster` against a temporary database and a directory of
    // synthetic checkpoints, runs the `TxCounts` pipeline over checkpoints 0..=9, and checks both
    // the rows written and the metrics reported.
    #[tokio::test]
    async fn test_indexer_cluster() {
        let db = TempDb::new().expect("Failed to create temporary database");
        let url = db.database().url();

        // Generate test transactions to ingest.
        let checkpoint_dir = tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: checkpoint_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 10,
            checkpoint_size: 2,
        })
        .await;

        // Separate handles to the same database: `writer` sets up the table below, `reader` is
        // used at the end to inspect what the indexer wrote.
        let reader = Db::for_read(url.clone(), DbArgs::default()).await.unwrap();
        let writer = Db::for_write(url.clone(), DbArgs::default()).await.unwrap();

        {
            // Create the table we are going to write to. We have to do this manually, because this
            // table is not handled by migrations.
            let mut conn = writer.connect().await.unwrap();
            diesel::sql_query(
                r#"
                CREATE TABLE tx_counts (
                    cp_sequence_number  BIGINT PRIMARY KEY,
                    count               BIGINT NOT NULL
                )
                "#,
            )
            .execute(&mut conn)
            .await
            .unwrap();
        }

        // Serve metrics on a localhost port reported as available.
        let metrics_address =
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), get_available_port());

        // Read checkpoints from the local directory generated above, and bound the run to
        // checkpoints 0 through 9.
        let args = Args {
            client_args: ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some(checkpoint_dir.path().to_owned()),
                    ..Default::default()
                },
                ..Default::default()
            },
            indexer_args: IndexerArgs {
                first_checkpoint: Some(0),
                last_checkpoint: Some(9),
                ..Default::default()
            },
            metrics_args: MetricsArgs { metrics_address },
        };

        let mut indexer = IndexerCluster::builder()
            .with_database_url(url.clone())
            .with_args(args)
            .build()
            .await
            .unwrap();

        // Register the pipeline on the cluster (the call goes through to the inner indexer).
        indexer
            .concurrent_pipeline(TxCounts, ConcurrentConfig::default())
            .await
            .unwrap();

        // `run` consumes the cluster, so take clones of the metric handles first.
        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let indexer_metrics = indexer.indexer_metrics().clone();

        // Run the indexer until it signals completion. We have configured it to stop after
        // ingesting 10 checkpoints, so it should shut itself down.
        indexer.run().await.unwrap().join().await.unwrap();

        // Check that the results were all written out.
        {
            let mut conn = reader.connect().await.unwrap();
            let counts: Vec<StoredTxCount> = tx_counts::table
                .order_by(tx_counts::cp_sequence_number)
                .load(&mut conn)
                .await
                .unwrap();

            assert_eq!(counts.len(), 10);
            for (i, count) in counts.iter().enumerate() {
                assert_eq!(count.cp_sequence_number, i as i64);
                assert_eq!(count.count, 3); // 2 user transactions + 1 settlement transaction
            }
        }

        // Check that ingestion metrics were updated.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 10);
        // 10 checkpoints, 2 user transactions + 1 settlement transaction per checkpoint
        assert_eq!(ingestion_metrics.total_ingested_transactions.get(), 30);
        assert_eq!(ingestion_metrics.latest_ingested_checkpoint.get(), 9);

        // Asserts the value of a per-pipeline metric, looked up with the `tx_counts` label.
        macro_rules! assert_pipeline_metric {
            ($name:ident, $value:expr) => {
                assert_eq!(
                    indexer_metrics
                        .$name
                        .get_metric_with_label_values(&["tx_counts"])
                        .unwrap()
                        .get(),
                    $value
                );
            };
        }

        // One row is produced per checkpoint, so row counts match checkpoint counts.
        assert_pipeline_metric!(total_handler_checkpoints_received, 10);
        assert_pipeline_metric!(total_handler_checkpoints_processed, 10);
        assert_pipeline_metric!(total_handler_rows_created, 10);
        assert_pipeline_metric!(latest_processed_checkpoint, 9);
        assert_pipeline_metric!(total_collector_checkpoints_received, 10);
        assert_pipeline_metric!(total_collector_rows_received, 10);
        assert_pipeline_metric!(latest_collected_checkpoint, 9);

        // The watermark checkpoint is inclusive, but the transaction is exclusive
        assert_pipeline_metric!(watermark_checkpoint, 9);
        assert_pipeline_metric!(watermark_checkpoint_in_db, 9);
        // 10 checkpoints, 2 user transactions + 1 settlement transaction per checkpoint
        assert_pipeline_metric!(watermark_transaction, 30);
        assert_pipeline_metric!(watermark_transaction_in_db, 30);
    }
411
412    #[test]
413    fn test_individual_methods_override_bundled_args() {
414        let builder = IndexerClusterBuilder::new()
415            .with_args(Args {
416                indexer_args: IndexerArgs {
417                    first_checkpoint: Some(100),
418                    ..Default::default()
419                },
420                client_args: ClientArgs {
421                    ingestion: IngestionClientArgs {
422                        local_ingestion_path: Some("/bundled".into()),
423                        ..Default::default()
424                    },
425                    ..Default::default()
426                },
427                metrics_args: MetricsArgs {
428                    metrics_address: "127.0.0.1:8080".parse().unwrap(),
429                },
430            })
431            .with_indexer_args(IndexerArgs {
432                first_checkpoint: Some(200),
433                ..Default::default()
434            })
435            .with_client_args(ClientArgs {
436                ingestion: IngestionClientArgs {
437                    local_ingestion_path: Some("/individual".into()),
438                    ..Default::default()
439                },
440                ..Default::default()
441            })
442            .with_metrics_args(MetricsArgs {
443                metrics_address: "127.0.0.1:9090".parse().unwrap(),
444            });
445
446        assert_eq!(builder.args.indexer_args.first_checkpoint, Some(200));
447        assert_eq!(
448            builder
449                .args
450                .client_args
451                .ingestion
452                .local_ingestion_path
453                .as_ref()
454                .unwrap()
455                .to_string_lossy(),
456            "/individual"
457        );
458        assert_eq!(
459            builder.args.metrics_args.metrics_address.to_string(),
460            "127.0.0.1:9090"
461        );
462    }
463
464    #[test]
465    fn test_bundled_args_override_individual_methods() {
466        let builder = IndexerClusterBuilder::new()
467            .with_indexer_args(IndexerArgs {
468                first_checkpoint: Some(200),
469                ..Default::default()
470            })
471            .with_client_args(ClientArgs {
472                ingestion: IngestionClientArgs {
473                    local_ingestion_path: Some("/individual".into()),
474                    ..Default::default()
475                },
476                ..Default::default()
477            })
478            .with_metrics_args(MetricsArgs {
479                metrics_address: "127.0.0.1:9090".parse().unwrap(),
480            })
481            .with_args(Args {
482                indexer_args: IndexerArgs {
483                    first_checkpoint: Some(100),
484                    ..Default::default()
485                },
486                client_args: ClientArgs {
487                    ingestion: IngestionClientArgs {
488                        local_ingestion_path: Some("/bundled".into()),
489                        ..Default::default()
490                    },
491                    ..Default::default()
492                },
493                metrics_args: MetricsArgs {
494                    metrics_address: "127.0.0.1:8080".parse().unwrap(),
495                },
496            });
497
498        assert_eq!(builder.args.indexer_args.first_checkpoint, Some(100));
499        assert_eq!(
500            builder
501                .args
502                .client_args
503                .ingestion
504                .local_ingestion_path
505                .as_ref()
506                .unwrap()
507                .to_string_lossy(),
508            "/bundled"
509        );
510        assert_eq!(
511            builder.args.metrics_args.metrics_address.to_string(),
512            "127.0.0.1:8080"
513        );
514    }
515}