sui_indexer/store/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6use backon::{ExponentialBuilder, Retryable};
7use diesel_async::{AsyncPgConnection, scoped_futures::ScopedBoxFuture};
8pub(crate) use indexer_store::*;
9pub use pg_indexer_store::PgIndexerStore;
10
11use crate::{database::ConnectionPool, errors::IndexerError};
12
13pub mod indexer_store;
14pub mod package_resolver;
15mod pg_indexer_store;
16pub mod pg_partition_manager;
17
18pub async fn transaction_with_retry<'a, Q, T>(
19    pool: &ConnectionPool,
20    timeout: Duration,
21    query: Q,
22) -> Result<T, IndexerError>
23where
24    Q: for<'r> FnOnce(
25            &'r mut AsyncPgConnection,
26        ) -> ScopedBoxFuture<'a, 'r, Result<T, IndexerError>>
27        + Send,
28    Q: Clone,
29    T: 'a,
30{
31    let transaction_fn = || async {
32        let mut connection = pool.get().await?;
33
34        connection
35            .build_transaction()
36            .read_write()
37            .run(query.clone())
38            .await
39    };
40
41    transaction_fn
42        .retry(ExponentialBuilder::default().with_max_delay(timeout))
43        .when(|e: &IndexerError| {
44            tracing::error!("Error with persisting data into DB: {:?}, retrying...", e);
45            true
46        })
47        .await
48}
49
50pub async fn read_with_retry<'a, Q, T>(
51    pool: &ConnectionPool,
52    timeout: Duration,
53    query: Q,
54) -> Result<T, IndexerError>
55where
56    Q: for<'r> FnOnce(
57            &'r mut AsyncPgConnection,
58        ) -> ScopedBoxFuture<'a, 'r, Result<T, IndexerError>>
59        + Send,
60    Q: Clone,
61    T: 'a,
62{
63    let read_fn = || async {
64        let mut connection = pool.get().await?;
65
66        connection
67            .build_transaction()
68            .read_only()
69            .run(query.clone())
70            .await
71    };
72
73    read_fn
74        .retry(ExponentialBuilder::default().with_max_delay(timeout))
75        .when(|e: &IndexerError| {
76            tracing::error!("Error with reading data from DB: {:?}, retrying...", e);
77            true
78        })
79        .await
80}