sui_indexer_alt_framework/
cluster.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::ops::Deref;
5use std::ops::DerefMut;
6use std::sync::Arc;
7
8use anyhow::Context;
9use diesel_migrations::EmbeddedMigrations;
10use prometheus::Registry;
11use sui_futures::service::Service;
12use sui_indexer_alt_metrics::MetricsArgs;
13use sui_indexer_alt_metrics::MetricsService;
14use url::Url;
15
16use crate::Indexer;
17use crate::IndexerArgs;
18use crate::Result;
19use crate::ingestion::ClientArgs;
20use crate::ingestion::IngestionConfig;
21use crate::metrics::IndexerMetrics;
22use crate::metrics::IngestionMetrics;
23use crate::postgres::Db;
24use crate::postgres::DbArgs;
25
// NOTE: this struct derives `clap::Parser`, so its `///` doc comments may be surfaced as
// `--help` text; treat edits to them as user-facing CLI changes. `Default` is derived so that
// `IndexerClusterBuilder`, which embeds an `Args`, can itself derive `Default`.
/// Bundle of arguments for setting up an indexer cluster (an Indexer and its associated Metrics
/// service). This struct is offered as a convenience for the common case of parsing command-line
/// arguments for a binary running a standalone indexer and its metrics service.
#[derive(clap::Parser, Debug, Default)]
pub struct Args {
    /// What to index and in what time range.
    #[clap(flatten)]
    pub indexer_args: IndexerArgs,

    /// Where to get checkpoint data from.
    #[clap(flatten)]
    pub client_args: ClientArgs,

