sui_indexer_alt_framework/postgres.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::{Context, Result};
use diesel_migrations::EmbeddedMigrations;
use prometheus::Registry;
use sui_indexer_alt_metrics::db::DbConnectionStatsCollector;
use sui_pg_db::temp::TempDb;
use tempfile::tempdir;
use tokio_util::sync::CancellationToken;
use url::Url;

use crate::{
    Indexer, IndexerArgs,
    ingestion::{ClientArgs, IngestionConfig},
};

pub use sui_pg_db::*;

/// An opinionated indexer implementation that uses a Postgres database as the store.
impl Indexer<Db> {
    /// Create a new instance of the indexer framework. `database_url`, `db_args`, `indexer_args`,
    /// `client_args`, and `ingestion_config` contain configurations for the following,
    /// respectively:
    ///
    /// - Connecting to the database,
    /// - What is indexed (which checkpoints, which pipelines, whether to update the watermarks
    ///   table) and where to serve metrics from,
    /// - Where to download checkpoints from,
    /// - Concurrency and buffering parameters for downloading checkpoints.
    ///
    /// The optional `migrations` contain the SQL to run to bring the database schema up-to-date
    /// for this specific instance of the indexer, generated using diesel's `embed_migrations!`
    /// macro. If provided, these migrations are run as part of initializing the indexer.
    ///
    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
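    ///
    /// A minimal usage sketch (not compiled here): `MyHandler` and `MIGRATIONS` stand in for an
    /// application-defined concurrent pipeline handler and its `embed_migrations!` output, and
    /// `database_url` and `client_args` are assumed to come from the application's CLI:
    ///
    /// ```ignore
    /// let mut indexer = Indexer::new_from_pg(
    ///     database_url,
    ///     DbArgs::default(),
    ///     IndexerArgs::default(),
    ///     client_args,
    ///     IngestionConfig::default(),
    ///     Some(&MIGRATIONS),
    ///     None,
    ///     &Registry::new(),
    ///     CancellationToken::new(),
    /// )
    /// .await?;
    ///
    /// // Register at least one pipeline, then start the indexer.
    /// indexer
    ///     .concurrent_pipeline(MyHandler, ConcurrentConfig::default())
    ///     .await?;
    /// let _handle = indexer.run().await?;
    /// ```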
    pub async fn new_from_pg(
        database_url: Url,
        db_args: DbArgs,
        indexer_args: IndexerArgs,
        client_args: ClientArgs,
        ingestion_config: IngestionConfig,
        migrations: Option<&'static EmbeddedMigrations>,
        metrics_prefix: Option<&str>,
        registry: &Registry,
        cancel: CancellationToken,
    ) -> Result<Self> {
        // Connect to the database with write access.
        let store = Db::for_write(database_url, db_args)
            .await
            .context("Failed to connect to database")?;

        // At indexer initialization, we ensure that the DB schema is up-to-date.
        store
            .run_migrations(migrations)
            .await
            .context("Failed to run pending migrations")?;

        registry.register(Box::new(DbConnectionStatsCollector::new(
            Some("indexer_db"),
            store.clone(),
        )))?;

        Indexer::new(
            store,
            indexer_args,
            client_args,
            ingestion_config,
            metrics_prefix,
            registry,
            cancel,
        )
        .await
    }

    /// Create a new temporary database and run the provided migrations in tandem with the
    /// migrations necessary to support watermark operations on the indexer. The indexer is then
    /// instantiated and returned along with the temporary database.
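    ///
    /// A sketch of test set-up (not compiled here), assuming `MIGRATIONS` is the embedding
    /// crate's `embed_migrations!` constant and `MyHandler` is a placeholder pipeline handler:
    ///
    /// ```ignore
    /// // Keep `_temp_db` bound so the temporary database is not dropped mid-test.
    /// let (mut indexer, _temp_db) = Indexer::new_for_testing(&MIGRATIONS).await;
    /// indexer
    ///     .concurrent_pipeline(MyHandler, ConcurrentConfig::default())
    ///     .await?;
    /// ```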
    pub async fn new_for_testing(migrations: &'static EmbeddedMigrations) -> (Indexer<Db>, TempDb) {
        let temp_db = TempDb::new().unwrap();
        let store = Db::for_write(temp_db.database().url().clone(), DbArgs::default())
            .await
            .unwrap();
        store.run_migrations(Some(migrations)).await.unwrap();

        let indexer = Indexer::new(
            store,
            IndexerArgs::default(),
            ClientArgs {
                remote_store_url: None,
                local_ingestion_path: Some(tempdir().unwrap().keep()),
                rpc_api_url: None,
                rpc_username: None,
                rpc_password: None,
            },
            IngestionConfig::default(),
            None,
            &Registry::new(),
            CancellationToken::new(),
        )
        .await
        .unwrap();
        (indexer, temp_db)
    }
}

#[cfg(test)]
pub mod tests {

    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_indexer_alt_framework_store_traits::{CommitterWatermark, Store};
    use sui_types::full_checkpoint_content::CheckpointData;

    use super::*;

    use crate::pipeline::concurrent;
    use crate::{ConcurrentConfig, FieldCount, pipeline::Processor, store::Connection};

    #[derive(FieldCount)]
    struct V {
        _v: u64,
    }

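    // Defines a minimal concurrent pipeline for these tests: `process` and `commit` are left as
    // `todo!()` because the tests below only exercise pipeline registration and watermark
    // bookkeeping, and never run the ingestion or processing path.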
    macro_rules! define_test_concurrent_pipeline {
        ($name:ident) => {
            struct $name;
            #[async_trait]
            impl Processor for $name {
                const NAME: &'static str = stringify!($name);
                type Value = V;
                async fn process(
                    &self,
                    _checkpoint: &Arc<CheckpointData>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    todo!()
                }
            }

            #[async_trait]
            impl concurrent::Handler for $name {
                type Store = Db;

                async fn commit<'a>(
                    _values: &[Self::Value],
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    todo!()
                }
            }
        };
    }

    define_test_concurrent_pipeline!(ConcurrentPipeline1);
    define_test_concurrent_pipeline!(ConcurrentPipeline2);

    #[tokio::test]
    async fn test_add_new_pipeline() {
        let (mut indexer, _temp_db) = Indexer::new_for_testing(&MIGRATIONS).await;
        indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await
            .unwrap();
        assert_eq!(indexer.first_checkpoint_from_watermark, 0);
    }

    #[tokio::test]
    async fn test_add_existing_pipeline() {
        let (mut indexer, _temp_db) = Indexer::new_for_testing(&MIGRATIONS).await;
        {
            let watermark = CommitterWatermark::new_for_testing(10);
            let mut conn = indexer.store().connect().await.unwrap();
            assert!(
                conn.set_committer_watermark(ConcurrentPipeline1::NAME, watermark)
                    .await
                    .unwrap()
            );
        }
        indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await
            .unwrap();
        assert_eq!(indexer.first_checkpoint_from_watermark, 11);
    }

    #[tokio::test]
    async fn test_add_multiple_pipelines() {
        let (mut indexer, _temp_db) = Indexer::new_for_testing(&MIGRATIONS).await;
        {
            let watermark1 = CommitterWatermark::new_for_testing(10);
            let mut conn = indexer.store().connect().await.unwrap();
            assert!(
                conn.set_committer_watermark(ConcurrentPipeline1::NAME, watermark1)
                    .await
                    .unwrap()
            );
            let watermark2 = CommitterWatermark::new_for_testing(20);
            assert!(
                conn.set_committer_watermark(ConcurrentPipeline2::NAME, watermark2)
                    .await
                    .unwrap()
            );
        }

        indexer
            .concurrent_pipeline(ConcurrentPipeline2, ConcurrentConfig::default())
            .await
            .unwrap();
        assert_eq!(indexer.first_checkpoint_from_watermark, 21);
        indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await
            .unwrap();
        assert_eq!(indexer.first_checkpoint_from_watermark, 11);
    }
}