sui_indexer_alt_framework/
cluster.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    ops::{Deref, DerefMut},
6    sync::Arc,
7};
8
9use anyhow::Context;
10use diesel_migrations::EmbeddedMigrations;
11use prometheus::Registry;
12use sui_indexer_alt_metrics::{MetricsArgs, MetricsService};
13use tokio::{signal, task::JoinHandle};
14use tokio_util::sync::CancellationToken;
15use tracing::info;
16use url::Url;
17
18use crate::postgres::{Db, DbArgs};
19use crate::{
20    Indexer, IndexerArgs, IndexerMetrics, Result,
21    ingestion::{ClientArgs, IngestionConfig},
22};
23
/// Bundle of arguments for setting up an indexer cluster (an Indexer and its associated Metrics
/// service). This struct is offered as a convenience for the common case of parsing command-line
/// arguments for a binary running a standalone indexer and its metrics service.
//
// NOTE: this type derives `clap::Parser`, and clap's derive macros read `///` doc comments as
// help text. The `///` comments in this struct are therefore left untouched; maintainer notes go
// in plain `//` comments like this one.
//
// Every field is `#[clap(flatten)]`-ed, i.e. the options of the nested argument structs are
// parsed as if they had been declared directly on `Args`.
#[derive(clap::Parser, Debug, Default)]
pub struct Args {
    /// What to index and in what time range.
    #[clap(flatten)]
    pub indexer_args: IndexerArgs,

    /// Where to get checkpoint data from.
    // Optional here, but `IndexerClusterBuilder::build` rejects `None` with the error
    // "client_args is required".
    #[clap(flatten)]
    pub client_args: Option<ClientArgs>,

    /// How to expose metrics.
    #[clap(flatten)]
    pub metrics_args: MetricsArgs,
}
41
/// An opinionated bundle of an [Indexer] that uses Postgres ([Db]) as its store and a
/// [MetricsService] that exposes its metrics. Building one through [IndexerClusterBuilder] also
/// sets up a `tracing_subscriber::fmt` subscriber to provide observability. It is a useful
/// starting point for an indexer binary.
///
/// The cluster dereferences (mutably and immutably) to the wrapped [Indexer], so the indexer's
/// own methods can be called directly on an `IndexerCluster`.
//
// NOTE(review): earlier documentation stated that the tracing subscriber writes to stderr. The
// subscriber is installed with default settings, and the default writer of
// `tracing_subscriber::fmt` is believed to be stdout -- confirm which destination is intended.
pub struct IndexerCluster {
    /// The Postgres-backed indexer that this cluster runs.
    indexer: Indexer<Db>,

    /// The service that serves the metrics registry the indexer reports into.
    metrics: MetricsService,

    /// Cancelling this token signals cancellation to both the indexer and metrics service.
    cancel: CancellationToken,
}
52
/// Builder for creating an [IndexerCluster] with a fluent API.
///
/// Every `with_*` method consumes the builder and returns it, so calls can be chained. A
/// database URL and client args must be supplied before calling `build`; everything else falls
/// back to its `Default` value (or `None`).
#[derive(Default)]
pub struct IndexerClusterBuilder {
    /// Postgres connection URL. Required: `build` fails if this is still `None`.
    database_url: Option<Url>,

    /// Database connection parameters, passed through to the indexer's Postgres store.
    db_args: DbArgs,

    /// Indexer, client and metrics arguments. `args.client_args` must be `Some` for `build` to
    /// succeed.
    args: Args,

    /// Configuration for the ingestion service.
    ingestion_config: IngestionConfig,

    /// Optional embedded Diesel migrations, handed to the indexer when it is created.
    migrations: Option<&'static EmbeddedMigrations>,

