sui_indexer_alt_framework/
cluster.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    ops::{Deref, DerefMut},
6    sync::Arc,
7};
8
9use anyhow::Context;
10use diesel_migrations::EmbeddedMigrations;
11use prometheus::Registry;
12use sui_futures::service::Service;
13use sui_indexer_alt_metrics::{MetricsArgs, MetricsService};
14use url::Url;
15
16use crate::{
17    Indexer, IndexerArgs, Result,
18    ingestion::{ClientArgs, IngestionConfig},
19    metrics::{IndexerMetrics, IngestionMetrics},
20    postgres::{Db, DbArgs},
21};
22
// NOTE(review): this type derives `clap::Parser`, and clap's derive macro reads `///` doc
// comments to build `--help` text, so edits to the doc comments below may be user-visible --
// confirm against the clap derive reference before rewording them.
/// Bundle of arguments for setting up an indexer cluster (an Indexer and its associated Metrics
/// service). This struct is offered as a convenience for the common case of parsing command-line
/// arguments for a binary running a standalone indexer and its metrics service.
#[derive(clap::Parser, Debug, Default)]
pub struct Args {
    /// What to index and in what time range.
    #[clap(flatten)]
    pub indexer_args: IndexerArgs,

    /// Where to get checkpoint data from.
    // Optional at the type level (and `None` under `Default`), but
    // `IndexerClusterBuilder::build` rejects `None` with "client_args is required".
    #[clap(flatten)]
    pub client_args: Option<ClientArgs>,