    /// How to expose metrics.
    #[clap(flatten)]
    pub metrics_args: MetricsArgs,
}
43
44/// An opinionated [IndexerCluster] that spins up an [Indexer] implementation using Postgres as its
45/// store, along with a [MetricsService] and a tracing subscriber (outputting to stderr) to provide
46/// observability. It is a useful starting point for an indexer binary.
47pub struct IndexerCluster {
48    indexer: Indexer<Db>,
49    metrics: MetricsService,
50}
51
/// Builder for creating an [IndexerCluster] with a fluent API.
///
/// Every setter consumes and returns the builder. Only the database URL is validated by
/// [IndexerClusterBuilder::build]; every other setting falls back to its `Default` value.
#[derive(Default)]
pub struct IndexerClusterBuilder {
    /// Postgres connection URL; `build` returns an error if this is still `None`.
    database_url: Option<Url>,
    /// Database connection parameters (pool size, connection timeout, etc.).
    db_args: DbArgs,
    /// Indexer, ingestion-client and metrics arguments, settable together or individually.
    args: Args,
    /// How the ingestion service is set up (concurrency, polling intervals, etc.).
    ingestion_config: IngestionConfig,
    /// Embedded migrations forwarded to the indexer, if any were provided.
    migrations: Option<&'static EmbeddedMigrations>,
    /// Optional custom prefix forwarded to the indexer for its metrics.
    metrics_prefix: Option<String>,
}
62
63impl IndexerClusterBuilder {
64    /// Create a new builder instance
65    pub fn new() -> Self {
66        Self::default()
67    }
68
69    /// Set the PostgreSQL database connection URL (required).
70    ///
71    /// This should be a valid PostgreSQL connection urls, e.g.:
72    /// - `postgres://user:password@host:5432/mydb`
73    pub fn with_database_url(mut self, url: Url) -> Self {
74        self.database_url = Some(url);
75        self
76    }
77
78    /// Configure database connection parameters such as pool size, connection timeout, etc.
79    ///
80    /// Defaults to [`DbArgs::default()`] if not specified, which provides reasonable defaults
81    /// for most use cases.
82    pub fn with_db_args(mut self, args: DbArgs) -> Self {
83        self.db_args = args;
84        self
85    }
86
87    /// Set the main indexer cluster's configuration arguments (required).
88    ///
89    /// This bundles all configuration needed for the indexer:
90    /// - `IndexerArgs`: Controls what to index (checkpoint range, which pipelines to run, watermark behavior)
91    /// - `ClientArgs`: Specifies where to fetch checkpoint data from (remote store, local path, or RPC)
92    /// - `MetricsArgs`: Configures how to expose Prometheus metrics (address to serve on)
93    ///
94    /// This overwrites any previously set args.
95    pub fn with_args(mut self, args: Args) -> Self {
96        self.args = args;
97        self
98    }
99
100    /// Set indexer arguments (what to index and in what time range).
101    /// This overwrites any previously set indexer args.
102    pub fn with_indexer_args(mut self, args: IndexerArgs) -> Self {
103        self.args.indexer_args = args;
104        self
105    }
106
107    /// Set client arguments (where to get checkpoint data from).
108    /// This overwrites any previously set client args.
109    pub fn with_client_args(mut self, args: ClientArgs) -> Self {
110        self.args.client_args = args;
111        self
112    }
113
114    /// Set metrics arguments (how to expose metrics).
115    /// This overwrites any previously set metrics args.
116    pub fn with_metrics_args(mut self, args: MetricsArgs) -> Self {
117        self.args.metrics_args = args;
118        self
119    }
120
121    /// Set the ingestion configuration, which controls how the ingestion service is
122    /// set-up (its concurrency, polling, intervals, etc).
123    pub fn with_ingestion_config(mut self, config: IngestionConfig) -> Self {
124        self.ingestion_config = config;
125        self
126    }
127
128    /// Set database migrations to run.
129    ///
130    /// See the [Diesel migration guide](https://diesel.rs/guides/migration_guide.html) for more information.
131    pub fn with_migrations(mut self, migrations: &'static EmbeddedMigrations) -> Self {
132        self.migrations = Some(migrations);
133        self
134    }
135
136    /// Add a custom prefix to all metrics reported by this indexer instance.
137    pub fn with_metrics_prefix(mut self, label: impl Into<String>) -> Self {
138        self.metrics_prefix = Some(label.into());
139        self
140    }
141
142    /// Build the IndexerCluster instance.
143    ///
144    /// Returns an error if:
145    /// - Required fields are missing
146    /// - Database connection cannot be established
147    /// - Metrics registry creation fails
148    pub async fn build(self) -> Result<IndexerCluster> {
149        let database_url = self.database_url.context("database_url is required")?;
150
151        tracing_subscriber::fmt::init();
152
153        let registry = Registry::new();
154        let metrics = MetricsService::new(self.args.metrics_args, registry);
155        let client_args = self.args.client_args;
156
157        let indexer = Indexer::new_from_pg(
158            database_url,
159            self.db_args,
160            self.args.indexer_args,
161            client_args,
162            self.ingestion_config,
163            self.migrations,
164            self.metrics_prefix.as_deref(),
165            metrics.registry(),
166        )
167        .await?;
168
169        Ok(IndexerCluster { indexer, metrics })
170    }
171}
172
173impl IndexerCluster {
174    /// Create a new builder for constructing an IndexerCluster.
175    pub fn builder() -> IndexerClusterBuilder {
176        IndexerClusterBuilder::new()
177    }
178
179    /// Access to the indexer's metrics. This can be cloned before a call to [Self::run], to retain
180    /// shared access to the underlying metrics.
181    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
182        self.indexer.indexer_metrics()
183    }
184
185    /// Access to the ingestion service's metrics. This can be cloned before a call to [Self::run],
186    /// to retain shared access to the underlying metrics.
187    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
188        self.indexer.ingestion_metrics()
189    }
190
191    /// Starts the indexer and metrics service, returning a handle over the service's tasks.
192    /// The service will exit when the indexer has finished processing all the checkpoints it was
193    /// configured to process, or when it is instructed to shut down.
194    pub async fn run(self) -> Result<Service> {
195        let s_indexer = self.indexer.run().await?;
196        let s_metrics = self.metrics.run().await?;
197
198        Ok(s_indexer.attach(s_metrics))
199    }
200}
201
202impl Deref for IndexerCluster {
203    type Target = Indexer<Db>;
204
205    fn deref(&self) -> &Self::Target {
206        &self.indexer
207    }
208}
209
210impl DerefMut for IndexerCluster {
211    fn deref_mut(&mut self) -> &mut Self::Target {
212        &mut self.indexer
213    }
214}
215
// Tests for the builder's argument-override behaviour, plus an end-to-end test that runs an
// IndexerCluster against a temporary Postgres database fed with synthetic checkpoint data.
#[cfg(test)]
mod tests {
    use std::net::IpAddr;
    use std::net::Ipv4Addr;
    use std::net::SocketAddr;

    use async_trait::async_trait;
    use diesel::Insertable;
    use diesel::QueryDsl;
    use diesel::Queryable;
    use diesel_async::RunQueryDsl;
    use sui_synthetic_ingestion::synthetic_ingestion;
    use tempfile::tempdir;

    use crate::FieldCount;
    use crate::ingestion::ClientArgs;
    use crate::ingestion::ingestion_client::IngestionClientArgs;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::ConcurrentConfig;
    use crate::postgres::Connection;
    use crate::postgres::Db;
    use crate::postgres::DbArgs;
    use crate::postgres::temp::TempDb;
    use crate::postgres::temp::get_available_port;
    use crate::types::full_checkpoint_content::Checkpoint;

