sui_indexer_alt_framework/
postgres.rs1use anyhow::{Context, Result};
5use diesel_migrations::EmbeddedMigrations;
6use prometheus::Registry;
7use sui_indexer_alt_metrics::db::DbConnectionStatsCollector;
8use sui_pg_db::temp::TempDb;
9use tempfile::tempdir;
10use tokio_util::sync::CancellationToken;
11use url::Url;
12
13use crate::{
14 Indexer, IndexerArgs,
15 ingestion::{ClientArgs, IngestionConfig},
16};
17
18pub use sui_pg_db::*;
19
20impl Indexer<Db> {
22 pub async fn new_from_pg(
39 database_url: Url,
40 db_args: DbArgs,
41 indexer_args: IndexerArgs,
42 client_args: ClientArgs,
43 ingestion_config: IngestionConfig,
44 migrations: Option<&'static EmbeddedMigrations>,
45 metrics_prefix: Option<&str>,
46 registry: &Registry,
47 cancel: CancellationToken,
48 ) -> Result<Self> {
49 let store = Db::for_write(database_url, db_args) .await
51 .context("Failed to connect to database")?;
52
53 store
55 .run_migrations(migrations)
56 .await
57 .context("Failed to run pending migrations")?;
58
59 registry.register(Box::new(DbConnectionStatsCollector::new(
60 Some("indexer_db"),
61 store.clone(),
62 )))?;
63
64 Indexer::new(
65 store,
66 indexer_args,
67 client_args,
68 ingestion_config,
69 metrics_prefix,
70 registry,
71 cancel,
72 )
73 .await
74 }
75
76 pub async fn new_for_testing(migrations: &'static EmbeddedMigrations) -> (Indexer<Db>, TempDb) {
80 let temp_db = TempDb::new().unwrap();
81 let store = Db::for_write(temp_db.database().url().clone(), DbArgs::default())
82 .await
83 .unwrap();
84 store.run_migrations(Some(migrations)).await.unwrap();
85
86 let indexer = Indexer::new(
87 store,
88 IndexerArgs::default(),
89 ClientArgs {
90 remote_store_url: None,
91 local_ingestion_path: Some(tempdir().unwrap().keep()),
92 rpc_api_url: None,
93 rpc_username: None,
94 rpc_password: None,
95 },
96 IngestionConfig::default(),
97 None,
98 &Registry::new(),
99 CancellationToken::new(),
100 )
101 .await
102 .unwrap();
103 (indexer, temp_db)
104 }
105}
106
107#[cfg(test)]
108pub mod tests {
109
110 use async_trait::async_trait;
111 use std::sync::Arc;
112 use sui_indexer_alt_framework_store_traits::{CommitterWatermark, Store};
113 use sui_types::full_checkpoint_content::CheckpointData;
114
115 use super::*;
116
117 use crate::pipeline::concurrent;
118 use crate::{ConcurrentConfig, FieldCount, pipeline::Processor, store::Connection};
119
120 #[derive(FieldCount)]
121 struct V {
122 _v: u64,
123 }
124
125 macro_rules! define_test_concurrent_pipeline {
126 ($name:ident) => {
127 struct $name;
128 #[async_trait]
129 impl Processor for $name {
130 const NAME: &'static str = stringify!($name);
131 type Value = V;
132 async fn process(
133 &self,
134 _checkpoint: &Arc<CheckpointData>,
135 ) -> anyhow::Result<Vec<Self::Value>> {
136 todo!()
137 }
138 }
139
140 #[async_trait]
141 impl concurrent::Handler for $name {
142 type Store = Db;
143
144 async fn commit<'a>(
145 _values: &[Self::Value],
146 _conn: &mut <Self::Store as Store>::Connection<'a>,
147 ) -> anyhow::Result<usize> {
148 todo!()
149 }
150 }
151 };
152 }
153
154 define_test_concurrent_pipeline!(ConcurrentPipeline1);
155 define_test_concurrent_pipeline!(ConcurrentPipeline2);
156
157 #[tokio::test]
158 async fn test_add_new_pipeline() {
159 let (mut indexer, _temp_db) = Indexer::new_for_testing(&MIGRATIONS).await;
160 indexer
161 .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
162 .await
163 .unwrap();
164 assert_eq!(indexer.first_checkpoint_from_watermark, 0);
165 }
166
167 #[tokio::test]
168 async fn test_add_existing_pipeline() {
169 let (mut indexer, _temp_db) = Indexer::new_for_testing(&MIGRATIONS).await;
170 {
171 let watermark = CommitterWatermark::new_for_testing(10);
172 let mut conn = indexer.store().connect().await.unwrap();
173 assert!(
174 conn.set_committer_watermark(ConcurrentPipeline1::NAME, watermark)
175 .await
176 .unwrap()
177 );
178 }
179 indexer
180 .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
181 .await
182 .unwrap();
183 assert_eq!(indexer.first_checkpoint_from_watermark, 11);
184 }
185
186 #[tokio::test]
187 async fn test_add_multiple_pipelines() {
188 let (mut indexer, _temp_db) = Indexer::new_for_testing(&MIGRATIONS).await;
189 {
190 let watermark1 = CommitterWatermark::new_for_testing(10);
191 let mut conn = indexer.store().connect().await.unwrap();
192 assert!(
193 conn.set_committer_watermark(ConcurrentPipeline1::NAME, watermark1)
194 .await
195 .unwrap()
196 );
197 let watermark2 = CommitterWatermark::new_for_testing(20);
198 assert!(
199 conn.set_committer_watermark(ConcurrentPipeline2::NAME, watermark2)
200 .await
201 .unwrap()
202 );
203 }
204
205 indexer
206 .concurrent_pipeline(ConcurrentPipeline2, ConcurrentConfig::default())
207 .await
208 .unwrap();
209 assert_eq!(indexer.first_checkpoint_from_watermark, 21);
210 indexer
211 .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
212 .await
213 .unwrap();
214 assert_eq!(indexer.first_checkpoint_from_watermark, 11);
215 }
216}