sui_indexer_alt_framework/postgres/
mod.rs1use anyhow::{Context, Result};
5use diesel_migrations::EmbeddedMigrations;
6use prometheus::Registry;
7use sui_indexer_alt_metrics::db::DbConnectionStatsCollector;
8use sui_pg_db::temp::TempDb;
9use tempfile::tempdir;
10use tokio_util::sync::CancellationToken;
11use url::Url;
12
13use crate::{
14 Indexer, IndexerArgs,
15 ingestion::{ClientArgs, IngestionConfig},
16};
17
18pub use sui_pg_db::*;
19
20pub mod handler;
21
22impl Indexer<Db> {
24 pub async fn new_from_pg(
41 database_url: Url,
42 db_args: DbArgs,
43 indexer_args: IndexerArgs,
44 client_args: ClientArgs,
45 ingestion_config: IngestionConfig,
46 migrations: Option<&'static EmbeddedMigrations>,
47 metrics_prefix: Option<&str>,
48 registry: &Registry,
49 cancel: CancellationToken,
50 ) -> Result<Self> {
51 let store = Db::for_write(database_url, db_args) .await
53 .context("Failed to connect to database")?;
54
55 store
57 .run_migrations(migrations)
58 .await
59 .context("Failed to run pending migrations")?;
60
61 registry.register(Box::new(DbConnectionStatsCollector::new(
62 Some("indexer_db"),
63 store.clone(),
64 )))?;
65
66 Indexer::new(
67 store,
68 indexer_args,
69 client_args,
70 ingestion_config,
71 metrics_prefix,
72 registry,
73 cancel,
74 )
75 .await
76 }
77
78 pub async fn new_for_testing(migrations: &'static EmbeddedMigrations) -> (Indexer<Db>, TempDb) {
82 let temp_db = TempDb::new().unwrap();
83 let store = Db::for_write(temp_db.database().url().clone(), DbArgs::default())
84 .await
85 .unwrap();
86 store.run_migrations(Some(migrations)).await.unwrap();
87
88 let indexer = Indexer::new(
89 store,
90 IndexerArgs::default(),
91 ClientArgs {
92 local_ingestion_path: Some(tempdir().unwrap().keep()),
93 ..Default::default()
94 },
95 IngestionConfig::default(),
96 None,
97 &Registry::new(),
98 CancellationToken::new(),
99 )
100 .await
101 .unwrap();
102 (indexer, temp_db)
103 }
104}
105
#[cfg(test)]
pub mod tests {

    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_indexer_alt_framework_store_traits::{CommitterWatermark, Connection as _};
    use sui_types::full_checkpoint_content::Checkpoint;

    use super::*;

    use crate::{ConcurrentConfig, pipeline::Processor};

    /// Placeholder value type produced by the test pipelines; its contents are never read.
    #[derive(FieldCount)]
    struct V {
        _v: u64,
    }

    /// Declares a unit struct named `$pipeline` that implements both `Processor` and
    /// `handler::Handler`.
    ///
    /// The pipeline's `NAME` is the struct's identifier as a string. `process` and `commit` are
    /// left as `todo!()`: the tests below only register pipelines and never drive them.
    macro_rules! define_test_concurrent_pipeline {
        ($pipeline:ident) => {
            struct $pipeline;

            #[async_trait]
            impl Processor for $pipeline {
                const NAME: &'static str = stringify!($pipeline);
                type Value = V;

                async fn process(
                    &self,
                    _cp: &Arc<Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    todo!()
                }
            }

            #[async_trait]
            impl handler::Handler for $pipeline {
                async fn commit<'a>(
                    _rows: &[Self::Value],
                    _db: &mut Connection<'a>,
                ) -> anyhow::Result<usize> {
                    todo!()
                }
            }
        };
    }

    define_test_concurrent_pipeline!(ConcurrentPipeline1);
    define_test_concurrent_pipeline!(ConcurrentPipeline2);

    /// Registering a pipeline with no stored watermark leaves the first checkpoint at 0.
    #[tokio::test]
    async fn test_add_new_pipeline() {
        let (mut indexer, _temp_db) = Indexer::new_for_testing(&MIGRATIONS).await;

        let config = ConcurrentConfig::default();
        indexer
            .concurrent_pipeline(ConcurrentPipeline1, config)
            .await
            .unwrap();

        assert_eq!(indexer.first_checkpoint_from_watermark, 0);
    }

    /// Registering a pipeline whose stored watermark is 10 moves the first checkpoint to 11.
    #[tokio::test]
    async fn test_add_existing_pipeline() {
        let (mut indexer, _temp_db) = Indexer::new_for_testing(&MIGRATIONS).await;

        // Seed the watermark before the pipeline is registered.
        let mut conn = indexer.store().connect().await.unwrap();
        let stored = conn
            .set_committer_watermark(
                ConcurrentPipeline1::NAME,
                CommitterWatermark::new_for_testing(10),
            )
            .await
            .unwrap();
        assert!(stored);
        // Release the connection (and its borrow of the indexer) before mutating the indexer.
        drop(conn);

        let config = ConcurrentConfig::default();
        indexer
            .concurrent_pipeline(ConcurrentPipeline1, config)
            .await
            .unwrap();

        assert_eq!(indexer.first_checkpoint_from_watermark, 11);
    }

    /// With watermarks of 10 and 20 stored, registering the pipeline at 20 first yields 21, and
    /// then registering the pipeline at 10 brings the first checkpoint down to 11.
    #[tokio::test]
    async fn test_add_multiple_pipelines() {
        let (mut indexer, _temp_db) = Indexer::new_for_testing(&MIGRATIONS).await;

        // Both watermarks are written through the same connection.
        let mut conn = indexer.store().connect().await.unwrap();

        let stored_first = conn
            .set_committer_watermark(
                ConcurrentPipeline1::NAME,
                CommitterWatermark::new_for_testing(10),
            )
            .await
            .unwrap();
        assert!(stored_first);

        let stored_second = conn
            .set_committer_watermark(
                ConcurrentPipeline2::NAME,
                CommitterWatermark::new_for_testing(20),
            )
            .await
            .unwrap();
        assert!(stored_second);

        // Release the connection (and its borrow of the indexer) before mutating the indexer.
        drop(conn);

        indexer
            .concurrent_pipeline(ConcurrentPipeline2, ConcurrentConfig::default())
            .await
            .unwrap();
        assert_eq!(indexer.first_checkpoint_from_watermark, 21);

        indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await
            .unwrap();
        assert_eq!(indexer.first_checkpoint_from_watermark, 11);
    }
}