sui_indexer_alt_framework/
cluster.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    ops::{Deref, DerefMut},
6    sync::Arc,
7};
8
9use anyhow::Context;
10use diesel_migrations::EmbeddedMigrations;
11use prometheus::Registry;
12use sui_indexer_alt_metrics::{MetricsArgs, MetricsService};
13use tokio::{signal, task::JoinHandle};
14use tokio_util::sync::CancellationToken;
15use tracing::info;
16use url::Url;
17
18use crate::{
19    Indexer, IndexerArgs, Result,
20    ingestion::{ClientArgs, IngestionConfig},
21    metrics::{IndexerMetrics, IngestionMetrics},
22    postgres::{Db, DbArgs},
23};
24
// NOTE: `clap::Parser` is derived on this struct, and clap's derive macro can pick up `///` doc
// comments for the generated `--help` output. Treat the doc comments in this block as
// user-facing text and reword them only when a change to the CLI help is intended.
/// Bundle of arguments for setting up an indexer cluster (an Indexer and its associated Metrics
/// service). This struct is offered as a convenience for the common case of parsing command-line
/// arguments for a binary running a standalone indexer and its metrics service.
#[derive(clap::Parser, Debug, Default)]
pub struct Args {
    /// What to index and in what time range.
    #[clap(flatten)]
    pub indexer_args: IndexerArgs,

    /// Where to get checkpoint data from.
    // `None` is rejected by `IndexerClusterBuilder::build` with "client_args is required".
    #[clap(flatten)]
    pub client_args: Option<ClientArgs>,

    /// How to expose metrics.
    // Handed to `MetricsService::new` when the cluster is built.
    #[clap(flatten)]
    pub metrics_args: MetricsArgs,
}
42
/// An opinionated [IndexerCluster] that spins up an [Indexer] implementation using Postgres as its
/// store, along with a [MetricsService] and a `tracing_subscriber::fmt` tracing subscriber to
/// provide observability. It is a useful starting point for an indexer binary.
///
/// Construct one with [IndexerCluster::builder], register pipelines on it (it dereferences to the
/// wrapped [Indexer]), and then start everything with [IndexerCluster::run].
// NOTE(review): this comment previously said the subscriber outputs to stderr; the stock
// `tracing_subscriber::fmt` writer is believed to be stdout. Confirm before documenting either.
pub struct IndexerCluster {
    /// The wrapped indexer, backed by the Postgres [Db] store. Exposed through `Deref` and
    /// `DerefMut`.
    indexer: Indexer<Db>,

    /// Metrics service started alongside the indexer by [IndexerCluster::run]. The indexer is
    /// constructed against this service's registry.
    metrics: MetricsService,

    /// Cancelling this token signals cancellation to both the indexer and metrics service.
    cancel: CancellationToken,
}
53
/// Builder for creating an [IndexerCluster] with a fluent API.
///
/// Every setter consumes and returns the builder, and a later call overwrites whatever an
/// earlier call stored in the same field. The derived `Default` leaves the database URL,
/// migrations and metrics prefix unset.
#[derive(Default)]
pub struct IndexerClusterBuilder {
    /// Postgres connection URL. Required: `build` fails with "database_url is required" if unset.
    database_url: Option<Url>,

    /// Database connection settings, forwarded to `Indexer::new_from_pg`.
    db_args: DbArgs,

    /// Bundled indexer, client and metrics arguments. `args.client_args` must be `Some` by the
    /// time `build` is called.
    args: Args,

    /// Ingestion service configuration, forwarded to `Indexer::new_from_pg`.
    ingestion_config: IngestionConfig,

    /// Optional embedded migrations, forwarded to `Indexer::new_from_pg`.
    migrations: Option<&'static EmbeddedMigrations>,

