sui_indexer/
test_utils.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use mysten_metrics::init_metrics;
5use tokio::task::JoinHandle;
6use tokio_util::sync::CancellationToken;
7
8use simulacrum::Simulacrum;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13use sui_json_rpc_types::SuiTransactionBlockResponse;
14use sui_pg_db::temp::{TempDb, get_available_port};
15
16use crate::IndexerMetrics;
17use crate::config::{IngestionConfig, RetentionConfig, SnapshotLagConfig, UploadOptions};
18use crate::database::Connection;
19use crate::database::ConnectionPool;
20use crate::db::ConnectionPoolConfig;
21use crate::errors::IndexerError;
22use crate::indexer::Indexer;
23use crate::store::PgIndexerStore;
24
25/// Wrapper over `Indexer::start_reader` to make it easier to configure an indexer jsonrpc reader
26/// for testing.
27pub async fn start_indexer_jsonrpc_for_testing(
28    db_url: String,
29    fullnode_url: String,
30    json_rpc_url: String,
31    cancel: Option<CancellationToken>,
32) -> (JoinHandle<Result<(), IndexerError>>, CancellationToken) {
33    let token = cancel.unwrap_or_default();
34
35    // Reduce the connection pool size to 10 for testing
36    // to prevent maxing out
37    let pool_config = ConnectionPoolConfig {
38        pool_size: 5,
39        connection_timeout: Duration::from_secs(10),
40        statement_timeout: Duration::from_secs(30),
41    };
42
43    println!("db_url: {db_url}");
44    println!("pool_config: {pool_config:?}");
45
46    let registry = prometheus::Registry::default();
47    init_metrics(&registry);
48
49    let pool = ConnectionPool::new(db_url.parse().unwrap(), pool_config)
50        .await
51        .unwrap();
52
53    let handle = {
54        let config = crate::config::JsonRpcConfig {
55            name_service_options: crate::config::NameServiceOptions::default(),
56            rpc_address: json_rpc_url.parse().unwrap(),
57            rpc_client_url: fullnode_url,
58        };
59        let token_clone = token.clone();
60        tokio::spawn(
61            async move { Indexer::start_reader(&config, &registry, pool, token_clone).await },
62        )
63    };
64
65    (handle, token)
66}
67
68/// Wrapper over `Indexer::start_writer_with_config` to make it easier to configure an indexer
69/// writer for testing. If the config options are null, default values that have historically worked
70/// for testing will be used.
71pub async fn start_indexer_writer_for_testing(
72    db_url: String,
73    snapshot_config: Option<SnapshotLagConfig>,
74    retention_config: Option<RetentionConfig>,
75    data_ingestion_path: Option<PathBuf>,
76    cancel: Option<CancellationToken>,
77    start_checkpoint: Option<u64>,
78    end_checkpoint: Option<u64>,
79) -> (
80    PgIndexerStore,
81    JoinHandle<Result<(), IndexerError>>,
82    CancellationToken,
83) {
84    let token = cancel.unwrap_or_default();
85    let snapshot_config = snapshot_config.unwrap_or(SnapshotLagConfig {
86        snapshot_min_lag: 5,
87        sleep_duration: 0,
88    });
89
90    // Reduce the connection pool size to 10 for testing to prevent maxing out
91    let pool_config = ConnectionPoolConfig {
92        pool_size: 5,
93        connection_timeout: Duration::from_secs(10),
94        statement_timeout: Duration::from_secs(30),
95    };
96
97    println!("db_url: {db_url}");
98    println!("pool_config: {pool_config:?}");
99    println!("{data_ingestion_path:?}");
100
101    let registry = prometheus::Registry::default();
102    init_metrics(&registry);
103    let indexer_metrics = IndexerMetrics::new(&registry);
104
105    let pool = ConnectionPool::new(db_url.parse().unwrap(), pool_config)
106        .await
107        .unwrap();
108    let store = PgIndexerStore::new(
109        pool.clone(),
110        UploadOptions::default(),
111        indexer_metrics.clone(),
112    );
113
114    let handle = {
115        let connection = Connection::dedicated(&db_url.parse().unwrap())
116            .await
117            .unwrap();
118        crate::db::reset_database(connection).await.unwrap();
119
120        let store_clone = store.clone();
121        let mut ingestion_config = IngestionConfig {
122            start_checkpoint,
123            end_checkpoint,
124            ..Default::default()
125        };
126        ingestion_config.sources.data_ingestion_path = data_ingestion_path;
127        let token_clone = token.clone();
128
129        tokio::spawn(async move {
130            Indexer::start_writer(
131                ingestion_config,
132                store_clone,
133                indexer_metrics,
134                snapshot_config,
135                retention_config,
136                token_clone,
137            )
138            .await
139        })
140    };
141
142    (store, handle, token)
143}
144
/// Builder that assembles a partial `SuiTransactionBlockResponse` by selectively copying
/// fields from a complete reference response. Fields that are never selected fall back to
/// the full response when `build` is called — presumably mirroring the RPC's `show_*`
/// display options (see `build`'s comment); confirm against callers.
#[derive(Clone)]
pub struct SuiTransactionBlockResponseBuilder<'a> {
    // The partial response being accumulated; starts as `default()`.
    response: SuiTransactionBlockResponse,
    // The complete response that selected fields are copied from.
    full_response: &'a SuiTransactionBlockResponse,
}
150
151impl<'a> SuiTransactionBlockResponseBuilder<'a> {
152    pub fn new(full_response: &'a SuiTransactionBlockResponse) -> Self {
153        Self {
154            response: SuiTransactionBlockResponse::default(),
155            full_response,
156        }
157    }
158
159    pub fn with_input(mut self) -> Self {
160        self.response = SuiTransactionBlockResponse {
161            transaction: self.full_response.transaction.clone(),
162            ..self.response
163        };
164        self
165    }
166
167    pub fn with_raw_input(mut self) -> Self {
168        self.response = SuiTransactionBlockResponse {
169            raw_transaction: self.full_response.raw_transaction.clone(),
170            ..self.response
171        };
172        self
173    }
174
175    pub fn with_effects(mut self) -> Self {
176        self.response = SuiTransactionBlockResponse {
177            effects: self.full_response.effects.clone(),
178            ..self.response
179        };
180        self
181    }
182
183    pub fn with_events(mut self) -> Self {
184        self.response = SuiTransactionBlockResponse {
185            events: self.full_response.events.clone(),
186            ..self.response
187        };
188        self
189    }
190
191    pub fn with_balance_changes(mut self) -> Self {
192        self.response = SuiTransactionBlockResponse {
193            balance_changes: self.full_response.balance_changes.clone(),
194            ..self.response
195        };
196        self
197    }
198
199    pub fn with_object_changes(mut self) -> Self {
200        self.response = SuiTransactionBlockResponse {
201            object_changes: self.full_response.object_changes.clone(),
202            ..self.response
203        };
204        self
205    }
206
207    pub fn with_input_and_changes(mut self) -> Self {
208        self.response = SuiTransactionBlockResponse {
209            transaction: self.full_response.transaction.clone(),
210            balance_changes: self.full_response.balance_changes.clone(),
211            object_changes: self.full_response.object_changes.clone(),
212            ..self.response
213        };
214        self
215    }
216
217    pub fn build(self) -> SuiTransactionBlockResponse {
218        SuiTransactionBlockResponse {
219            transaction: self.response.transaction,
220            raw_transaction: self.response.raw_transaction,
221            effects: self.response.effects,
222            events: self.response.events,
223            balance_changes: self.response.balance_changes,
224            object_changes: self.response.object_changes,
225            // Use full response for any fields that aren't showable
226            ..self.full_response.clone()
227        }
228    }
229}
230
231/// Set up a test indexer fetching from a REST endpoint served by the given Simulacrum.
232pub async fn set_up(
233    sim: Arc<Simulacrum>,
234    data_ingestion_path: PathBuf,
235) -> (
236    JoinHandle<()>,
237    PgIndexerStore,
238    JoinHandle<Result<(), IndexerError>>,
239    TempDb,
240) {
241    let database = TempDb::new().unwrap();
242    let server_url: SocketAddr = format!("127.0.0.1:{}", get_available_port())
243        .parse()
244        .unwrap();
245
246    let server_handle = tokio::spawn(async move {
247        sui_rpc_api::RpcService::new(sim)
248            .start_service(server_url)
249            .await;
250    });
251    // Starts indexer
252    let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
253        database.database().url().as_str().to_owned(),
254        None,
255        None,
256        Some(data_ingestion_path),
257        None, /* cancel */
258        None, /* start_checkpoint */
259        None, /* end_checkpoint */
260    )
261    .await;
262    (server_handle, pg_store, pg_handle, database)
263}
264
265pub async fn set_up_with_start_and_end_checkpoints(
266    sim: Arc<Simulacrum>,
267    data_ingestion_path: PathBuf,
268    start_checkpoint: u64,
269    end_checkpoint: u64,
270) -> (
271    JoinHandle<()>,
272    PgIndexerStore,
273    JoinHandle<Result<(), IndexerError>>,
274    TempDb,
275) {
276    let database = TempDb::new().unwrap();
277    let server_url: SocketAddr = format!("127.0.0.1:{}", get_available_port())
278        .parse()
279        .unwrap();
280    let server_handle = tokio::spawn(async move {
281        sui_rpc_api::RpcService::new(sim)
282            .start_service(server_url)
283            .await;
284    });
285    // Starts indexer
286    let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
287        database.database().url().as_str().to_owned(),
288        None,
289        None,
290        Some(data_ingestion_path),
291        None, /* cancel */
292        Some(start_checkpoint),
293        Some(end_checkpoint),
294    )
295    .await;
296    (server_handle, pg_store, pg_handle, database)
297}
298
299/// Wait for the indexer to catch up to the given checkpoint sequence number.
300pub async fn wait_for_checkpoint(
301    pg_store: &PgIndexerStore,
302    checkpoint_sequence_number: u64,
303) -> Result<(), IndexerError> {
304    tokio::time::timeout(Duration::from_secs(30), async {
305        while {
306            let cp_opt = pg_store
307                .get_latest_checkpoint_sequence_number()
308                .await
309                .unwrap();
310            cp_opt.is_none() || (cp_opt.unwrap() < checkpoint_sequence_number)
311        } {
312            tokio::time::sleep(Duration::from_millis(100)).await;
313        }
314    })
315    .await
316    .expect("Timeout waiting for indexer to catchup to checkpoint");
317    Ok(())
318}
319
320/// Wait for the indexer to catch up to the given checkpoint sequence number for objects snapshot.
321pub async fn wait_for_objects_snapshot(
322    pg_store: &PgIndexerStore,
323    checkpoint_sequence_number: u64,
324) -> Result<(), IndexerError> {
325    tokio::time::timeout(Duration::from_secs(30), async {
326        while {
327            let cp_opt = pg_store
328                .get_latest_object_snapshot_checkpoint_sequence_number()
329                .await
330                .unwrap();
331            cp_opt.is_none() || (cp_opt.unwrap() < checkpoint_sequence_number)
332        } {
333            tokio::time::sleep(Duration::from_millis(100)).await;
334        }
335    })
336    .await
337    .expect("Timeout waiting for indexer to catchup to checkpoint for objects snapshot");
338    Ok(())
339}