    /// Optional prefix for the metrics reported by the indexer.
    metrics_prefix: Option<String>,
}
63
64impl IndexerClusterBuilder {
65    /// Create a new builder instance
66    pub fn new() -> Self {
67        Self::default()
68    }
69
70    /// Set the PostgreSQL database connection URL (required).
71    ///
72    /// This should be a valid PostgreSQL connection urls, e.g.:
73    /// - `postgres://user:password@host:5432/mydb`
74    pub fn with_database_url(mut self, url: Url) -> Self {
75        self.database_url = Some(url);
76        self
77    }
78
79    /// Configure database connection parameters such as pool size, connection timeout, etc.
80    ///
81    /// Defaults to [`DbArgs::default()`] if not specified, which provides reasonable defaults
82    /// for most use cases.
83    pub fn with_db_args(mut self, args: DbArgs) -> Self {
84        self.db_args = args;
85        self
86    }
87
88    /// Set the main indexer cluster's configuration arguments (required).
89    ///
90    /// This bundles all configuration needed for the indexer:
91    /// - `IndexerArgs`: Controls what to index (checkpoint range, which pipelines to run, watermark behavior)
92    /// - `ClientArgs`: Specifies where to fetch checkpoint data from (remote store, local path, or RPC)
93    /// - `MetricsArgs`: Configures how to expose Prometheus metrics (address to serve on)
94    ///
95    /// This overwrites any previously set args.
96    pub fn with_args(mut self, args: Args) -> Self {
97        self.args = args;
98        self
99    }
100
101    /// Set indexer arguments (what to index and in what time range).
102    /// This overwrites any previously set indexer args.
103    pub fn with_indexer_args(mut self, args: IndexerArgs) -> Self {
104        self.args.indexer_args = args;
105        self
106    }
107
108    /// Set client arguments (where to get checkpoint data from).
109    /// This overwrites any previously set client args.
110    pub fn with_client_args(mut self, args: ClientArgs) -> Self {
111        self.args.client_args = Some(args);
112        self
113    }
114
115    /// Set metrics arguments (how to expose metrics).
116    /// This overwrites any previously set metrics args.
117    pub fn with_metrics_args(mut self, args: MetricsArgs) -> Self {
118        self.args.metrics_args = args;
119        self
120    }
121
122    /// Set the ingestion configuration, which controls how the ingestion service is
123    /// set-up (its concurrency, polling, intervals, etc).
124    pub fn with_ingestion_config(mut self, config: IngestionConfig) -> Self {
125        self.ingestion_config = config;
126        self
127    }
128
129    /// Set database migrations to run.
130    ///
131    /// See the [Diesel migration guide](https://diesel.rs/guides/migration_guide.html) for more information.
132    pub fn with_migrations(mut self, migrations: &'static EmbeddedMigrations) -> Self {
133        self.migrations = Some(migrations);
134        self
135    }
136
137    /// Add a custom prefix to all metrics reported by this indexer instance.
138    pub fn with_metrics_prefix(mut self, label: impl Into<String>) -> Self {
139        self.metrics_prefix = Some(label.into());
140        self
141    }
142
143    /// Build the IndexerCluster instance.
144    ///
145    /// Returns an error if:
146    /// - Required fields are missing
147    /// - Database connection cannot be established
148    /// - Metrics registry creation fails
149    pub async fn build(self) -> Result<IndexerCluster> {
150        let database_url = self.database_url.context("database_url is required")?;
151
152        tracing_subscriber::fmt::init();
153
154        let cancel = CancellationToken::new();
155        let registry = Registry::new();
156        let metrics = MetricsService::new(self.args.metrics_args, registry, cancel.child_token());
157        let client_args = self.args.client_args.context("client_args is required")?;
158
159        let indexer = Indexer::new_from_pg(
160            database_url,
161            self.db_args,
162            self.args.indexer_args,
163            client_args,
164            self.ingestion_config,
165            self.migrations,
166            self.metrics_prefix.as_deref(),
167            metrics.registry(),
168            cancel.child_token(),
169        )
170        .await?;
171
172        Ok(IndexerCluster {
173            indexer,
174            metrics,
175            cancel,
176        })
177    }
178}
179
180impl IndexerCluster {
181    /// Create a new builder for constructing an IndexerCluster.
182    pub fn builder() -> IndexerClusterBuilder {
183        IndexerClusterBuilder::new()
184    }
185
186    /// Access to the indexer's metrics. This can be cloned before a call to [Self::run], to retain
187    /// shared access to the underlying metrics.
188    pub fn metrics(&self) -> &Arc<IndexerMetrics> {
189        self.indexer.metrics()
190    }
191
192    /// This token controls stopping the indexer and metrics service. Clone it before calling
193    /// [Self::run] to retain the ability to stop the service after it has started.
194    pub fn cancel(&self) -> &CancellationToken {
195        &self.cancel
196    }
197
198    /// Starts the indexer and metrics service, returning a handle to `await` the service's exit.
199    /// The service will exit when the indexer has finished processing all the checkpoints it was
200    /// configured to process, or when it receives an interrupt signal.
201    pub async fn run(self) -> Result<JoinHandle<()>> {
202        let h_ctrl_c = tokio::spawn({
203            let cancel = self.cancel.clone();
204            async move {
205                tokio::select! {
206                    _ = cancel.cancelled() => {}
207                    _ = signal::ctrl_c() => {
208                        info!("Received Ctrl-C, shutting down...");
209                        cancel.cancel();
210                    }
211                }
212            }
213        });
214
215        let h_metrics = self.metrics.run().await?;
216        let h_indexer = self.indexer.run().await?;
217
218        Ok(tokio::spawn(async move {
219            let _ = h_indexer.await;
220            self.cancel.cancel();
221            let _ = h_metrics.await;
222            let _ = h_ctrl_c.await;
223        }))
224    }
225}
226
227impl Deref for IndexerCluster {
228    type Target = Indexer<Db>;
229
230    fn deref(&self) -> &Self::Target {
231        &self.indexer
232    }
233}
234
235impl DerefMut for IndexerCluster {
236    fn deref_mut(&mut self) -> &mut Self::Target {
237        &mut self.indexer
238    }
239}
240
// Tests for the cluster: one end-to-end run of a concurrent pipeline against a temporary
// Postgres database, and two unit tests pinning the "last call wins" semantics of the builder's
// argument setters.
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    use async_trait::async_trait;
    use diesel::{Insertable, QueryDsl, Queryable};
    use diesel_async::RunQueryDsl;
    use sui_synthetic_ingestion::synthetic_ingestion;
    use tempfile::tempdir;

    use crate::FieldCount;
    use crate::ingestion::ClientArgs;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::{self, ConcurrentConfig};
    use crate::postgres::{
        Connection, Db, DbArgs,
        temp::{TempDb, get_available_port},
    };
    use crate::types::full_checkpoint_content::CheckpointData;

    use super::*;

    // Diesel schema for the table the test pipeline writes to. The table itself is created with
    // raw SQL inside `test_indexer_cluster`.
    diesel::table! {
        /// Table for storing transaction counts per checkpoint.
        tx_counts (cp_sequence_number) {
            cp_sequence_number -> BigInt,
            count -> BigInt,
        }
    }

    /// A row of `tx_counts`: the number of transactions (`count`) in the checkpoint with
    /// sequence number `cp_sequence_number`.
    #[derive(Insertable, Queryable, FieldCount)]
    #[diesel(table_name = tx_counts)]
    struct StoredTxCount {
        cp_sequence_number: i64,
        count: i64,
    }

    /// Test concurrent pipeline for populating [tx_counts].
    struct TxCounts;

    #[async_trait]
    impl Processor for TxCounts {
        // The pipeline name; it is also the label value used to look up per-pipeline metrics in
        // `test_indexer_cluster`.
        const NAME: &'static str = "tx_counts";
        type Value = StoredTxCount;

        /// Produces exactly one row per checkpoint, holding the checkpoint's sequence number and
        /// how many transactions it contains.
        async fn process(
            &self,
            checkpoint: &Arc<CheckpointData>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![StoredTxCount {
                cp_sequence_number: checkpoint.checkpoint_summary.sequence_number as i64,
                count: checkpoint.transactions.len() as i64,
            }])
        }
    }

    #[async_trait]
    impl concurrent::Handler for TxCounts {
        type Store = Db;

        /// Inserts a batch of rows into `tx_counts`, ignoring rows that conflict with existing
        /// ones, and returns the count reported by `execute`.
        async fn commit<'a>(
            values: &[Self::Value],
            conn: &mut Connection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(diesel::insert_into(tx_counts::table)
                .values(values)
                .on_conflict_do_nothing()
                .execute(conn)
                .await?)
        }
    }

    /// End-to-end test: builds a cluster over a temporary database and a local directory of
    /// synthetic checkpoints, runs the `TxCounts` pipeline over checkpoints 0 through 9, and
    /// checks both the rows written to the database and the metrics reported by the indexer.
    #[tokio::test]
    async fn test_indexer_cluster() {
        let db = TempDb::new().expect("Failed to create temporary database");
        let url = db.database().url();

        // Generate test transactions to ingest.
        let checkpoint_dir = tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: checkpoint_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 10,
            checkpoint_size: 2,
        })
        .await;

        // Separate handles on the same database: `writer` creates the table before the run,
        // `reader` inspects the results afterwards.
        let reader = Db::for_read(url.clone(), DbArgs::default()).await.unwrap();
        let writer = Db::for_write(url.clone(), DbArgs::default()).await.unwrap();

        {
            // Create the table we are going to write to. We have to do this manually, because this
            // table is not handled by migrations.
            let mut conn = writer.connect().await.unwrap();
            diesel::sql_query(
                r#"
                CREATE TABLE tx_counts (
                    cp_sequence_number  BIGINT PRIMARY KEY,
                    count               BIGINT NOT NULL
                )
                "#,
            )
            .execute(&mut conn)
            .await
            .unwrap();
        }

        // Serve metrics on localhost, on a port obtained from `get_available_port`.
        let metrics_address =
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), get_available_port());

        // Read checkpoints from the local directory generated above, and bound the run to
        // checkpoints 0..=9 so that the indexer terminates on its own.
        let args = Args {
            client_args: Some(ClientArgs {
                local_ingestion_path: Some(checkpoint_dir.path().to_owned()),
                remote_store_url: None,
                rpc_api_url: None,
                rpc_username: None,
                rpc_password: None,
            }),
            indexer_args: IndexerArgs {
                first_checkpoint: Some(0),
                last_checkpoint: Some(9),
                ..Default::default()
            },
            metrics_args: MetricsArgs { metrics_address },
        };

        // No migrations are configured on the builder: `tx_counts` was created by hand above.
        let mut indexer = IndexerCluster::builder()
            .with_database_url(url.clone())
            .with_args(args)
            .build()
            .await
            .unwrap();

        // `concurrent_pipeline` is a method of the wrapped `Indexer`, reached through the
        // cluster's `DerefMut` implementation.
        indexer
            .concurrent_pipeline(TxCounts, ConcurrentConfig::default())
            .await
            .unwrap();

        // `run` consumes the cluster, so take a handle on the metrics first.
        let metrics = indexer.metrics().clone();

        // Run the indexer until it signals completion. We have configured it to stop after
        // ingesting 10 checkpoints, so it should shut itself down.
        indexer.run().await.unwrap().await.unwrap();

        // Check that the results were all written out.
        {
            let mut conn = reader.connect().await.unwrap();
            let counts: Vec<StoredTxCount> = tx_counts::table
                .order_by(tx_counts::cp_sequence_number)
                .load(&mut conn)
                .await
                .unwrap();

            // One row per checkpoint, in sequence number order, each recording the 2
            // transactions per checkpoint requested from the synthetic generator.
            assert_eq!(counts.len(), 10);
            for (i, count) in counts.iter().enumerate() {
                assert_eq!(count.cp_sequence_number, i as i64);
                assert_eq!(count.count, 2);
            }
        }

        // Check that metrics were updated.
        assert_eq!(metrics.total_ingested_checkpoints.get(), 10);
        assert_eq!(metrics.total_ingested_transactions.get(), 20);
        assert_eq!(metrics.latest_ingested_checkpoint.get(), 9);

        // Asserts the value of a per-pipeline metric, looked up by the pipeline's name
        // ("tx_counts") as its only label value.
        macro_rules! assert_pipeline_metric {
            ($name:ident, $value:expr) => {
                assert_eq!(
                    metrics
                        .$name
                        .get_metric_with_label_values(&["tx_counts"])
                        .unwrap()
                        .get(),
                    $value
                );
            };
        }

        assert_pipeline_metric!(total_handler_checkpoints_received, 10);
        assert_pipeline_metric!(total_handler_checkpoints_processed, 10);
        assert_pipeline_metric!(total_handler_rows_created, 10);
        assert_pipeline_metric!(latest_processed_checkpoint, 9);
        assert_pipeline_metric!(total_collector_checkpoints_received, 10);
        assert_pipeline_metric!(total_collector_rows_received, 10);
        assert_pipeline_metric!(latest_collected_checkpoint, 9);

        // The watermark checkpoint is inclusive, but the transaction is exclusive: the last
        // checkpoint is 9, while 10 checkpoints of 2 transactions each give a bound of 20.
        assert_pipeline_metric!(watermark_checkpoint, 9);
        assert_pipeline_metric!(watermark_checkpoint_in_db, 9);
        assert_pipeline_metric!(watermark_transaction, 20);
        assert_pipeline_metric!(watermark_transaction_in_db, 20);
    }

    /// Individual setters called after `with_args` replace the corresponding part of the
    /// bundled args.
    #[test]
    fn test_individual_methods_override_bundled_args() {
        let builder = IndexerClusterBuilder::new()
            .with_args(Args {
                indexer_args: IndexerArgs {
                    first_checkpoint: Some(100),
                    ..Default::default()
                },
                client_args: Some(ClientArgs {
                    local_ingestion_path: Some("/bundled".into()),
                    ..Default::default()
                }),
                metrics_args: MetricsArgs {
                    metrics_address: "127.0.0.1:8080".parse().unwrap(),
                },
            })
            .with_indexer_args(IndexerArgs {
                first_checkpoint: Some(200),
                ..Default::default()
            })
            .with_client_args(ClientArgs {
                local_ingestion_path: Some("/individual".into()),
                ..Default::default()
            })
            .with_metrics_args(MetricsArgs {
                metrics_address: "127.0.0.1:9090".parse().unwrap(),
            });

        // All three parts must reflect the individual setters, which were called last.
        assert_eq!(builder.args.indexer_args.first_checkpoint, Some(200));
        assert_eq!(
            builder
                .args
                .client_args
                .unwrap()
                .local_ingestion_path
                .unwrap()
                .to_string_lossy(),
            "/individual"
        );
        assert_eq!(
            builder.args.metrics_args.metrics_address.to_string(),
            "127.0.0.1:9090"
        );
    }

    /// `with_args` called after the individual setters replaces everything they configured.
    #[test]
    fn test_bundled_args_override_individual_methods() {
        let builder = IndexerClusterBuilder::new()
            .with_indexer_args(IndexerArgs {
                first_checkpoint: Some(200),
                ..Default::default()
            })
            .with_client_args(ClientArgs {
                local_ingestion_path: Some("/individual".into()),
                ..Default::default()
            })
            .with_metrics_args(MetricsArgs {
                metrics_address: "127.0.0.1:9090".parse().unwrap(),
            })
            .with_args(Args {
                indexer_args: IndexerArgs {
                    first_checkpoint: Some(100),
                    ..Default::default()
                },
                client_args: Some(ClientArgs {
                    local_ingestion_path: Some("/bundled".into()),
                    ..Default::default()
                }),
                metrics_args: MetricsArgs {
                    metrics_address: "127.0.0.1:8080".parse().unwrap(),
                },
            });

        // All three parts must reflect the bundled args, which were set last.
        assert_eq!(builder.args.indexer_args.first_checkpoint, Some(100));
        assert_eq!(
            builder
                .args
                .client_args
                .unwrap()
                .local_ingestion_path
                .unwrap()
                .to_string_lossy(),
            "/bundled"
        );
        assert_eq!(
            builder.args.metrics_args.metrics_address.to_string(),
            "127.0.0.1:8080"
        );
    }
}