sui_indexer_alt_framework/postgres/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::Context;
5use anyhow::Result;
6use diesel_migrations::EmbeddedMigrations;
7use prometheus::Registry;
8use sui_indexer_alt_metrics::db::DbConnectionStatsCollector;
9use sui_pg_db::temp::TempDb;
10use tempfile::tempdir;
11use url::Url;
12
13use crate::Indexer;
14use crate::IndexerArgs;
15use crate::ingestion::ClientArgs;
16use crate::ingestion::IngestionConfig;
17use crate::ingestion::ingestion_client::IngestionClientArgs;
18
19pub use sui_pg_db::*;
20
21pub mod handler;
22
23/// An opinionated indexer implementation that uses a Postgres database as the store.
24impl Indexer<Db> {
25    /// Create a new instance of the indexer framework. `database_url`, `db_args`, `indexer_args,`,
26    /// `client_args`, and `ingestion_config` contain configurations for the following,
27    /// respectively:
28    ///
29    /// - Connecting to the database,
30    /// - What is indexed (which checkpoints, which pipelines, whether to update the watermarks
31    ///   table) and where to serve metrics from,
32    /// - Where to download checkpoints from,
33    /// - Concurrency and buffering parameters for downloading checkpoints.
34    ///
35    /// Optional `migrations` contains the SQL to run in order to bring the database schema up-to-date for
36    /// the specific instance of the indexer, generated using diesel's `embed_migrations!` macro.
37    /// These migrations will be run as part of initializing the indexer if provided.
38    ///
39    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
40    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
41    pub async fn new_from_pg(
42        database_url: Url,
43        db_args: DbArgs,
44        indexer_args: IndexerArgs,
45        client_args: ClientArgs,
46        ingestion_config: IngestionConfig,
47        migrations: Option<&'static EmbeddedMigrations>,
48        metrics_prefix: Option<&str>,
49        registry: &Registry,
50    ) -> Result<Self> {
51        let store = Db::for_write(database_url, db_args) // I guess our store needs a constructor fn
52            .await
53            .context("Failed to connect to database")?;
54
55        // At indexer initialization, we ensure that the DB schema is up-to-date.
56        store
57            .run_migrations(migrations)
58            .await
59            .context("Failed to run pending migrations")?;
60
61        registry.register(Box::new(DbConnectionStatsCollector::new(
62            Some("indexer_db"),
63            store.clone(),
64        )))?;
65
66        Indexer::new(
67            store,
68            indexer_args,
69            client_args,
70            ingestion_config,
71            metrics_prefix,
72            registry,
73        )
74        .await
75    }
76
77    /// Create a new temporary database and runs provided migrations in tandem with the migrations
78    /// necessary to support watermark operations on the indexer. The indexer is then instantiated
79    /// and returned along with the temporary database.
80    pub async fn new_for_testing(migrations: &'static EmbeddedMigrations) -> (Indexer<Db>, TempDb) {
81        let temp_db = TempDb::new().unwrap();
82        let store = Db::for_write(temp_db.database().url().clone(), DbArgs::default())
83            .await
84            .unwrap();
85        store.run_migrations(Some(migrations)).await.unwrap();
86
87        let indexer = Indexer::new(
88            store,
89            IndexerArgs::default(),
90            ClientArgs {
91                ingestion: IngestionClientArgs {
92                    local_ingestion_path: Some(tempdir().unwrap().keep()),
93                    ..Default::default()
94                },
95                ..Default::default()
96            },
97            IngestionConfig::default(),
98            None,
99            &Registry::new(),
100        )
101        .await
102        .unwrap();
103        (indexer, temp_db)
104    }
105}
106
#[cfg(test)]
pub mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use sui_indexer_alt_framework_store_traits::CommitterWatermark;
    use sui_indexer_alt_framework_store_traits::Connection as _;
    use sui_indexer_alt_framework_store_traits::InitWatermark;
    use sui_types::full_checkpoint_content::Checkpoint;

    use crate::ConcurrentConfig;
    use crate::pipeline::Processor;

    use super::*;

    /// Minimal value type used as `Processor::Value` by the test pipelines below.
    #[derive(FieldCount)]
    struct V {
        _v: u64,
    }

    /// Declares a unit struct named `$pipeline` implementing both [Processor] and
    /// [handler::Handler]. The pipeline's `NAME` is the stringified identifier. `process` and
    /// `commit` are `todo!()` stubs: the tests in this module only register pipelines and
    /// inspect watermarks.
    macro_rules! define_test_concurrent_pipeline {
        ($pipeline:ident) => {
            struct $pipeline;

            #[async_trait]
            impl Processor for $pipeline {
                const NAME: &'static str = stringify!($pipeline);
                type Value = V;

                async fn process(
                    &self,
                    _checkpoint: &Arc<Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    todo!()
                }
            }

            #[async_trait]
            impl handler::Handler for $pipeline {
                async fn commit<'a>(
                    _values: &[Self::Value],
                    _conn: &mut Connection<'a>,
                ) -> anyhow::Result<usize> {
                    todo!()
                }
            }
        };
    }

    // Two distinct pipelines, so that tests can give each its own watermark.
    define_test_concurrent_pipeline!(ConcurrentPipeline1);
    define_test_concurrent_pipeline!(ConcurrentPipeline2);

    /// Adding a pipeline with no stored watermark leaves ingestion starting at checkpoint 0.
    #[tokio::test]
    async fn test_add_new_pipeline() {
        let (mut idx, _db_guard) = Indexer::new_for_testing(&MIGRATIONS).await;

        let added = idx
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await;
        added.unwrap();

        assert_eq!(idx.first_ingestion_checkpoint, 0);
    }

    /// Adding a pipeline whose committer watermark is already at 10 makes ingestion start at 11.
    #[tokio::test]
    async fn test_add_existing_pipeline() {
        let (mut idx, _db_guard) = Indexer::new_for_testing(&MIGRATIONS).await;

        // Seed the watermark in its own scope so the connection is released before the pipeline
        // is registered.
        {
            let committed = CommitterWatermark::new_for_testing(10);
            let mut db_conn = idx.store().connect().await.unwrap();

            db_conn
                .init_watermark(ConcurrentPipeline1::NAME, InitWatermark::default())
                .await
                .unwrap();

            let was_written = db_conn
                .set_committer_watermark(ConcurrentPipeline1::NAME, committed)
                .await
                .unwrap();
            assert!(was_written);
        }

        let added = idx
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await;
        added.unwrap();

        assert_eq!(idx.first_ingestion_checkpoint, 11);
    }

    /// With watermarks at 10 and 20, adding the pipeline at 20 first yields a start of 21, and
    /// then adding the pipeline at 10 lowers the start to 11.
    #[tokio::test]
    async fn test_add_multiple_pipelines() {
        let (mut idx, _db_guard) = Indexer::new_for_testing(&MIGRATIONS).await;

        // Seed both watermarks over a single connection, released at the end of the scope.
        {
            let mut db_conn = idx.store().connect().await.unwrap();

            // Pipeline 1: committed up to checkpoint 10.
            db_conn
                .init_watermark(ConcurrentPipeline1::NAME, InitWatermark::default())
                .await
                .unwrap();
            let first_committed = CommitterWatermark::new_for_testing(10);
            let first_written = db_conn
                .set_committer_watermark(ConcurrentPipeline1::NAME, first_committed)
                .await
                .unwrap();
            assert!(first_written);

            // Pipeline 2: committed up to checkpoint 20.
            db_conn
                .init_watermark(ConcurrentPipeline2::NAME, InitWatermark::default())
                .await
                .unwrap();
            let second_committed = CommitterWatermark::new_for_testing(20);
            let second_written = db_conn
                .set_committer_watermark(ConcurrentPipeline2::NAME, second_committed)
                .await
                .unwrap();
            assert!(second_written);
        }

        // Register the pipeline that is further ahead first.
        let added_second = idx
            .concurrent_pipeline(ConcurrentPipeline2, ConcurrentConfig::default())
            .await;
        added_second.unwrap();
        assert_eq!(idx.first_ingestion_checkpoint, 21);

        // Registering the pipeline that is further behind pulls the starting point back.
        let added_first = idx
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await;
        added_first.unwrap();
        assert_eq!(idx.first_ingestion_checkpoint, 11);
    }
}
227}