    /// How to expose metrics.
    #[clap(flatten)]
    pub metrics_args: MetricsArgs,
}
40
/// An opinionated [IndexerCluster] that spins up an [Indexer] implementation using Postgres as its
/// store, along with a [MetricsService] and a tracing subscriber (outputting to stderr) to provide
/// observability. It is a useful starting point for an indexer binary.
///
/// Instances are created through [IndexerClusterBuilder] (see [IndexerCluster::builder]). The
/// wrapped [Indexer] is reachable through the `Deref`/`DerefMut` implementations on this type.
pub struct IndexerCluster {
    // The Postgres-backed indexer; this is the `Deref`/`DerefMut` target.
    indexer: Indexer<Db>,
    // Metrics service whose registry is handed to the indexer at build time; it is started
    // alongside the indexer in `run`.
    metrics: MetricsService,
}
48
/// Builder for creating an IndexerCluster with a fluent API
///
/// Every field starts at its `Default` value. `build` fails unless both a database URL and
/// client arguments have been supplied.
#[derive(Default)]
pub struct IndexerClusterBuilder {
    // Postgres connection URL. Required: `build` errors with "database_url is required" if unset.
    database_url: Option<Url>,
    // Database connection parameters; stays at `DbArgs::default()` unless overridden.
    db_args: DbArgs,
    // Bundled indexer, client and metrics arguments. `args.client_args` must be `Some` by the
    // time `build` is called.
    args: Args,
    // Configuration forwarded to the indexer for its ingestion service.
    ingestion_config: IngestionConfig,
    // Optional embedded Diesel migrations, forwarded as-is to `Indexer::new_from_pg`.
    migrations: Option<&'static EmbeddedMigrations>,
    // Optional metrics prefix, forwarded to `Indexer::new_from_pg`.
    metrics_prefix: Option<String>,
}
59
60impl IndexerClusterBuilder {
61    /// Create a new builder instance
62    pub fn new() -> Self {
63        Self::default()
64    }
65
66    /// Set the PostgreSQL database connection URL (required).
67    ///
68    /// This should be a valid PostgreSQL connection urls, e.g.:
69    /// - `postgres://user:password@host:5432/mydb`
70    pub fn with_database_url(mut self, url: Url) -> Self {
71        self.database_url = Some(url);
72        self
73    }
74
75    /// Configure database connection parameters such as pool size, connection timeout, etc.
76    ///
77    /// Defaults to [`DbArgs::default()`] if not specified, which provides reasonable defaults
78    /// for most use cases.
79    pub fn with_db_args(mut self, args: DbArgs) -> Self {
80        self.db_args = args;
81        self
82    }
83
84    /// Set the main indexer cluster's configuration arguments (required).
85    ///
86    /// This bundles all configuration needed for the indexer:
87    /// - `IndexerArgs`: Controls what to index (checkpoint range, which pipelines to run, watermark behavior)
88    /// - `ClientArgs`: Specifies where to fetch checkpoint data from (remote store, local path, or RPC)
89    /// - `MetricsArgs`: Configures how to expose Prometheus metrics (address to serve on)
90    ///
91    /// This overwrites any previously set args.
92    pub fn with_args(mut self, args: Args) -> Self {
93        self.args = args;
94        self
95    }
96
97    /// Set indexer arguments (what to index and in what time range).
98    /// This overwrites any previously set indexer args.
99    pub fn with_indexer_args(mut self, args: IndexerArgs) -> Self {
100        self.args.indexer_args = args;
101        self
102    }
103
104    /// Set client arguments (where to get checkpoint data from).
105    /// This overwrites any previously set client args.
106    pub fn with_client_args(mut self, args: ClientArgs) -> Self {
107        self.args.client_args = Some(args);
108        self
109    }
110
111    /// Set metrics arguments (how to expose metrics).
112    /// This overwrites any previously set metrics args.
113    pub fn with_metrics_args(mut self, args: MetricsArgs) -> Self {
114        self.args.metrics_args = args;
115        self
116    }
117
118    /// Set the ingestion configuration, which controls how the ingestion service is
119    /// set-up (its concurrency, polling, intervals, etc).
120    pub fn with_ingestion_config(mut self, config: IngestionConfig) -> Self {
121        self.ingestion_config = config;
122        self
123    }
124
125    /// Set database migrations to run.
126    ///
127    /// See the [Diesel migration guide](https://diesel.rs/guides/migration_guide.html) for more information.
128    pub fn with_migrations(mut self, migrations: &'static EmbeddedMigrations) -> Self {
129        self.migrations = Some(migrations);
130        self
131    }
132
133    /// Add a custom prefix to all metrics reported by this indexer instance.
134    pub fn with_metrics_prefix(mut self, label: impl Into<String>) -> Self {
135        self.metrics_prefix = Some(label.into());
136        self
137    }
138
139    /// Build the IndexerCluster instance.
140    ///
141    /// Returns an error if:
142    /// - Required fields are missing
143    /// - Database connection cannot be established
144    /// - Metrics registry creation fails
145    pub async fn build(self) -> Result<IndexerCluster> {
146        let database_url = self.database_url.context("database_url is required")?;
147
148        tracing_subscriber::fmt::init();
149
150        let registry = Registry::new();
151        let metrics = MetricsService::new(self.args.metrics_args, registry);
152        let client_args = self.args.client_args.context("client_args is required")?;
153
154        let indexer = Indexer::new_from_pg(
155            database_url,
156            self.db_args,
157            self.args.indexer_args,
158            client_args,
159            self.ingestion_config,
160            self.migrations,
161            self.metrics_prefix.as_deref(),
162            metrics.registry(),
163        )
164        .await?;
165
166        Ok(IndexerCluster { indexer, metrics })
167    }
168}
169
170impl IndexerCluster {
171    /// Create a new builder for constructing an IndexerCluster.
172    pub fn builder() -> IndexerClusterBuilder {
173        IndexerClusterBuilder::new()
174    }
175
176    /// Access to the indexer's metrics. This can be cloned before a call to [Self::run], to retain
177    /// shared access to the underlying metrics.
178    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
179        self.indexer.indexer_metrics()
180    }
181
182    /// Access to the ingestion service's metrics. This can be cloned before a call to [Self::run],
183    /// to retain shared access to the underlying metrics.
184    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
185        self.indexer.ingestion_metrics()
186    }
187
188    /// Starts the indexer and metrics service, returning a handle over the service's tasks.
189    /// The service will exit when the indexer has finished processing all the checkpoints it was
190    /// configured to process, or when it is instructed to shut down.
191    pub async fn run(self) -> Result<Service> {
192        let s_indexer = self.indexer.run().await?;
193        let s_metrics = self.metrics.run().await?;
194
195        Ok(s_indexer.attach(s_metrics))
196    }
197}
198
199impl Deref for IndexerCluster {
200    type Target = Indexer<Db>;
201
202    fn deref(&self) -> &Self::Target {
203        &self.indexer
204    }
205}
206
207impl DerefMut for IndexerCluster {
208    fn deref_mut(&mut self) -> &mut Self::Target {
209        &mut self.indexer
210    }
211}
212
// Tests for the cluster builder: one end-to-end run against a temporary database, plus two
// unit tests pinning the "last call wins" semantics of the argument setters.
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    use async_trait::async_trait;
    use diesel::{Insertable, QueryDsl, Queryable};
    use diesel_async::RunQueryDsl;
    use sui_synthetic_ingestion::synthetic_ingestion;
    use tempfile::tempdir;

    use crate::FieldCount;
    use crate::ingestion::ClientArgs;
    use crate::ingestion::ingestion_client::IngestionClientArgs;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::ConcurrentConfig;
    use crate::postgres::{
        Connection, Db, DbArgs,
        temp::{TempDb, get_available_port},
    };
    use crate::types::full_checkpoint_content::Checkpoint;

    use super::*;

    diesel::table! {
        /// Table for storing transaction counts per checkpoint.
        tx_counts (cp_sequence_number) {
            cp_sequence_number -> BigInt,
            count -> BigInt,
        }
    }

    /// Row of [tx_counts]: one checkpoint's sequence number and its transaction count.
    #[derive(Insertable, Queryable, FieldCount)]
    #[diesel(table_name = tx_counts)]
    struct StoredTxCount {
        cp_sequence_number: i64,
        count: i64,
    }

    /// Test concurrent pipeline for populating [tx_counts].
    struct TxCounts;

    #[async_trait]
    impl Processor for TxCounts {
        const NAME: &'static str = "tx_counts";
        type Value = StoredTxCount;

        /// Produces exactly one row per checkpoint: its sequence number paired with the number
        /// of transactions it contains.
        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![StoredTxCount {
                cp_sequence_number: checkpoint.summary.sequence_number as i64,
                count: checkpoint.transactions.len() as i64,
            }])
        }
    }

    #[async_trait]
    impl crate::postgres::handler::Handler for TxCounts {
        /// Inserts the batch into `tx_counts`, skipping rows that conflict with an existing
        /// one, and returns the count reported by `execute`.
        async fn commit<'a>(
            values: &[Self::Value],
            conn: &mut Connection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(diesel::insert_into(tx_counts::table)
                .values(values)
                .on_conflict_do_nothing()
                .execute(conn)
                .await?)
        }
    }

    /// End-to-end test: builds a cluster against a temporary database, ingests 10 synthetic
    /// checkpoints from a local directory through the `TxCounts` pipeline, and checks the
    /// written rows as well as the ingestion and pipeline metrics.
    #[tokio::test]
    async fn test_indexer_cluster() {
        let db = TempDb::new().expect("Failed to create temporary database");
        let url = db.database().url();

        // Generate test transactions to ingest.
        let checkpoint_dir = tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: checkpoint_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 10,
            checkpoint_size: 2,
        })
        .await;

        // Separate handles on the same database: `writer` creates the table up front, `reader`
        // verifies the results after the run.
        let reader = Db::for_read(url.clone(), DbArgs::default()).await.unwrap();
        let writer = Db::for_write(url.clone(), DbArgs::default()).await.unwrap();

        {
            // Create the table we are going to write to. We have to do this manually, because this
            // table is not handled by migrations.
            let mut conn = writer.connect().await.unwrap();
            diesel::sql_query(
                r#"
                CREATE TABLE tx_counts (
                    cp_sequence_number  BIGINT PRIMARY KEY,
                    count               BIGINT NOT NULL
                )
                "#,
            )
            .execute(&mut conn)
            .await
            .unwrap();
        }

        let metrics_address =
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), get_available_port());

        // Read checkpoints 0..=9 from the local directory generated above.
        let args = Args {
            client_args: Some(ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some(checkpoint_dir.path().to_owned()),
                    ..Default::default()
                },
                ..Default::default()
            }),
            indexer_args: IndexerArgs {
                first_checkpoint: Some(0),
                last_checkpoint: Some(9),
                ..Default::default()
            },
            metrics_args: MetricsArgs { metrics_address },
        };

        let mut indexer = IndexerCluster::builder()
            .with_database_url(url.clone())
            .with_args(args)
            .build()
            .await
            .unwrap();

        // `concurrent_pipeline` is a method of the inner `Indexer`, reached through `DerefMut`.
        indexer
            .concurrent_pipeline(TxCounts, ConcurrentConfig::default())
            .await
            .unwrap();

        // `run` consumes the cluster, so grab the metric handles first.
        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let indexer_metrics = indexer.indexer_metrics().clone();

        // Run the indexer until it signals completion. We have configured it to stop after
        // ingesting 10 checkpoints, so it should shut itself down.
        indexer.run().await.unwrap().join().await.unwrap();

        // Check that the results were all written out.
        {
            let mut conn = reader.connect().await.unwrap();
            let counts: Vec<StoredTxCount> = tx_counts::table
                .order_by(tx_counts::cp_sequence_number)
                .load(&mut conn)
                .await
                .unwrap();

            assert_eq!(counts.len(), 10);
            for (i, count) in counts.iter().enumerate() {
                assert_eq!(count.cp_sequence_number, i as i64);
                assert_eq!(count.count, 3); // 2 user transactions + 1 settlement transaction
            }
        }

        // Check that ingestion metrics were updated.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 10);
        // 10 checkpoints, 2 user transactions + 1 settlement transaction per checkpoint
        assert_eq!(ingestion_metrics.total_ingested_transactions.get(), 30);
        assert_eq!(ingestion_metrics.latest_ingested_checkpoint.get(), 9);

        // Asserts the value of a per-pipeline metric, looked up under the "tx_counts" label.
        macro_rules! assert_pipeline_metric {
            ($name:ident, $value:expr) => {
                assert_eq!(
                    indexer_metrics
                        .$name
                        .get_metric_with_label_values(&["tx_counts"])
                        .unwrap()
                        .get(),
                    $value
                );
            };
        }

        assert_pipeline_metric!(total_handler_checkpoints_received, 10);
        assert_pipeline_metric!(total_handler_checkpoints_processed, 10);
        assert_pipeline_metric!(total_handler_rows_created, 10);
        assert_pipeline_metric!(latest_processed_checkpoint, 9);
        assert_pipeline_metric!(total_collector_checkpoints_received, 10);
        assert_pipeline_metric!(total_collector_rows_received, 10);
        assert_pipeline_metric!(latest_collected_checkpoint, 9);

        // The watermark checkpoint is inclusive, but the transaction is exclusive
        assert_pipeline_metric!(watermark_checkpoint, 9);
        assert_pipeline_metric!(watermark_checkpoint_in_db, 9);
        // 10 checkpoints, 2 user transactions + 1 settlement transaction per checkpoint
        assert_pipeline_metric!(watermark_transaction, 30);
        assert_pipeline_metric!(watermark_transaction_in_db, 30);
    }

    /// Individual setters called *after* `with_args` replace the corresponding bundled values.
    #[test]
    fn test_individual_methods_override_bundled_args() {
        let builder = IndexerClusterBuilder::new()
            .with_args(Args {
                indexer_args: IndexerArgs {
                    first_checkpoint: Some(100),
                    ..Default::default()
                },
                client_args: Some(ClientArgs {
                    ingestion: IngestionClientArgs {
                        local_ingestion_path: Some("/bundled".into()),
                        ..Default::default()
                    },
                    ..Default::default()
                }),
                metrics_args: MetricsArgs {
                    metrics_address: "127.0.0.1:8080".parse().unwrap(),
                },
            })
            .with_indexer_args(IndexerArgs {
                first_checkpoint: Some(200),
                ..Default::default()
            })
            .with_client_args(ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some("/individual".into()),
                    ..Default::default()
                },
                ..Default::default()
            })
            .with_metrics_args(MetricsArgs {
                metrics_address: "127.0.0.1:9090".parse().unwrap(),
            });

        // Every value must come from the individual setters, not from the bundle.
        assert_eq!(builder.args.indexer_args.first_checkpoint, Some(200));
        assert_eq!(
            builder
                .args
                .client_args
                .unwrap()
                .ingestion
                .local_ingestion_path
                .unwrap()
                .to_string_lossy(),
            "/individual"
        );
        assert_eq!(
            builder.args.metrics_args.metrics_address.to_string(),
            "127.0.0.1:9090"
        );
    }

    /// `with_args` called *after* the individual setters replaces everything they set.
    #[test]
    fn test_bundled_args_override_individual_methods() {
        let builder = IndexerClusterBuilder::new()
            .with_indexer_args(IndexerArgs {
                first_checkpoint: Some(200),
                ..Default::default()
            })
            .with_client_args(ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some("/individual".into()),
                    ..Default::default()
                },
                ..Default::default()
            })
            .with_metrics_args(MetricsArgs {
                metrics_address: "127.0.0.1:9090".parse().unwrap(),
            })
            .with_args(Args {
                indexer_args: IndexerArgs {
                    first_checkpoint: Some(100),
                    ..Default::default()
                },
                client_args: Some(ClientArgs {
                    ingestion: IngestionClientArgs {
                        local_ingestion_path: Some("/bundled".into()),
                        ..Default::default()
                    },
                    ..Default::default()
                }),
                metrics_args: MetricsArgs {
                    metrics_address: "127.0.0.1:8080".parse().unwrap(),
                },
            });

        // Every value must come from the bundle, which was applied last.
        assert_eq!(builder.args.indexer_args.first_checkpoint, Some(100));
        assert_eq!(
            builder
                .args
                .client_args
                .unwrap()
                .ingestion
                .local_ingestion_path
                .unwrap()
                .to_string_lossy(),
            "/bundled"
        );
        assert_eq!(
            builder.args.metrics_args.metrics_address.to_string(),
            "127.0.0.1:8080"
        );
    }
}