sui_bridge_indexer/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::config::IndexerConfig;
5use crate::eth_bridge_indexer::{
6    EthDataMapper, EthFinalizedSyncDatasource, EthSubscriptionDatasource,
7};
8use crate::metrics::BridgeIndexerMetrics;
9use crate::postgres_manager::PgPool;
10use crate::storage::PgBridgePersistent;
11use crate::sui_bridge_indexer::SuiBridgeDataMapper;
12use ethers::providers::{Http, Provider};
13use ethers::types::Address as EthAddress;
14use std::str::FromStr;
15use std::sync::Arc;
16use sui_bridge::eth_client::EthClient;
17use sui_bridge::metered_eth_provider::MeteredEthHttpProvier;
18use sui_bridge::metrics::BridgeMetrics;
19use sui_bridge::utils::get_eth_contract_addresses;
20use sui_bridge_schema::models::{
21    BridgeDataSource, GovernanceAction as DBGovernanceAction, TokenTransferStatus,
22};
23use sui_bridge_schema::models::{GovernanceActionType, TokenTransferData as DBTokenTransferData};
24use sui_bridge_schema::models::{SuiErrorTransactions, TokenTransfer as DBTokenTransfer};
25use sui_data_ingestion_core::DataIngestionMetrics;
26use sui_indexer_builder::indexer_builder::{BackfillStrategy, Datasource, Indexer, IndexerBuilder};
27use sui_indexer_builder::metrics::IndexerMetricProvider;
28use sui_indexer_builder::progress::{
29    OutOfOrderSaveAfterDurationPolicy, ProgressSavingPolicy, SaveAfterDurationPolicy,
30};
31use sui_indexer_builder::sui_datasource::SuiCheckpointDatasource;
32use sui_sdk::SuiClientBuilder;
33use sui_types::base_types::{SuiAddress, TransactionDigest};
34
35pub mod config;
36pub mod metrics;
37pub mod postgres_manager;
38pub mod storage;
39pub mod sui_transaction_handler;
40pub mod sui_transaction_queries;
41pub mod types;
42
43pub mod eth_bridge_indexer;
44pub mod sui_bridge_indexer;
45
46#[derive(Clone)]
47pub enum ProcessedTxnData {
48    TokenTransfer(TokenTransfer),
49    GovernanceAction(GovernanceAction),
50    Error(SuiTxnError),
51}
52
53#[derive(Clone)]
54pub struct SuiTxnError {
55    tx_digest: TransactionDigest,
56    sender: SuiAddress,
57    timestamp_ms: u64,
58    failure_status: String,
59    cmd_idx: Option<u64>,
60}
61
62#[derive(Clone)]
63pub struct TokenTransfer {
64    chain_id: u8,
65    nonce: u64,
66    block_height: u64,
67    timestamp_ms: u64,
68    txn_hash: Vec<u8>,
69    txn_sender: Vec<u8>,
70    status: TokenTransferStatus,
71    gas_usage: i64,
72    data_source: BridgeDataSource,
73    data: Option<TokenTransferData>,
74    is_finalized: bool,
75}
76
77#[derive(Clone)]
78pub struct GovernanceAction {
79    nonce: Option<u64>,
80    data_source: BridgeDataSource,
81    tx_digest: Vec<u8>,
82    sender: Vec<u8>,
83    timestamp_ms: u64,
84    action: GovernanceActionType,
85    data: serde_json::Value,
86}
87
/// Payload attached to a `TokenTransfer`.
///
/// Combined with the owning transfer's header fields to build a
/// `DBTokenTransferData` row.
#[derive(Clone)]
pub struct TokenTransferData {
    /// Sender address bytes.
    sender_address: Vec<u8>,
    /// Destination chain identifier; widened to `i32` for storage.
    destination_chain: u8,
    /// Recipient address bytes.
    recipient_address: Vec<u8>,
    /// Token identifier; widened to `i32` for storage.
    token_id: u8,
    /// Transferred amount; cast to `i64` for storage.
    amount: u64,
    /// Finalization flag copied into the data row (separate from the flag on
    /// the owning `TokenTransfer`).
    is_finalized: bool,
}
97
98impl TokenTransfer {
99    fn to_db(&self) -> DBTokenTransfer {
100        DBTokenTransfer {
101            chain_id: self.chain_id as i32,
102            nonce: self.nonce as i64,
103            block_height: self.block_height as i64,
104            timestamp_ms: self.timestamp_ms as i64,
105            txn_hash: self.txn_hash.clone(),
106            txn_sender: self.txn_sender.clone(),
107            status: self.status,
108            gas_usage: self.gas_usage,
109            data_source: self.data_source,
110            is_finalized: self.is_finalized,
111        }
112    }
113
114    fn to_data_maybe(&self) -> Option<DBTokenTransferData> {
115        self.data.as_ref().map(|data| DBTokenTransferData {
116            chain_id: self.chain_id as i32,
117            nonce: self.nonce as i64,
118            block_height: self.block_height as i64,
119            timestamp_ms: self.timestamp_ms as i64,
120            txn_hash: self.txn_hash.clone(),
121            sender_address: data.sender_address.clone(),
122            destination_chain: data.destination_chain as i32,
123            recipient_address: data.recipient_address.clone(),
124            token_id: data.token_id as i32,
125            amount: data.amount as i64,
126            is_finalized: data.is_finalized,
127        })
128    }
129}
130
131impl SuiTxnError {
132    fn to_db(&self) -> SuiErrorTransactions {
133        SuiErrorTransactions {
134            txn_digest: self.tx_digest.inner().to_vec(),
135            sender_address: self.sender.to_vec(),
136            timestamp_ms: self.timestamp_ms as i64,
137            failure_status: self.failure_status.clone(),
138            cmd_idx: self.cmd_idx.map(|idx| idx as i64),
139        }
140    }
141}
142
143impl GovernanceAction {
144    fn to_db(&self) -> DBGovernanceAction {
145        DBGovernanceAction {
146            nonce: self.nonce.map(|nonce| nonce as i64),
147            data_source: self.data_source,
148            txn_digest: self.tx_digest.clone(),
149            sender_address: self.sender.to_vec(),
150            timestamp_ms: self.timestamp_ms as i64,
151            action: self.action,
152            data: self.data.clone(),
153        }
154    }
155}
156
157pub async fn create_sui_indexer(
158    pool: PgPool,
159    metrics: BridgeIndexerMetrics,
160    ingestion_metrics: DataIngestionMetrics,
161    config: &IndexerConfig,
162) -> anyhow::Result<
163    Indexer<PgBridgePersistent, SuiCheckpointDatasource, SuiBridgeDataMapper>,
164    anyhow::Error,
165> {
166    let datastore_with_out_of_order_source = PgBridgePersistent::new(
167        pool,
168        ProgressSavingPolicy::OutOfOrderSaveAfterDuration(OutOfOrderSaveAfterDurationPolicy::new(
169            tokio::time::Duration::from_secs(30),
170        )),
171    );
172
173    let sui_client = Arc::new(
174        SuiClientBuilder::default()
175            .build(config.sui_rpc_url.clone())
176            .await?,
177    );
178
179    let sui_checkpoint_datasource = SuiCheckpointDatasource::new(
180        config.remote_store_url.clone(),
181        sui_client,
182        config.concurrency as usize,
183        config
184            .checkpoints_path
185            .clone()
186            .map(|p| p.into())
187            .unwrap_or(tempfile::tempdir()?.keep()),
188        config.sui_bridge_genesis_checkpoint,
189        ingestion_metrics,
190        metrics.clone().boxed(),
191    );
192
193    Ok(IndexerBuilder::new(
194        "SuiBridgeIndexer",
195        sui_checkpoint_datasource,
196        SuiBridgeDataMapper { metrics },
197        datastore_with_out_of_order_source,
198    )
199    .build())
200}
201
202pub async fn create_eth_sync_indexer(
203    pool: PgPool,
204    metrics: BridgeIndexerMetrics,
205    bridge_metrics: Arc<BridgeMetrics>,
206    config: &IndexerConfig,
207    eth_client: Arc<EthClient<MeteredEthHttpProvier>>,
208) -> Result<Indexer<PgBridgePersistent, EthFinalizedSyncDatasource, EthDataMapper>, anyhow::Error> {
209    let bridge_addresses = get_eth_bridge_contract_addresses(config).await?;
210    // Start the eth sync data source
211    let eth_sync_datasource = EthFinalizedSyncDatasource::new(
212        bridge_addresses,
213        eth_client.clone(),
214        config.eth_rpc_url.clone(),
215        metrics.clone().boxed(),
216        bridge_metrics.clone(),
217        config.eth_bridge_genesis_block,
218    )
219    .await?;
220    Ok(create_eth_indexer_builder(
221        pool,
222        metrics,
223        eth_sync_datasource,
224        "EthBridgeFinalizedSyncIndexer",
225    )
226    .await?
227    .with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 })
228    .build())
229}
230
231pub async fn create_eth_subscription_indexer(
232    pool: PgPool,
233    metrics: BridgeIndexerMetrics,
234    config: &IndexerConfig,
235    eth_client: Arc<EthClient<MeteredEthHttpProvier>>,
236) -> Result<Indexer<PgBridgePersistent, EthSubscriptionDatasource, EthDataMapper>, anyhow::Error> {
237    // Start the eth subscription indexer
238    let bridge_addresses = get_eth_bridge_contract_addresses(config).await?;
239    // Start the eth subscription indexer
240    let eth_subscription_datasource = EthSubscriptionDatasource::new(
241        bridge_addresses.clone(),
242        eth_client.clone(),
243        config.eth_ws_url.clone(),
244        metrics.clone().boxed(),
245        config.eth_bridge_genesis_block,
246    )
247    .await?;
248
249    Ok(create_eth_indexer_builder(
250        pool,
251        metrics,
252        eth_subscription_datasource,
253        "EthBridgeSubscriptionIndexer",
254    )
255    .await?
256    .with_backfill_strategy(BackfillStrategy::Disabled)
257    .build())
258}
259
260async fn create_eth_indexer_builder<T: Send, D: Datasource<T>>(
261    pool: PgPool,
262    metrics: BridgeIndexerMetrics,
263    datasource: D,
264    indexer_name: &str,
265) -> Result<IndexerBuilder<D, EthDataMapper, PgBridgePersistent>, anyhow::Error> {
266    let datastore = PgBridgePersistent::new(
267        pool,
268        ProgressSavingPolicy::SaveAfterDuration(SaveAfterDurationPolicy::new(
269            tokio::time::Duration::from_secs(30),
270        )),
271    );
272
273    // Start the eth subscription indexer
274    Ok(IndexerBuilder::new(
275        indexer_name,
276        datasource,
277        EthDataMapper { metrics },
278        datastore.clone(),
279    ))
280}
281
282async fn get_eth_bridge_contract_addresses(
283    config: &IndexerConfig,
284) -> Result<Vec<EthAddress>, anyhow::Error> {
285    let bridge_address = EthAddress::from_str(&config.eth_sui_bridge_contract_address)?;
286    let provider = Arc::new(
287        Provider::<Http>::try_from(&config.eth_rpc_url)?
288            .interval(std::time::Duration::from_millis(2000)),
289    );
290    let bridge_addresses = get_eth_contract_addresses(bridge_address, &provider).await?;
291    Ok(vec![
292        bridge_address,
293        bridge_addresses.0,
294        bridge_addresses.1,
295        bridge_addresses.2,
296        bridge_addresses.3,
297    ])
298}