sui_bridge_indexer/
postgres_manager.rs1use crate::ProcessedTxnData;
5use diesel::query_dsl::methods::FilterDsl;
6use diesel::upsert::excluded;
7use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
8use diesel_async::AsyncConnection;
9use diesel_async::AsyncPgConnection;
10use diesel_async::RunQueryDsl;
11use diesel_async::pooled_connection::AsyncDieselConnectionManager;
12use diesel_async::pooled_connection::bb8::Pool;
13use diesel_async::scoped_futures::ScopedFutureExt;
14use sui_bridge_schema::models::SuiProgressStore;
15use sui_bridge_schema::schema::governance_actions;
16use sui_bridge_schema::schema::sui_progress_store::txn_digest;
17use sui_bridge_schema::schema::{sui_error_transactions, token_transfer_data};
18use sui_bridge_schema::{schema, schema::token_transfer};
19use sui_types::digests::TransactionDigest;
20
21pub(crate) type PgPool =
22 diesel_async::pooled_connection::bb8::Pool<diesel_async::AsyncPgConnection>;
23
24const SUI_PROGRESS_STORE_DUMMY_KEY: i32 = 1;
25
26pub async fn get_connection_pool(database_url: String) -> PgPool {
27 let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(database_url);
28
29 Pool::builder()
30 .test_on_check_out(true)
31 .build(manager)
32 .await
33 .expect("Could not build Postgres DB connection pool")
34}
35
36pub async fn write(pool: &PgPool, token_txns: Vec<ProcessedTxnData>) -> Result<(), anyhow::Error> {
38 if token_txns.is_empty() {
39 return Ok(());
40 }
41 let (transfers, data, errors, gov_actions) = token_txns.iter().fold(
42 (vec![], vec![], vec![], vec![]),
43 |(mut transfers, mut data, mut errors, mut gov_actions), d| {
44 match d {
45 ProcessedTxnData::TokenTransfer(t) => {
46 transfers.push(t.to_db());
47 if let Some(d) = t.to_data_maybe() {
48 data.push(d)
49 }
50 }
51 ProcessedTxnData::Error(e) => errors.push(e.to_db()),
52 ProcessedTxnData::GovernanceAction(a) => gov_actions.push(a.to_db()),
53 }
54 (transfers, data, errors, gov_actions)
55 },
56 );
57
58 let connection = &mut pool.get().await?;
59 connection
60 .transaction(|conn| {
61 async move {
62 diesel::insert_into(token_transfer_data::table)
63 .values(&data)
64 .on_conflict((
65 schema::token_transfer_data::dsl::chain_id,
66 schema::token_transfer_data::dsl::nonce,
67 ))
68 .do_update()
69 .set((
70 token_transfer_data::txn_hash.eq(excluded(token_transfer_data::txn_hash)),
71 token_transfer_data::chain_id.eq(excluded(token_transfer_data::chain_id)),
72 token_transfer_data::nonce.eq(excluded(token_transfer_data::nonce)),
73 token_transfer_data::block_height
74 .eq(excluded(token_transfer_data::block_height)),
75 token_transfer_data::timestamp_ms
76 .eq(excluded(token_transfer_data::timestamp_ms)),
77 token_transfer_data::sender_address
78 .eq(excluded(token_transfer_data::sender_address)),
79 token_transfer_data::destination_chain
80 .eq(excluded(token_transfer_data::destination_chain)),
81 token_transfer_data::recipient_address
82 .eq(excluded(token_transfer_data::recipient_address)),
83 token_transfer_data::token_id.eq(excluded(token_transfer_data::token_id)),
84 token_transfer_data::amount.eq(excluded(token_transfer_data::amount)),
85 token_transfer_data::is_finalized
86 .eq(excluded(token_transfer_data::is_finalized)),
87 ))
88 .filter(token_transfer_data::is_finalized.eq(false))
89 .execute(conn)
90 .await?;
91 diesel::insert_into(token_transfer::table)
92 .values(&transfers)
93 .on_conflict((
94 schema::token_transfer::dsl::chain_id,
95 schema::token_transfer::dsl::nonce,
96 schema::token_transfer::dsl::status,
97 ))
98 .do_update()
99 .set((
100 token_transfer::txn_hash.eq(excluded(token_transfer::txn_hash)),
101 token_transfer::chain_id.eq(excluded(token_transfer::chain_id)),
102 token_transfer::nonce.eq(excluded(token_transfer::nonce)),
103 token_transfer::status.eq(excluded(token_transfer::status)),
104 token_transfer::block_height.eq(excluded(token_transfer::block_height)),
105 token_transfer::timestamp_ms.eq(excluded(token_transfer::timestamp_ms)),
106 token_transfer::txn_sender.eq(excluded(token_transfer::txn_sender)),
107 token_transfer::gas_usage.eq(excluded(token_transfer::gas_usage)),
108 token_transfer::data_source.eq(excluded(token_transfer::data_source)),
109 token_transfer::is_finalized.eq(excluded(token_transfer::is_finalized)),
110 ))
111 .filter(token_transfer::is_finalized.eq(false))
112 .execute(conn)
113 .await?;
114 diesel::insert_into(sui_error_transactions::table)
115 .values(&errors)
116 .on_conflict_do_nothing()
117 .execute(conn)
118 .await?;
119 diesel::insert_into(governance_actions::table)
120 .values(&gov_actions)
121 .on_conflict_do_nothing()
122 .execute(conn)
123 .await
124 }
125 .scope_boxed()
126 })
127 .await?;
128 Ok(())
129}
130
131pub async fn update_sui_progress_store(
132 pool: &PgPool,
133 tx_digest: TransactionDigest,
134) -> Result<(), anyhow::Error> {
135 let mut conn = pool.get().await?;
136 diesel::insert_into(schema::sui_progress_store::table)
137 .values(&SuiProgressStore {
138 id: SUI_PROGRESS_STORE_DUMMY_KEY,
139 txn_digest: tx_digest.inner().to_vec(),
140 })
141 .on_conflict(schema::sui_progress_store::dsl::id)
142 .do_update()
143 .set(txn_digest.eq(tx_digest.inner().to_vec()))
144 .execute(&mut conn)
145 .await?;
146 Ok(())
147}
148
149pub async fn read_sui_progress_store(pool: &PgPool) -> anyhow::Result<Option<TransactionDigest>> {
150 let mut conn = pool.get().await?;
151 let val: Option<SuiProgressStore> = schema::sui_progress_store::dsl::sui_progress_store
152 .select(SuiProgressStore::as_select())
153 .first(&mut conn)
154 .await
155 .optional()?;
156 match val {
157 Some(val) => Ok(Some(TransactionDigest::try_from(
158 val.txn_digest.as_slice(),
159 )?)),
160 None => Ok(None),
161 }
162}