sui_indexer_alt_framework/postgres/
mod.rs1use anyhow::Context;
5use anyhow::Result;
6use diesel_migrations::EmbeddedMigrations;
7use prometheus::Registry;
8use sui_indexer_alt_metrics::db::DbConnectionStatsCollector;
9use sui_pg_db::temp::TempDb;
10use tempfile::tempdir;
11use url::Url;
12
13use crate::Indexer;
14use crate::IndexerArgs;
15use crate::ingestion::ClientArgs;
16use crate::ingestion::IngestionConfig;
17use crate::ingestion::ingestion_client::IngestionClientArgs;
18
19pub use sui_pg_db::*;
20
21pub mod handler;
22
23impl Indexer<Db> {
25 pub async fn new_from_pg(
42 database_url: Url,
43 db_args: DbArgs,
44 indexer_args: IndexerArgs,
45 client_args: ClientArgs,
46 ingestion_config: IngestionConfig,
47 migrations: Option<&'static EmbeddedMigrations>,
48 metrics_prefix: Option<&str>,
49 registry: &Registry,
50 ) -> Result<Self> {
51 let store = Db::for_write(database_url, db_args) .await
53 .context("Failed to connect to database")?;
54
55 store
57 .run_migrations(migrations)
58 .await
59 .context("Failed to run pending migrations")?;
60
61 registry.register(Box::new(DbConnectionStatsCollector::new(
62 Some("indexer_db"),
63 store.clone(),
64 )))?;
65
66 Indexer::new(
67 store,
68 indexer_args,
69 client_args,
70 ingestion_config,
71 metrics_prefix,
72 registry,
73 )
74 .await
75 }
76
77 pub async fn new_for_testing(migrations: &'static EmbeddedMigrations) -> (Indexer<Db>, TempDb) {
81 let temp_db = TempDb::new().unwrap();
82 let store = Db::for_write(temp_db.database().url().clone(), DbArgs::default())
83 .await
84 .unwrap();
85 store.run_migrations(Some(migrations)).await.unwrap();
86
87 let indexer = Indexer::new(
88 store,
89 IndexerArgs::default(),
90 ClientArgs {
91 ingestion: IngestionClientArgs {
92 local_ingestion_path: Some(tempdir().unwrap().keep()),
93 ..Default::default()
94 },
95 ..Default::default()
96 },
97 IngestionConfig::default(),
98 None,
99 &Registry::new(),
100 )
101 .await
102 .unwrap();
103 (indexer, temp_db)
104 }
105}
106
#[cfg(test)]
pub mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use sui_indexer_alt_framework_store_traits::CommitterWatermark;
    use sui_indexer_alt_framework_store_traits::Connection as _;
    use sui_types::full_checkpoint_content::Checkpoint;

    use crate::ConcurrentConfig;
    use crate::pipeline::Processor;

    use super::*;

    /// Placeholder value type for the test pipelines below; none of the tests construct it.
    #[derive(FieldCount)]
    struct V {
        _v: u64,
    }

    /// Declares a unit struct named `$pipeline` and implements [`Processor`] and
    /// [`handler::Handler`] for it. The pipeline's `NAME` is the struct's identifier. Both
    /// `process` and `commit` are `todo!()` stubs, so they panic if they are ever called.
    macro_rules! define_test_concurrent_pipeline {
        ($pipeline:ident) => {
            struct $pipeline;

            #[async_trait]
            impl Processor for $pipeline {
                const NAME: &'static str = stringify!($pipeline);
                type Value = V;

                async fn process(
                    &self,
                    _checkpoint: &Arc<Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    todo!()
                }
            }

            #[async_trait]
            impl handler::Handler for $pipeline {
                async fn commit<'a>(
                    _values: &[Self::Value],
                    _conn: &mut Connection<'a>,
                ) -> anyhow::Result<usize> {
                    todo!()
                }
            }
        };
    }

    define_test_concurrent_pipeline!(ConcurrentPipeline1);
    define_test_concurrent_pipeline!(ConcurrentPipeline2);

    /// Adding a pipeline that has no stored watermark leaves `next_checkpoint` at 0.
    #[tokio::test]
    async fn test_add_new_pipeline() {
        let (mut indexer, _temp_db) = Indexer::new_for_testing(&MIGRATIONS).await;

        let added = indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await;
        added.unwrap();

        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.next_checkpoint, 0);
        assert_eq!(indexer.next_sequential_checkpoint, None);
    }

    /// Adding a pipeline whose committer watermark was set to 10 moves `next_checkpoint` to 11.
    #[tokio::test]
    async fn test_add_existing_pipeline() {
        let (mut indexer, _temp_db) = Indexer::new_for_testing(&MIGRATIONS).await;

        // Seed the watermark before the pipeline is registered.
        let seeded = CommitterWatermark::new_for_testing(10);
        let mut db_conn = indexer.store().connect().await.unwrap();
        db_conn
            .init_watermark(ConcurrentPipeline1::NAME, None)
            .await
            .unwrap();
        let updated = db_conn
            .set_committer_watermark(ConcurrentPipeline1::NAME, seeded)
            .await
            .unwrap();
        assert!(updated);
        // Release the connection before the indexer is used mutably.
        drop(db_conn);

        let added = indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await;
        added.unwrap();

        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.next_checkpoint, 11);
        assert_eq!(indexer.next_sequential_checkpoint, None);
    }

    /// With watermarks of 10 and 20 stored for two pipelines, `next_checkpoint` is 21 after
    /// adding only the second pipeline, and 11 once the first pipeline is added as well.
    #[tokio::test]
    async fn test_add_multiple_pipelines() {
        let (mut indexer, _temp_db) = Indexer::new_for_testing(&MIGRATIONS).await;

        // Seed both watermarks over a single connection.
        let mut db_conn = indexer.store().connect().await.unwrap();

        db_conn
            .init_watermark(ConcurrentPipeline1::NAME, None)
            .await
            .unwrap();
        let first_seed = CommitterWatermark::new_for_testing(10);
        let first_updated = db_conn
            .set_committer_watermark(ConcurrentPipeline1::NAME, first_seed)
            .await
            .unwrap();
        assert!(first_updated);

        db_conn
            .init_watermark(ConcurrentPipeline2::NAME, None)
            .await
            .unwrap();
        let second_seed = CommitterWatermark::new_for_testing(20);
        let second_updated = db_conn
            .set_committer_watermark(ConcurrentPipeline2::NAME, second_seed)
            .await
            .unwrap();
        assert!(second_updated);

        // Release the connection before the indexer is used mutably.
        drop(db_conn);

        // Only the pipeline with the higher watermark is registered so far.
        let added_second = indexer
            .concurrent_pipeline(ConcurrentPipeline2, ConcurrentConfig::default())
            .await;
        added_second.unwrap();
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.next_checkpoint, 21);
        assert_eq!(indexer.next_sequential_checkpoint, None);

        // Registering the pipeline with the lower watermark pulls `next_checkpoint` back.
        let added_first = indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await;
        added_first.unwrap();
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.next_checkpoint, 11);
        assert_eq!(indexer.next_sequential_checkpoint, None);
    }
}
234}