1use mysten_metrics::init_metrics;
5use tokio::task::JoinHandle;
6use tokio_util::sync::CancellationToken;
7
8use simulacrum::Simulacrum;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13use sui_json_rpc_types::SuiTransactionBlockResponse;
14use sui_pg_db::temp::{TempDb, get_available_port};
15
16use crate::IndexerMetrics;
17use crate::config::{IngestionConfig, RetentionConfig, SnapshotLagConfig, UploadOptions};
18use crate::database::Connection;
19use crate::database::ConnectionPool;
20use crate::db::ConnectionPoolConfig;
21use crate::errors::IndexerError;
22use crate::indexer::Indexer;
23use crate::store::PgIndexerStore;
24
25pub async fn start_indexer_jsonrpc_for_testing(
28 db_url: String,
29 fullnode_url: String,
30 json_rpc_url: String,
31 cancel: Option<CancellationToken>,
32) -> (JoinHandle<Result<(), IndexerError>>, CancellationToken) {
33 let token = cancel.unwrap_or_default();
34
35 let pool_config = ConnectionPoolConfig {
38 pool_size: 5,
39 connection_timeout: Duration::from_secs(10),
40 statement_timeout: Duration::from_secs(30),
41 };
42
43 println!("db_url: {db_url}");
44 println!("pool_config: {pool_config:?}");
45
46 let registry = prometheus::Registry::default();
47 init_metrics(®istry);
48
49 let pool = ConnectionPool::new(db_url.parse().unwrap(), pool_config)
50 .await
51 .unwrap();
52
53 let handle = {
54 let config = crate::config::JsonRpcConfig {
55 name_service_options: crate::config::NameServiceOptions::default(),
56 rpc_address: json_rpc_url.parse().unwrap(),
57 rpc_client_url: fullnode_url,
58 };
59 let token_clone = token.clone();
60 tokio::spawn(
61 async move { Indexer::start_reader(&config, ®istry, pool, token_clone).await },
62 )
63 };
64
65 (handle, token)
66}
67
68pub async fn start_indexer_writer_for_testing(
72 db_url: String,
73 snapshot_config: Option<SnapshotLagConfig>,
74 retention_config: Option<RetentionConfig>,
75 data_ingestion_path: Option<PathBuf>,
76 cancel: Option<CancellationToken>,
77 start_checkpoint: Option<u64>,
78 end_checkpoint: Option<u64>,
79) -> (
80 PgIndexerStore,
81 JoinHandle<Result<(), IndexerError>>,
82 CancellationToken,
83) {
84 let token = cancel.unwrap_or_default();
85 let snapshot_config = snapshot_config.unwrap_or(SnapshotLagConfig {
86 snapshot_min_lag: 5,
87 sleep_duration: 0,
88 });
89
90 let pool_config = ConnectionPoolConfig {
92 pool_size: 5,
93 connection_timeout: Duration::from_secs(10),
94 statement_timeout: Duration::from_secs(30),
95 };
96
97 println!("db_url: {db_url}");
98 println!("pool_config: {pool_config:?}");
99 println!("{data_ingestion_path:?}");
100
101 let registry = prometheus::Registry::default();
102 init_metrics(®istry);
103 let indexer_metrics = IndexerMetrics::new(®istry);
104
105 let pool = ConnectionPool::new(db_url.parse().unwrap(), pool_config)
106 .await
107 .unwrap();
108 let store = PgIndexerStore::new(
109 pool.clone(),
110 UploadOptions::default(),
111 indexer_metrics.clone(),
112 );
113
114 let handle = {
115 let connection = Connection::dedicated(&db_url.parse().unwrap())
116 .await
117 .unwrap();
118 crate::db::reset_database(connection).await.unwrap();
119
120 let store_clone = store.clone();
121 let mut ingestion_config = IngestionConfig {
122 start_checkpoint,
123 end_checkpoint,
124 ..Default::default()
125 };
126 ingestion_config.sources.data_ingestion_path = data_ingestion_path;
127 let token_clone = token.clone();
128
129 tokio::spawn(async move {
130 Indexer::start_writer(
131 ingestion_config,
132 store_clone,
133 indexer_metrics,
134 snapshot_config,
135 retention_config,
136 token_clone,
137 )
138 .await
139 })
140 };
141
142 (store, handle, token)
143}
144
145#[derive(Clone)]
146pub struct SuiTransactionBlockResponseBuilder<'a> {
147 response: SuiTransactionBlockResponse,
148 full_response: &'a SuiTransactionBlockResponse,
149}
150
151impl<'a> SuiTransactionBlockResponseBuilder<'a> {
152 pub fn new(full_response: &'a SuiTransactionBlockResponse) -> Self {
153 Self {
154 response: SuiTransactionBlockResponse::default(),
155 full_response,
156 }
157 }
158
159 pub fn with_input(mut self) -> Self {
160 self.response = SuiTransactionBlockResponse {
161 transaction: self.full_response.transaction.clone(),
162 ..self.response
163 };
164 self
165 }
166
167 pub fn with_raw_input(mut self) -> Self {
168 self.response = SuiTransactionBlockResponse {
169 raw_transaction: self.full_response.raw_transaction.clone(),
170 ..self.response
171 };
172 self
173 }
174
175 pub fn with_effects(mut self) -> Self {
176 self.response = SuiTransactionBlockResponse {
177 effects: self.full_response.effects.clone(),
178 ..self.response
179 };
180 self
181 }
182
183 pub fn with_events(mut self) -> Self {
184 self.response = SuiTransactionBlockResponse {
185 events: self.full_response.events.clone(),
186 ..self.response
187 };
188 self
189 }
190
191 pub fn with_balance_changes(mut self) -> Self {
192 self.response = SuiTransactionBlockResponse {
193 balance_changes: self.full_response.balance_changes.clone(),
194 ..self.response
195 };
196 self
197 }
198
199 pub fn with_object_changes(mut self) -> Self {
200 self.response = SuiTransactionBlockResponse {
201 object_changes: self.full_response.object_changes.clone(),
202 ..self.response
203 };
204 self
205 }
206
207 pub fn with_input_and_changes(mut self) -> Self {
208 self.response = SuiTransactionBlockResponse {
209 transaction: self.full_response.transaction.clone(),
210 balance_changes: self.full_response.balance_changes.clone(),
211 object_changes: self.full_response.object_changes.clone(),
212 ..self.response
213 };
214 self
215 }
216
217 pub fn build(self) -> SuiTransactionBlockResponse {
218 SuiTransactionBlockResponse {
219 transaction: self.response.transaction,
220 raw_transaction: self.response.raw_transaction,
221 effects: self.response.effects,
222 events: self.response.events,
223 balance_changes: self.response.balance_changes,
224 object_changes: self.response.object_changes,
225 ..self.full_response.clone()
227 }
228 }
229}
230
231pub async fn set_up(
233 sim: Arc<Simulacrum>,
234 data_ingestion_path: PathBuf,
235) -> (
236 JoinHandle<()>,
237 PgIndexerStore,
238 JoinHandle<Result<(), IndexerError>>,
239 TempDb,
240) {
241 let database = TempDb::new().unwrap();
242 let server_url: SocketAddr = format!("127.0.0.1:{}", get_available_port())
243 .parse()
244 .unwrap();
245
246 let server_handle = tokio::spawn(async move {
247 sui_rpc_api::RpcService::new(sim)
248 .start_service(server_url)
249 .await;
250 });
251 let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
253 database.database().url().as_str().to_owned(),
254 None,
255 None,
256 Some(data_ingestion_path),
257 None, None, None, )
261 .await;
262 (server_handle, pg_store, pg_handle, database)
263}
264
265pub async fn set_up_with_start_and_end_checkpoints(
266 sim: Arc<Simulacrum>,
267 data_ingestion_path: PathBuf,
268 start_checkpoint: u64,
269 end_checkpoint: u64,
270) -> (
271 JoinHandle<()>,
272 PgIndexerStore,
273 JoinHandle<Result<(), IndexerError>>,
274 TempDb,
275) {
276 let database = TempDb::new().unwrap();
277 let server_url: SocketAddr = format!("127.0.0.1:{}", get_available_port())
278 .parse()
279 .unwrap();
280 let server_handle = tokio::spawn(async move {
281 sui_rpc_api::RpcService::new(sim)
282 .start_service(server_url)
283 .await;
284 });
285 let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
287 database.database().url().as_str().to_owned(),
288 None,
289 None,
290 Some(data_ingestion_path),
291 None, Some(start_checkpoint),
293 Some(end_checkpoint),
294 )
295 .await;
296 (server_handle, pg_store, pg_handle, database)
297}
298
299pub async fn wait_for_checkpoint(
301 pg_store: &PgIndexerStore,
302 checkpoint_sequence_number: u64,
303) -> Result<(), IndexerError> {
304 tokio::time::timeout(Duration::from_secs(30), async {
305 while {
306 let cp_opt = pg_store
307 .get_latest_checkpoint_sequence_number()
308 .await
309 .unwrap();
310 cp_opt.is_none() || (cp_opt.unwrap() < checkpoint_sequence_number)
311 } {
312 tokio::time::sleep(Duration::from_millis(100)).await;
313 }
314 })
315 .await
316 .expect("Timeout waiting for indexer to catchup to checkpoint");
317 Ok(())
318}
319
320pub async fn wait_for_objects_snapshot(
322 pg_store: &PgIndexerStore,
323 checkpoint_sequence_number: u64,
324) -> Result<(), IndexerError> {
325 tokio::time::timeout(Duration::from_secs(30), async {
326 while {
327 let cp_opt = pg_store
328 .get_latest_object_snapshot_checkpoint_sequence_number()
329 .await
330 .unwrap();
331 cp_opt.is_none() || (cp_opt.unwrap() < checkpoint_sequence_number)
332 } {
333 tokio::time::sleep(Duration::from_millis(100)).await;
334 }
335 })
336 .await
337 .expect("Timeout waiting for indexer to catchup to checkpoint for objects snapshot");
338 Ok(())
339}