sui_indexer_alt_framework/postgres/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::{Context, Result};
5use diesel_migrations::EmbeddedMigrations;
6use prometheus::Registry;
7use sui_indexer_alt_metrics::db::DbConnectionStatsCollector;
8use sui_pg_db::temp::TempDb;
9use tempfile::tempdir;
10use tokio_util::sync::CancellationToken;
11use url::Url;
12
13use crate::{
14    Indexer, IndexerArgs,
15    ingestion::{ClientArgs, IngestionConfig},
16};
17
18pub use sui_pg_db::*;
19
20pub mod handler;
21
22/// An opinionated indexer implementation that uses a Postgres database as the store.
23impl Indexer<Db> {
24    /// Create a new instance of the indexer framework. `database_url`, `db_args`, `indexer_args,`,
25    /// `client_args`, and `ingestion_config` contain configurations for the following,
26    /// respectively:
27    ///
28    /// - Connecting to the database,
29    /// - What is indexed (which checkpoints, which pipelines, whether to update the watermarks
30    ///   table) and where to serve metrics from,
31    /// - Where to download checkpoints from,
32    /// - Concurrency and buffering parameters for downloading checkpoints.
33    ///
34    /// Optional `migrations` contains the SQL to run in order to bring the database schema up-to-date for
35    /// the specific instance of the indexer, generated using diesel's `embed_migrations!` macro.
36    /// These migrations will be run as part of initializing the indexer if provided.
37    ///
38    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
39    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
40    pub async fn new_from_pg(
41        database_url: Url,
42        db_args: DbArgs,
43        indexer_args: IndexerArgs,
44        client_args: ClientArgs,
45        ingestion_config: IngestionConfig,
46        migrations: Option<&'static EmbeddedMigrations>,
47        metrics_prefix: Option<&str>,
48        registry: &Registry,
49        cancel: CancellationToken,
50    ) -> Result<Self> {
51        let store = Db::for_write(database_url, db_args) // I guess our store needs a constructor fn
52            .await
53            .context("Failed to connect to database")?;
54
55        // At indexer initialization, we ensure that the DB schema is up-to-date.
56        store
57            .run_migrations(migrations)
58            .await
59            .context("Failed to run pending migrations")?;
60
61        registry.register(Box::new(DbConnectionStatsCollector::new(
62            Some("indexer_db"),
63            store.clone(),
64        )))?;
65
66        Indexer::new(
67            store,
68            indexer_args,
69            client_args,
70            ingestion_config,
71            metrics_prefix,
72            registry,
73            cancel,
74        )
75        .await
76    }
77
78    /// Create a new temporary database and runs provided migrations in tandem with the migrations
79    /// necessary to support watermark operations on the indexer. The indexer is then instantiated
80    /// and returned along with the temporary database.
81    pub async fn new_for_testing(migrations: &'static EmbeddedMigrations) -> (Indexer<Db>, TempDb) {
82        let temp_db = TempDb::new().unwrap();
83        let store = Db::for_write(temp_db.database().url().clone(), DbArgs::default())
84            .await
85            .unwrap();
86        store.run_migrations(Some(migrations)).await.unwrap();
87
88        let indexer = Indexer::new(
89            store,
90            IndexerArgs::default(),
91            ClientArgs {
92                local_ingestion_path: Some(tempdir().unwrap().keep()),
93                ..Default::default()
94            },
95            IngestionConfig::default(),
96            None,
97            &Registry::new(),
98            CancellationToken::new(),
99        )
100        .await
101        .unwrap();
102        (indexer, temp_db)
103    }
104}
105
#[cfg(test)]
pub mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use sui_indexer_alt_framework_store_traits::{CommitterWatermark, Connection as _};
    use sui_types::full_checkpoint_content::Checkpoint;

    use super::*;

    use crate::{ConcurrentConfig, pipeline::Processor};

    /// Value type for the dummy pipelines below; its contents are never inspected.
    #[derive(FieldCount)]
    struct V {
        _v: u64,
    }

    /// Declare a unit struct `$pipeline` implementing `Processor` and `handler::Handler` with
    /// `todo!()` bodies. The tests below only register these pipelines with the indexer; they
    /// never process or commit any data, so the bodies are never reached.
    macro_rules! define_test_concurrent_pipeline {
        ($pipeline:ident) => {
            struct $pipeline;

            #[async_trait]
            impl Processor for $pipeline {
                const NAME: &'static str = stringify!($pipeline);
                type Value = V;

                async fn process(
                    &self,
                    _checkpoint: &Arc<Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    todo!()
                }
            }

            #[async_trait]
            impl handler::Handler for $pipeline {
                async fn commit<'a>(
                    _values: &[Self::Value],
                    _conn: &mut Connection<'a>,
                ) -> anyhow::Result<usize> {
                    todo!()
                }
            }
        };
    }

    define_test_concurrent_pipeline!(ConcurrentPipeline1);
    define_test_concurrent_pipeline!(ConcurrentPipeline2);

    #[tokio::test]
    async fn test_add_new_pipeline() {
        let (mut indexer, _temp_db) = Indexer::new_for_testing(&MIGRATIONS).await;

        // Nothing has been committed for this pipeline yet, so it starts from checkpoint 0.
        indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await
            .unwrap();

        assert_eq!(indexer.first_checkpoint_from_watermark, 0);
    }

    #[tokio::test]
    async fn test_add_existing_pipeline() {
        let (mut indexer, _temp_db) = Indexer::new_for_testing(&MIGRATIONS).await;

        let seeded = CommitterWatermark::new_for_testing(10);
        let mut conn = indexer.store().connect().await.unwrap();
        let written = conn
            .set_committer_watermark(ConcurrentPipeline1::NAME, seeded)
            .await
            .unwrap();
        assert!(written);
        // Release the connection (and its borrow of `indexer`) before mutating the indexer.
        drop(conn);

        // The pipeline resumes from the checkpoint after its committed watermark.
        indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await
            .unwrap();

        assert_eq!(indexer.first_checkpoint_from_watermark, 11);
    }

    #[tokio::test]
    async fn test_add_multiple_pipelines() {
        let (mut indexer, _temp_db) = Indexer::new_for_testing(&MIGRATIONS).await;

        let mut conn = indexer.store().connect().await.unwrap();
        for (pipeline, checkpoint) in [
            (ConcurrentPipeline1::NAME, 10),
            (ConcurrentPipeline2::NAME, 20),
        ] {
            let seeded = CommitterWatermark::new_for_testing(checkpoint);
            assert!(
                conn.set_committer_watermark(pipeline, seeded)
                    .await
                    .unwrap()
            );
        }
        // Release the connection (and its borrow of `indexer`) before mutating the indexer.
        drop(conn);

        // With only the second pipeline registered, the start follows its watermark (20).
        indexer
            .concurrent_pipeline(ConcurrentPipeline2, ConcurrentConfig::default())
            .await
            .unwrap();
        assert_eq!(indexer.first_checkpoint_from_watermark, 21);

        // Registering the first pipeline pulls the start back to follow its lower watermark (10).
        indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await
            .unwrap();
        assert_eq!(indexer.first_checkpoint_from_watermark, 11);
    }
}