1use std::time::Duration;
5
6use backon::{ExponentialBuilder, Retryable};
7use diesel_async::{AsyncPgConnection, scoped_futures::ScopedBoxFuture};
8pub(crate) use indexer_store::*;
9pub use pg_indexer_store::PgIndexerStore;
10
11use crate::{database::ConnectionPool, errors::IndexerError};
12
13pub mod indexer_store;
14pub mod package_resolver;
15mod pg_indexer_store;
16pub mod pg_partition_manager;
17
18pub async fn transaction_with_retry<'a, Q, T>(
19 pool: &ConnectionPool,
20 timeout: Duration,
21 query: Q,
22) -> Result<T, IndexerError>
23where
24 Q: for<'r> FnOnce(
25 &'r mut AsyncPgConnection,
26 ) -> ScopedBoxFuture<'a, 'r, Result<T, IndexerError>>
27 + Send,
28 Q: Clone,
29 T: 'a,
30{
31 let transaction_fn = || async {
32 let mut connection = pool.get().await?;
33
34 connection
35 .build_transaction()
36 .read_write()
37 .run(query.clone())
38 .await
39 };
40
41 transaction_fn
42 .retry(ExponentialBuilder::default().with_max_delay(timeout))
43 .when(|e: &IndexerError| {
44 tracing::error!("Error with persisting data into DB: {:?}, retrying...", e);
45 true
46 })
47 .await
48}
49
50pub async fn read_with_retry<'a, Q, T>(
51 pool: &ConnectionPool,
52 timeout: Duration,
53 query: Q,
54) -> Result<T, IndexerError>
55where
56 Q: for<'r> FnOnce(
57 &'r mut AsyncPgConnection,
58 ) -> ScopedBoxFuture<'a, 'r, Result<T, IndexerError>>
59 + Send,
60 Q: Clone,
61 T: 'a,
62{
63 let read_fn = || async {
64 let mut connection = pool.get().await?;
65
66 connection
67 .build_transaction()
68 .read_only()
69 .run(query.clone())
70 .await
71 };
72
73 read_fn
74 .retry(ExponentialBuilder::default().with_max_delay(timeout))
75 .when(|e: &IndexerError| {
76 tracing::error!("Error with reading data from DB: {:?}, retrying...", e);
77 true
78 })
79 .await
80}