sui_indexer/store/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

use backon::{ExponentialBuilder, Retryable};
use diesel_async::{scoped_futures::ScopedBoxFuture, AsyncPgConnection};
pub(crate) use indexer_store::*;
pub use pg_indexer_store::PgIndexerStore;

use crate::{database::ConnectionPool, errors::IndexerError};

pub mod indexer_store;
pub mod package_resolver;
mod pg_indexer_store;
pub mod pg_partition_manager;

pub async fn transaction_with_retry<'a, Q, T>(
    pool: &ConnectionPool,
    timeout: Duration,
    query: Q,
) -> Result<T, IndexerError>
where
    Q: for<'r> FnOnce(
            &'r mut AsyncPgConnection,
        ) -> ScopedBoxFuture<'a, 'r, Result<T, IndexerError>>
        + Send,
    Q: Clone,
    T: 'a,
{
    let transaction_fn = || async {
        let mut connection = pool.get().await?;

        connection
            .build_transaction()
            .read_write()
            .run(query.clone())
            .await
    };

    transaction_fn
        .retry(ExponentialBuilder::default().with_max_delay(timeout))
        .when(|e: &IndexerError| {
            tracing::error!("Error with persisting data into DB: {:?}, retrying...", e);
            true
        })
        .await
}

pub async fn read_with_retry<'a, Q, T>(
    pool: &ConnectionPool,
    timeout: Duration,
    query: Q,
) -> Result<T, IndexerError>
where
    Q: for<'r> FnOnce(
            &'r mut AsyncPgConnection,
        ) -> ScopedBoxFuture<'a, 'r, Result<T, IndexerError>>
        + Send,
    Q: Clone,
    T: 'a,
{
    let read_fn = || async {
        let mut connection = pool.get().await?;

        connection
            .build_transaction()
            .read_only()
            .run(query.clone())
            .await
    };

    read_fn
        .retry(ExponentialBuilder::default().with_max_delay(timeout))
        .when(|e: &IndexerError| {
            tracing::error!("Error with reading data from DB: {:?}, retrying...", e);
            true
        })
        .await
}