    /// Optional metrics prefix, forwarded to `Indexer::new_from_pg`.
    metrics_prefix: Option<String>,
}
64
65impl IndexerClusterBuilder {
66    /// Create a new builder instance
67    pub fn new() -> Self {
68        Self::default()
69    }
70
71    /// Set the PostgreSQL database connection URL (required).
72    ///
73    /// This should be a valid PostgreSQL connection urls, e.g.:
74    /// - `postgres://user:password@host:5432/mydb`
75    pub fn with_database_url(mut self, url: Url) -> Self {
76        self.database_url = Some(url);
77        self
78    }
79
80    /// Configure database connection parameters such as pool size, connection timeout, etc.
81    ///
82    /// Defaults to [`DbArgs::default()`] if not specified, which provides reasonable defaults
83    /// for most use cases.
84    pub fn with_db_args(mut self, args: DbArgs) -> Self {
85        self.db_args = args;
86        self
87    }
88
89    /// Set the main indexer cluster's configuration arguments (required).
90    ///
91    /// This bundles all configuration needed for the indexer:
92    /// - `IndexerArgs`: Controls what to index (checkpoint range, which pipelines to run, watermark behavior)
93    /// - `ClientArgs`: Specifies where to fetch checkpoint data from (remote store, local path, or RPC)
94    /// - `MetricsArgs`: Configures how to expose Prometheus metrics (address to serve on)
95    ///
96    /// This overwrites any previously set args.
97    pub fn with_args(mut self, args: Args) -> Self {
98        self.args = args;
99        self
100    }
101
102    /// Set indexer arguments (what to index and in what time range).
103    /// This overwrites any previously set indexer args.
104    pub fn with_indexer_args(mut self, args: IndexerArgs) -> Self {
105        self.args.indexer_args = args;
106        self
107    }
108
109    /// Set client arguments (where to get checkpoint data from).
110    /// This overwrites any previously set client args.
111    pub fn with_client_args(mut self, args: ClientArgs) -> Self {
112        self.args.client_args = Some(args);
113        self
114    }
115
116    /// Set metrics arguments (how to expose metrics).
117    /// This overwrites any previously set metrics args.
118    pub fn with_metrics_args(mut self, args: MetricsArgs) -> Self {
119        self.args.metrics_args = args;
120        self
121    }
122
123    /// Set the ingestion configuration, which controls how the ingestion service is
124    /// set-up (its concurrency, polling, intervals, etc).
125    pub fn with_ingestion_config(mut self, config: IngestionConfig) -> Self {
126        self.ingestion_config = config;
127        self
128    }
129
130    /// Set database migrations to run.
131    ///
132    /// See the [Diesel migration guide](https://diesel.rs/guides/migration_guide.html) for more information.
133    pub fn with_migrations(mut self, migrations: &'static EmbeddedMigrations) -> Self {
134        self.migrations = Some(migrations);
135        self
136    }
137
138    /// Add a custom prefix to all metrics reported by this indexer instance.
139    pub fn with_metrics_prefix(mut self, label: impl Into<String>) -> Self {
140        self.metrics_prefix = Some(label.into());
141        self
142    }
143
144    /// Build the IndexerCluster instance.
145    ///
146    /// Returns an error if:
147    /// - Required fields are missing
148    /// - Database connection cannot be established
149    /// - Metrics registry creation fails
150    pub async fn build(self) -> Result<IndexerCluster> {
151        let database_url = self.database_url.context("database_url is required")?;
152
153        tracing_subscriber::fmt::init();
154
155        let cancel = CancellationToken::new();
156        let registry = Registry::new();
157        let metrics = MetricsService::new(self.args.metrics_args, registry, cancel.child_token());
158        let client_args = self.args.client_args.context("client_args is required")?;
159
160        let indexer = Indexer::new_from_pg(
161            database_url,
162            self.db_args,
163            self.args.indexer_args,
164            client_args,
165            self.ingestion_config,
166            self.migrations,
167            self.metrics_prefix.as_deref(),
168            metrics.registry(),
169            cancel.child_token(),
170        )
171        .await?;
172
173        Ok(IndexerCluster {
174            indexer,
175            metrics,
176            cancel,
177        })
178    }
179}
180
181impl IndexerCluster {
182    /// Create a new builder for constructing an IndexerCluster.
183    pub fn builder() -> IndexerClusterBuilder {
184        IndexerClusterBuilder::new()
185    }
186
187    /// Access to the indexer's metrics. This can be cloned before a call to [Self::run], to retain
188    /// shared access to the underlying metrics.
189    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
190        self.indexer.indexer_metrics()
191    }
192
193    /// Access to the ingestion service's metrics. This can be cloned before a call to [Self::run],
194    /// to retain shared access to the underlying metrics.
195    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
196        self.indexer.ingestion_metrics()
197    }
198
199    /// This token controls stopping the indexer and metrics service. Clone it before calling
200    /// [Self::run] to retain the ability to stop the service after it has started.
201    pub fn cancel(&self) -> &CancellationToken {
202        &self.cancel
203    }
204
205    /// Starts the indexer and metrics service, returning a handle to `await` the service's exit.
206    /// The service will exit when the indexer has finished processing all the checkpoints it was
207    /// configured to process, or when it receives an interrupt signal.
208    pub async fn run(self) -> Result<JoinHandle<()>> {
209        let h_ctrl_c = tokio::spawn({
210            let cancel = self.cancel.clone();
211            async move {
212                tokio::select! {
213                    _ = cancel.cancelled() => {}
214                    _ = signal::ctrl_c() => {
215                        info!("Received Ctrl-C, shutting down...");
216                        cancel.cancel();
217                    }
218                }
219            }
220        });
221
222        let h_metrics = self.metrics.run().await?;
223        let h_indexer = self.indexer.run().await?;
224
225        Ok(tokio::spawn(async move {
226            let _ = h_indexer.await;
227            self.cancel.cancel();
228            let _ = h_metrics.await;
229            let _ = h_ctrl_c.await;
230        }))
231    }
232}
233
234impl Deref for IndexerCluster {
235    type Target = Indexer<Db>;
236
237    fn deref(&self) -> &Self::Target {
238        &self.indexer
239    }
240}
241
242impl DerefMut for IndexerCluster {
243    fn deref_mut(&mut self) -> &mut Self::Target {
244        &mut self.indexer
245    }
246}
247
248#[cfg(test)]
249mod tests {
250    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
251
252    use async_trait::async_trait;
253    use diesel::{Insertable, QueryDsl, Queryable};
254    use diesel_async::RunQueryDsl;
255    use sui_synthetic_ingestion::synthetic_ingestion;
256    use tempfile::tempdir;
257
258    use crate::FieldCount;
259    use crate::ingestion::ClientArgs;
260    use crate::ingestion::ingestion_client::IngestionClientArgs;
261    use crate::pipeline::Processor;
262    use crate::pipeline::concurrent::ConcurrentConfig;
263    use crate::postgres::{
264        Connection, Db, DbArgs,
265        temp::{TempDb, get_available_port},
266    };
267    use crate::types::full_checkpoint_content::Checkpoint;
268
269    use super::*;
270
    // Diesel schema for the test-only `tx_counts` table, keyed by `cp_sequence_number`. The table
    // itself is created by hand with raw SQL inside `test_indexer_cluster`, so the columns here
    // must stay in sync with that `CREATE TABLE` statement.
    diesel::table! {
        /// Table for storing transaction counts per checkpoint.
        tx_counts (cp_sequence_number) {
            cp_sequence_number -> BigInt,
            count -> BigInt,
        }
    }
278
    /// One row of the [tx_counts] table: a checkpoint and the number of transactions in it.
    // `Queryable` is derived, so the field order is kept the same as the column order in the
    // `tx_counts` schema.
    #[derive(Insertable, Queryable, FieldCount)]
    #[diesel(table_name = tx_counts)]
    struct StoredTxCount {
        /// Sequence number of the checkpoint (the table's primary key).
        cp_sequence_number: i64,
        /// Number of transactions in that checkpoint.
        count: i64,
    }
285
    /// Test concurrent pipeline for populating [tx_counts]. It is a stateless marker type that
    /// carries the pipeline's `Processor` and `Handler` implementations.
    struct TxCounts;
288
289    #[async_trait]
290    impl Processor for TxCounts {
291        const NAME: &'static str = "tx_counts";
292        type Value = StoredTxCount;
293
294        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
295            Ok(vec![StoredTxCount {
296                cp_sequence_number: checkpoint.summary.sequence_number as i64,
297                count: checkpoint.transactions.len() as i64,
298            }])
299        }
300    }
301
302    #[async_trait]
303    impl crate::postgres::handler::Handler for TxCounts {
304        async fn commit<'a>(
305            values: &[Self::Value],
306            conn: &mut Connection<'a>,
307        ) -> anyhow::Result<usize> {
308            Ok(diesel::insert_into(tx_counts::table)
309                .values(values)
310                .on_conflict_do_nothing()
311                .execute(conn)
312                .await?)
313        }
314    }
315
316    #[tokio::test]
317    async fn test_indexer_cluster() {
318        let db = TempDb::new().expect("Failed to create temporary database");
319        let url = db.database().url();
320
321        // Generate test transactions to ingest.
322        let checkpoint_dir = tempdir().unwrap();
323        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
324            ingestion_dir: checkpoint_dir.path().to_owned(),
325            starting_checkpoint: 0,
326            num_checkpoints: 10,
327            checkpoint_size: 2,
328        })
329        .await;
330
331        let reader = Db::for_read(url.clone(), DbArgs::default()).await.unwrap();
332        let writer = Db::for_write(url.clone(), DbArgs::default()).await.unwrap();
333
334        {
335            // Create the table we are going to write to. We have to do this manually, because this
336            // table is not handled by migrations.
337            let mut conn = writer.connect().await.unwrap();
338            diesel::sql_query(
339                r#"
340                CREATE TABLE tx_counts (
341                    cp_sequence_number  BIGINT PRIMARY KEY,
342                    count               BIGINT NOT NULL
343                )
344                "#,
345            )
346            .execute(&mut conn)
347            .await
348            .unwrap();
349        }
350
351        let metrics_address =
352            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), get_available_port());
353
354        let args = Args {
355            client_args: Some(ClientArgs {
356                ingestion: IngestionClientArgs {
357                    local_ingestion_path: Some(checkpoint_dir.path().to_owned()),
358                    ..Default::default()
359                },
360                ..Default::default()
361            }),
362            indexer_args: IndexerArgs {
363                first_checkpoint: Some(0),
364                last_checkpoint: Some(9),
365                ..Default::default()
366            },
367            metrics_args: MetricsArgs { metrics_address },
368        };
369
370        let mut indexer = IndexerCluster::builder()
371            .with_database_url(url.clone())
372            .with_args(args)
373            .build()
374            .await
375            .unwrap();
376
377        indexer
378            .concurrent_pipeline(TxCounts, ConcurrentConfig::default())
379            .await
380            .unwrap();
381
382        let ingestion_metrics = indexer.ingestion_metrics().clone();
383        let indexer_metrics = indexer.indexer_metrics().clone();
384
385        // Run the indexer until it signals completion. We have configured it to stop after
386        // ingesting 10 checkpoints, so it should shut itself down.
387        indexer.run().await.unwrap().await.unwrap();
388
389        // Check that the results were all written out.
390        {
391            let mut conn = reader.connect().await.unwrap();
392            let counts: Vec<StoredTxCount> = tx_counts::table
393                .order_by(tx_counts::cp_sequence_number)
394                .load(&mut conn)
395                .await
396                .unwrap();
397
398            assert_eq!(counts.len(), 10);
399            for (i, count) in counts.iter().enumerate() {
400                assert_eq!(count.cp_sequence_number, i as i64);
401                assert_eq!(count.count, 2);
402            }
403        }
404
405        // Check that ingestion metrics were updated.
406        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 10);
407        assert_eq!(ingestion_metrics.total_ingested_transactions.get(), 20);
408        assert_eq!(ingestion_metrics.latest_ingested_checkpoint.get(), 9);
409
410        macro_rules! assert_pipeline_metric {
411            ($name:ident, $value:expr) => {
412                assert_eq!(
413                    indexer_metrics
414                        .$name
415                        .get_metric_with_label_values(&["tx_counts"])
416                        .unwrap()
417                        .get(),
418                    $value
419                );
420            };
421        }
422
423        assert_pipeline_metric!(total_handler_checkpoints_received, 10);
424        assert_pipeline_metric!(total_handler_checkpoints_processed, 10);
425        assert_pipeline_metric!(total_handler_rows_created, 10);
426        assert_pipeline_metric!(latest_processed_checkpoint, 9);
427        assert_pipeline_metric!(total_collector_checkpoints_received, 10);
428        assert_pipeline_metric!(total_collector_rows_received, 10);
429        assert_pipeline_metric!(latest_collected_checkpoint, 9);
430
431        // The watermark checkpoint is inclusive, but the transaction is exclusive
432        assert_pipeline_metric!(watermark_checkpoint, 9);
433        assert_pipeline_metric!(watermark_checkpoint_in_db, 9);
434        assert_pipeline_metric!(watermark_transaction, 20);
435        assert_pipeline_metric!(watermark_transaction_in_db, 20);
436    }
437
438    #[test]
439    fn test_individual_methods_override_bundled_args() {
440        let builder = IndexerClusterBuilder::new()
441            .with_args(Args {
442                indexer_args: IndexerArgs {
443                    first_checkpoint: Some(100),
444                    ..Default::default()
445                },
446                client_args: Some(ClientArgs {
447                    ingestion: IngestionClientArgs {
448                        local_ingestion_path: Some("/bundled".into()),
449                        ..Default::default()
450                    },
451                    ..Default::default()
452                }),
453                metrics_args: MetricsArgs {
454                    metrics_address: "127.0.0.1:8080".parse().unwrap(),
455                },
456            })
457            .with_indexer_args(IndexerArgs {
458                first_checkpoint: Some(200),
459                ..Default::default()
460            })
461            .with_client_args(ClientArgs {
462                ingestion: IngestionClientArgs {
463                    local_ingestion_path: Some("/individual".into()),
464                    ..Default::default()
465                },
466                ..Default::default()
467            })
468            .with_metrics_args(MetricsArgs {
469                metrics_address: "127.0.0.1:9090".parse().unwrap(),
470            });
471
472        assert_eq!(builder.args.indexer_args.first_checkpoint, Some(200));
473        assert_eq!(
474            builder
475                .args
476                .client_args
477                .unwrap()
478                .ingestion
479                .local_ingestion_path
480                .unwrap()
481                .to_string_lossy(),
482            "/individual"
483        );
484        assert_eq!(
485            builder.args.metrics_args.metrics_address.to_string(),
486            "127.0.0.1:9090"
487        );
488    }
489
490    #[test]
491    fn test_bundled_args_override_individual_methods() {
492        let builder = IndexerClusterBuilder::new()
493            .with_indexer_args(IndexerArgs {
494                first_checkpoint: Some(200),
495                ..Default::default()
496            })
497            .with_client_args(ClientArgs {
498                ingestion: IngestionClientArgs {
499                    local_ingestion_path: Some("/individual".into()),
500                    ..Default::default()
501                },
502                ..Default::default()
503            })
504            .with_metrics_args(MetricsArgs {
505                metrics_address: "127.0.0.1:9090".parse().unwrap(),
506            })
507            .with_args(Args {
508                indexer_args: IndexerArgs {
509                    first_checkpoint: Some(100),
510                    ..Default::default()
511                },
512                client_args: Some(ClientArgs {
513                    ingestion: IngestionClientArgs {
514                        local_ingestion_path: Some("/bundled".into()),
515                        ..Default::default()
516                    },
517                    ..Default::default()
518                }),
519                metrics_args: MetricsArgs {
520                    metrics_address: "127.0.0.1:8080".parse().unwrap(),
521                },
522            });
523
524        assert_eq!(builder.args.indexer_args.first_checkpoint, Some(100));
525        assert_eq!(
526            builder
527                .args
528                .client_args
529                .unwrap()
530                .ingestion
531                .local_ingestion_path
532                .unwrap()
533                .to_string_lossy(),
534            "/bundled"
535        );
536        assert_eq!(
537            builder.args.metrics_args.metrics_address.to_string(),
538            "127.0.0.1:8080"
539        );
540    }
541}