// sui_bridge_indexer/postgres_manager.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use crate::ProcessedTxnData;
5use diesel::query_dsl::methods::FilterDsl;
6use diesel::upsert::excluded;
7use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
8use diesel_async::AsyncConnection;
9use diesel_async::AsyncPgConnection;
10use diesel_async::RunQueryDsl;
11use diesel_async::pooled_connection::AsyncDieselConnectionManager;
12use diesel_async::pooled_connection::bb8::Pool;
13use diesel_async::scoped_futures::ScopedFutureExt;
14use sui_bridge_schema::models::SuiProgressStore;
15use sui_bridge_schema::schema::governance_actions;
16use sui_bridge_schema::schema::sui_progress_store::txn_digest;
17use sui_bridge_schema::schema::{sui_error_transactions, token_transfer_data};
18use sui_bridge_schema::{schema, schema::token_transfer};
19use sui_types::digests::TransactionDigest;
20
21pub(crate) type PgPool =
22    diesel_async::pooled_connection::bb8::Pool<diesel_async::AsyncPgConnection>;
23
24const SUI_PROGRESS_STORE_DUMMY_KEY: i32 = 1;
25
26pub async fn get_connection_pool(database_url: String) -> PgPool {
27    let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(database_url);
28
29    Pool::builder()
30        .test_on_check_out(true)
31        .build(manager)
32        .await
33        .expect("Could not build Postgres DB connection pool")
34}
35
36// TODO: add retry logic
37pub async fn write(pool: &PgPool, token_txns: Vec<ProcessedTxnData>) -> Result<(), anyhow::Error> {
38    if token_txns.is_empty() {
39        return Ok(());
40    }
41    let (transfers, data, errors, gov_actions) = token_txns.iter().fold(
42        (vec![], vec![], vec![], vec![]),
43        |(mut transfers, mut data, mut errors, mut gov_actions), d| {
44            match d {
45                ProcessedTxnData::TokenTransfer(t) => {
46                    transfers.push(t.to_db());
47                    if let Some(d) = t.to_data_maybe() {
48                        data.push(d)
49                    }
50                }
51                ProcessedTxnData::Error(e) => errors.push(e.to_db()),
52                ProcessedTxnData::GovernanceAction(a) => gov_actions.push(a.to_db()),
53            }
54            (transfers, data, errors, gov_actions)
55        },
56    );
57
58    let connection = &mut pool.get().await?;
59    connection
60        .transaction(|conn| {
61            async move {
62                diesel::insert_into(token_transfer_data::table)
63                    .values(&data)
64                    .on_conflict((
65                        schema::token_transfer_data::dsl::chain_id,
66                        schema::token_transfer_data::dsl::nonce,
67                    ))
68                    .do_update()
69                    .set((
70                        token_transfer_data::txn_hash.eq(excluded(token_transfer_data::txn_hash)),
71                        token_transfer_data::chain_id.eq(excluded(token_transfer_data::chain_id)),
72                        token_transfer_data::nonce.eq(excluded(token_transfer_data::nonce)),
73                        token_transfer_data::block_height
74                            .eq(excluded(token_transfer_data::block_height)),
75                        token_transfer_data::timestamp_ms
76                            .eq(excluded(token_transfer_data::timestamp_ms)),
77                        token_transfer_data::sender_address
78                            .eq(excluded(token_transfer_data::sender_address)),
79                        token_transfer_data::destination_chain
80                            .eq(excluded(token_transfer_data::destination_chain)),
81                        token_transfer_data::recipient_address
82                            .eq(excluded(token_transfer_data::recipient_address)),
83                        token_transfer_data::token_id.eq(excluded(token_transfer_data::token_id)),
84                        token_transfer_data::amount.eq(excluded(token_transfer_data::amount)),
85                        token_transfer_data::is_finalized
86                            .eq(excluded(token_transfer_data::is_finalized)),
87                    ))
88                    .filter(token_transfer_data::is_finalized.eq(false))
89                    .execute(conn)
90                    .await?;
91                diesel::insert_into(token_transfer::table)
92                    .values(&transfers)
93                    .on_conflict((
94                        schema::token_transfer::dsl::chain_id,
95                        schema::token_transfer::dsl::nonce,
96                        schema::token_transfer::dsl::status,
97                    ))
98                    .do_update()
99                    .set((
100                        token_transfer::txn_hash.eq(excluded(token_transfer::txn_hash)),
101                        token_transfer::chain_id.eq(excluded(token_transfer::chain_id)),
102                        token_transfer::nonce.eq(excluded(token_transfer::nonce)),
103                        token_transfer::status.eq(excluded(token_transfer::status)),
104                        token_transfer::block_height.eq(excluded(token_transfer::block_height)),
105                        token_transfer::timestamp_ms.eq(excluded(token_transfer::timestamp_ms)),
106                        token_transfer::txn_sender.eq(excluded(token_transfer::txn_sender)),
107                        token_transfer::gas_usage.eq(excluded(token_transfer::gas_usage)),
108                        token_transfer::data_source.eq(excluded(token_transfer::data_source)),
109                        token_transfer::is_finalized.eq(excluded(token_transfer::is_finalized)),
110                    ))
111                    .filter(token_transfer::is_finalized.eq(false))
112                    .execute(conn)
113                    .await?;
114                diesel::insert_into(sui_error_transactions::table)
115                    .values(&errors)
116                    .on_conflict_do_nothing()
117                    .execute(conn)
118                    .await?;
119                diesel::insert_into(governance_actions::table)
120                    .values(&gov_actions)
121                    .on_conflict_do_nothing()
122                    .execute(conn)
123                    .await
124            }
125            .scope_boxed()
126        })
127        .await?;
128    Ok(())
129}
130
131pub async fn update_sui_progress_store(
132    pool: &PgPool,
133    tx_digest: TransactionDigest,
134) -> Result<(), anyhow::Error> {
135    let mut conn = pool.get().await?;
136    diesel::insert_into(schema::sui_progress_store::table)
137        .values(&SuiProgressStore {
138            id: SUI_PROGRESS_STORE_DUMMY_KEY,
139            txn_digest: tx_digest.inner().to_vec(),
140        })
141        .on_conflict(schema::sui_progress_store::dsl::id)
142        .do_update()
143        .set(txn_digest.eq(tx_digest.inner().to_vec()))
144        .execute(&mut conn)
145        .await?;
146    Ok(())
147}
148
149pub async fn read_sui_progress_store(pool: &PgPool) -> anyhow::Result<Option<TransactionDigest>> {
150    let mut conn = pool.get().await?;
151    let val: Option<SuiProgressStore> = schema::sui_progress_store::dsl::sui_progress_store
152        .select(SuiProgressStore::as_select())
153        .first(&mut conn)
154        .await
155        .optional()?;
156    match val {
157        Some(val) => Ok(Some(TransactionDigest::try_from(
158            val.txn_digest.as_slice(),
159        )?)),
160        None => Ok(None),
161    }
162}