    use super::*;

    // Diesel schema for the test-only table. The table itself is created by hand in
    // `test_indexer_cluster`, because it is not part of any migration.
    diesel::table! {
        /// Table for storing transaction counts per checkpoint.
        tx_counts (cp_sequence_number) {
            cp_sequence_number -> BigInt,
            count -> BigInt,
        }
    }

    /// One row of `tx_counts`: the number of transactions in a single checkpoint.
    #[derive(Insertable, Queryable, FieldCount)]
    #[diesel(table_name = tx_counts)]
    struct StoredTxCount {
        cp_sequence_number: i64,
        count: i64,
    }

    /// Test concurrent pipeline for populating [tx_counts].
    struct TxCounts;

    #[async_trait]
    impl Processor for TxCounts {
        // Pipeline name; `test_indexer_cluster` uses it as the label when reading
        // per-pipeline metrics.
        const NAME: &'static str = "tx_counts";
        type Value = StoredTxCount;

        // Emits exactly one row per checkpoint.
        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![StoredTxCount {
                cp_sequence_number: checkpoint.summary.sequence_number as i64,
                count: checkpoint.transactions.len() as i64,
            }])
        }
    }

    #[async_trait]
    impl crate::postgres::handler::Handler for TxCounts {
        // Inserts the batch, skipping rows whose checkpoint is already present (presumably so a
        // re-committed checkpoint is harmless).
        async fn commit<'a>(
            values: &[Self::Value],
            conn: &mut Connection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(diesel::insert_into(tx_counts::table)
                .values(values)
                .on_conflict_do_nothing()
                .execute(conn)
                .await?)
        }
    }

    /// End-to-end test: runs an IndexerCluster over 10 synthetic checkpoints (0..=9) into
    /// `tx_counts`, then checks the rows written and the ingestion and pipeline metrics.
    #[tokio::test]
    async fn test_indexer_cluster() {
        let db = TempDb::new().expect("Failed to create temporary database");
        let url = db.database().url();

        // Generate test transactions to ingest: 10 checkpoints starting at 0, with 2 user
        // transactions each.
        let checkpoint_dir = tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: checkpoint_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 10,
            checkpoint_size: 2,
        })
        .await;

        // Separate read and write handles onto the same temporary database.
        let reader = Db::for_read(url.clone(), DbArgs::default()).await.unwrap();
        let writer = Db::for_write(url.clone(), DbArgs::default()).await.unwrap();

        {
            // Create the table we are going to write to. We have to do this manually, because this
            // table is not handled by migrations.
            let mut conn = writer.connect().await.unwrap();
            diesel::sql_query(
                r#"
                CREATE TABLE tx_counts (
                    cp_sequence_number  BIGINT PRIMARY KEY,
                    count               BIGINT NOT NULL
                )
                "#,
            )
            .execute(&mut conn)
            .await
            .unwrap();
        }

        // Serve metrics on a localhost port obtained from `get_available_port` rather than a
        // fixed one.
        let metrics_address =
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), get_available_port());

        // Read checkpoints from the local directory generated above, and index exactly
        // checkpoints 0..=9.
        let args = Args {
            client_args: ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some(checkpoint_dir.path().to_owned()),
                    ..Default::default()
                },
                ..Default::default()
            },
            indexer_args: IndexerArgs {
                first_checkpoint: Some(0),
                last_checkpoint: Some(9),
                ..Default::default()
            },
            metrics_args: MetricsArgs { metrics_address },
        };

        let mut indexer = IndexerCluster::builder()
            .with_database_url(url.clone())
            .with_args(args)
            .build()
            .await
            .unwrap();

        indexer
            .concurrent_pipeline(TxCounts, ConcurrentConfig::default())
            .await
            .unwrap();

        // `run` consumes the cluster, so keep shared handles to its metrics for the checks below.
        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let indexer_metrics = indexer.indexer_metrics().clone();

        // Run the indexer until it signals completion. We have configured it to stop after
        // ingesting 10 checkpoints, so it should shut itself down.
        indexer.run().await.unwrap().join().await.unwrap();

        // Check that the results were all written out.
        {
            let mut conn = reader.connect().await.unwrap();
            let counts: Vec<StoredTxCount> = tx_counts::table
                .order_by(tx_counts::cp_sequence_number)
                .load(&mut conn)
                .await
                .unwrap();

            assert_eq!(counts.len(), 10);
            for (i, count) in counts.iter().enumerate() {
                assert_eq!(count.cp_sequence_number, i as i64);
                assert_eq!(count.count, 3); // 2 user transactions + 1 settlement transaction
            }
        }

        // Check that ingestion metrics were updated.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 10);
        // 10 checkpoints, 2 user transactions + 1 settlement transaction per checkpoint
        assert_eq!(ingestion_metrics.total_ingested_transactions.get(), 30);
        assert_eq!(ingestion_metrics.latest_ingested_checkpoint.get(), 9);

        // Asserts the value of a per-pipeline indexer metric, selected by the `tx_counts`
        // pipeline label.
        macro_rules! assert_pipeline_metric {
            ($name:ident, $value:expr) => {
                assert_eq!(
                    indexer_metrics
                        .$name
                        .get_metric_with_label_values(&["tx_counts"])
                        .unwrap()
                        .get(),
                    $value
                );
            };
        }

        // One row is produced per checkpoint, so checkpoint and row counts agree.
        assert_pipeline_metric!(total_handler_checkpoints_received, 10);
        assert_pipeline_metric!(total_handler_checkpoints_processed, 10);
        assert_pipeline_metric!(total_handler_rows_created, 10);
        assert_pipeline_metric!(latest_processed_checkpoint, 9);
        assert_pipeline_metric!(total_collector_checkpoints_received, 10);
        assert_pipeline_metric!(total_collector_rows_received, 10);
        assert_pipeline_metric!(latest_collected_checkpoint, 9);

        // The watermark checkpoint is inclusive, but the transaction is exclusive
        assert_pipeline_metric!(watermark_checkpoint, 9);
        assert_pipeline_metric!(watermark_checkpoint_in_db, 9);
        // 10 checkpoints, 2 user transactions + 1 settlement transaction per checkpoint
        assert_pipeline_metric!(watermark_transaction, 30);
        assert_pipeline_metric!(watermark_transaction_in_db, 30);
    }

    /// Individual setters called after `with_args` replace the corresponding parts of the
    /// bundled args.
    #[test]
    fn test_individual_methods_override_bundled_args() {
        let builder = IndexerClusterBuilder::new()
            .with_args(Args {
                indexer_args: IndexerArgs {
                    first_checkpoint: Some(100),
                    ..Default::default()
                },
                client_args: ClientArgs {
                    ingestion: IngestionClientArgs {
                        local_ingestion_path: Some("/bundled".into()),
                        ..Default::default()
                    },
                    ..Default::default()
                },
                metrics_args: MetricsArgs {
                    metrics_address: "127.0.0.1:8080".parse().unwrap(),
                },
            })
            .with_indexer_args(IndexerArgs {
                first_checkpoint: Some(200),
                ..Default::default()
            })
            .with_client_args(ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some("/individual".into()),
                    ..Default::default()
                },
                ..Default::default()
            })
            .with_metrics_args(MetricsArgs {
                metrics_address: "127.0.0.1:9090".parse().unwrap(),
            });

        assert_eq!(builder.args.indexer_args.first_checkpoint, Some(200));
        assert_eq!(
            builder
                .args
                .client_args
                .ingestion
                .local_ingestion_path
                .as_ref()
                .unwrap()
                .to_string_lossy(),
            "/individual"
        );
        assert_eq!(
            builder.args.metrics_args.metrics_address.to_string(),
            "127.0.0.1:9090"
        );
    }

    /// `with_args` called after the individual setters replaces everything they set.
    #[test]
    fn test_bundled_args_override_individual_methods() {
        let builder = IndexerClusterBuilder::new()
            .with_indexer_args(IndexerArgs {
                first_checkpoint: Some(200),
                ..Default::default()
            })
            .with_client_args(ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some("/individual".into()),
                    ..Default::default()
                },
                ..Default::default()
            })
            .with_metrics_args(MetricsArgs {
                metrics_address: "127.0.0.1:9090".parse().unwrap(),
            })
            .with_args(Args {
                indexer_args: IndexerArgs {
                    first_checkpoint: Some(100),
                    ..Default::default()
                },
                client_args: ClientArgs {
                    ingestion: IngestionClientArgs {
                        local_ingestion_path: Some("/bundled".into()),
                        ..Default::default()
                    },
                    ..Default::default()
                },
                metrics_args: MetricsArgs {
                    metrics_address: "127.0.0.1:8080".parse().unwrap(),
                },
            });

        assert_eq!(builder.args.indexer_args.first_checkpoint, Some(100));
        assert_eq!(
            builder
                .args
                .client_args
                .ingestion
                .local_ingestion_path
                .as_ref()
                .unwrap()
                .to_string_lossy(),
            "/bundled"
        );
        assert_eq!(
            builder.args.metrics_args.metrics_address.to_string(),
            "127.0.0.1:8080"
        );
    }
}