sui_bridge_indexer/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::config::IndexerConfig;
5use crate::eth_bridge_indexer::{
6    EthDataMapper, EthFinalizedSyncDatasource, EthSubscriptionDatasource,
7};
8use crate::metrics::BridgeIndexerMetrics;
9use crate::postgres_manager::PgPool;
10use crate::storage::PgBridgePersistent;
11use alloy::primitives::Address as EthAddress;
12use std::str::FromStr;
13use std::sync::Arc;
14use sui_bridge::eth_client::EthClient;
15use sui_bridge::metrics::BridgeMetrics;
16use sui_bridge::utils::{get_eth_contract_addresses, get_eth_provider};
17use sui_bridge_schema::models::{
18    BridgeDataSource, GovernanceAction as DBGovernanceAction, TokenTransferStatus,
19};
20use sui_bridge_schema::models::{GovernanceActionType, TokenTransferData as DBTokenTransferData};
21use sui_bridge_schema::models::{SuiErrorTransactions, TokenTransfer as DBTokenTransfer};
22use sui_types::base_types::{SuiAddress, TransactionDigest};
23
24pub mod config;
25pub mod metrics;
26pub mod postgres_manager;
27mod storage;
28
29mod eth_bridge_indexer;
30
31use indexer_builder::{BackfillStrategy, Datasource, Indexer, IndexerBuilder};
32use metrics::IndexerMetricProvider;
33use progress::{ProgressSavingPolicy, SaveAfterDurationPolicy};
34
/// A record derived from an indexed transaction, ready to be handed to the
/// persistence layer (each variant has a `to_db` conversion below).
#[derive(Clone)]
pub enum ProcessedTxnData {
    /// A bridge token transfer.
    TokenTransfer(TokenTransfer),
    /// A bridge governance action.
    GovernanceAction(GovernanceAction),
    /// A failed Sui transaction.
    Error(SuiTxnError),
}
41
/// A failed Sui transaction, persisted via [`SuiTxnError::to_db`].
#[derive(Clone)]
pub struct SuiTxnError {
    /// Digest of the failed transaction.
    tx_digest: TransactionDigest,
    /// Address that sent the transaction.
    sender: SuiAddress,
    /// Timestamp in milliseconds.
    timestamp_ms: u64,
    /// Failure description, stored verbatim in the DB.
    failure_status: String,
    /// Presumably the index of the failing command within the transaction,
    /// when known — TODO(review): confirm against the producer of this value.
    cmd_idx: Option<u64>,
}
50
/// A bridge token-transfer event, convertible to DB rows via
/// [`TokenTransfer::to_db`] and [`TokenTransfer::to_data_maybe`].
#[derive(Clone)]
pub struct TokenTransfer {
    /// Bridge chain id (widened to `i32` for the DB).
    chain_id: u8,
    /// Transfer nonce.
    nonce: u64,
    /// Block height at which the event was observed.
    block_height: u64,
    /// Timestamp in milliseconds.
    timestamp_ms: u64,
    /// Raw transaction hash bytes.
    txn_hash: Vec<u8>,
    /// Raw sender bytes of the transaction.
    txn_sender: Vec<u8>,
    status: TokenTransferStatus,
    gas_usage: i64,
    /// Which chain/source produced this record.
    data_source: BridgeDataSource,
    /// Optional transfer payload; `None` when no payload data is available.
    data: Option<TokenTransferData>,
    is_finalized: bool,
}
65
/// A bridge governance action, persisted via [`GovernanceAction::to_db`].
#[derive(Clone)]
pub struct GovernanceAction {
    /// Action nonce, when the action carries one.
    nonce: Option<u64>,
    /// Which chain/source produced this record.
    data_source: BridgeDataSource,
    /// Raw transaction digest bytes.
    tx_digest: Vec<u8>,
    /// Raw sender bytes.
    sender: Vec<u8>,
    /// Timestamp in milliseconds.
    timestamp_ms: u64,
    action: GovernanceActionType,
    /// Action payload as free-form JSON.
    data: serde_json::Value,
}
76
/// Payload details of a token transfer (the optional `data` of
/// [`TokenTransfer`]), persisted via [`TokenTransfer::to_data_maybe`].
#[derive(Clone)]
pub struct TokenTransferData {
    /// Raw sender address bytes.
    sender_address: Vec<u8>,
    /// Bridge chain id of the destination chain.
    destination_chain: u8,
    /// Raw recipient address bytes.
    recipient_address: Vec<u8>,
    /// Bridge token id.
    token_id: u8,
    /// Transfer amount (widened to `i64` for the DB).
    amount: u64,
    is_finalized: bool,
    /// For V2 transfers, the timestamp (in ms) from the bridge message payload.
    /// `None` for V1 transfers.
    message_timestamp_ms: Option<u64>,
}
89
90impl TokenTransfer {
91    fn to_db(&self) -> DBTokenTransfer {
92        DBTokenTransfer {
93            chain_id: self.chain_id as i32,
94            nonce: self.nonce as i64,
95            block_height: self.block_height as i64,
96            timestamp_ms: self.timestamp_ms as i64,
97            txn_hash: self.txn_hash.clone(),
98            txn_sender: self.txn_sender.clone(),
99            status: self.status,
100            gas_usage: self.gas_usage,
101            data_source: self.data_source,
102            is_finalized: self.is_finalized,
103        }
104    }
105
106    fn to_data_maybe(&self) -> Option<DBTokenTransferData> {
107        self.data.as_ref().map(|data| DBTokenTransferData {
108            chain_id: self.chain_id as i32,
109            nonce: self.nonce as i64,
110            block_height: self.block_height as i64,
111            timestamp_ms: self.timestamp_ms as i64,
112            txn_hash: self.txn_hash.clone(),
113            sender_address: data.sender_address.clone(),
114            destination_chain: data.destination_chain as i32,
115            recipient_address: data.recipient_address.clone(),
116            token_id: data.token_id as i32,
117            amount: data.amount as i64,
118            is_finalized: data.is_finalized,
119            message_timestamp_ms: data.message_timestamp_ms.map(|ts| ts as i64),
120        })
121    }
122}
123
124impl SuiTxnError {
125    fn to_db(&self) -> SuiErrorTransactions {
126        SuiErrorTransactions {
127            txn_digest: self.tx_digest.inner().to_vec(),
128            sender_address: self.sender.to_vec(),
129            timestamp_ms: self.timestamp_ms as i64,
130            failure_status: self.failure_status.clone(),
131            cmd_idx: self.cmd_idx.map(|idx| idx as i64),
132        }
133    }
134}
135
136impl GovernanceAction {
137    fn to_db(&self) -> DBGovernanceAction {
138        DBGovernanceAction {
139            nonce: self.nonce.map(|nonce| nonce as i64),
140            data_source: self.data_source,
141            txn_digest: self.tx_digest.clone(),
142            sender_address: self.sender.to_vec(),
143            timestamp_ms: self.timestamp_ms as i64,
144            action: self.action,
145            data: self.data.clone(),
146        }
147    }
148}
149
150pub async fn create_eth_sync_indexer(
151    pool: PgPool,
152    metrics: BridgeIndexerMetrics,
153    bridge_metrics: Arc<BridgeMetrics>,
154    config: &IndexerConfig,
155    eth_client: Arc<EthClient>,
156) -> Result<Indexer<PgBridgePersistent, EthFinalizedSyncDatasource, EthDataMapper>, anyhow::Error> {
157    let bridge_addresses = get_eth_bridge_contract_addresses(config).await?;
158    // Start the eth sync data source
159    let eth_sync_datasource = EthFinalizedSyncDatasource::new(
160        bridge_addresses,
161        eth_client.clone(),
162        config.eth_rpc_url.clone(),
163        metrics.clone().boxed(),
164        bridge_metrics.clone(),
165        config.eth_bridge_genesis_block,
166    )
167    .await?;
168    Ok(create_eth_indexer_builder(
169        pool,
170        metrics,
171        eth_sync_datasource,
172        "EthBridgeFinalizedSyncIndexer",
173    )
174    .await?
175    .with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 })
176    .build())
177}
178
179pub async fn create_eth_subscription_indexer(
180    pool: PgPool,
181    metrics: BridgeIndexerMetrics,
182    config: &IndexerConfig,
183    eth_client: Arc<EthClient>,
184) -> Result<Indexer<PgBridgePersistent, EthSubscriptionDatasource, EthDataMapper>, anyhow::Error> {
185    // Start the eth subscription indexer
186    let bridge_addresses = get_eth_bridge_contract_addresses(config).await?;
187    // Start the eth subscription indexer
188    let eth_subscription_datasource = EthSubscriptionDatasource::new(
189        bridge_addresses.clone(),
190        eth_client.clone(),
191        config.eth_ws_url.clone(),
192        metrics.clone().boxed(),
193        config.eth_bridge_genesis_block,
194    )
195    .await?;
196
197    Ok(create_eth_indexer_builder(
198        pool,
199        metrics,
200        eth_subscription_datasource,
201        "EthBridgeSubscriptionIndexer",
202    )
203    .await?
204    .with_backfill_strategy(BackfillStrategy::Disabled)
205    .build())
206}
207
208async fn create_eth_indexer_builder<T: Send, D: Datasource<T>>(
209    pool: PgPool,
210    metrics: BridgeIndexerMetrics,
211    datasource: D,
212    indexer_name: &str,
213) -> Result<IndexerBuilder<D, EthDataMapper, PgBridgePersistent>, anyhow::Error> {
214    let datastore = PgBridgePersistent::new(
215        pool,
216        ProgressSavingPolicy::SaveAfterDuration(SaveAfterDurationPolicy::new(
217            tokio::time::Duration::from_secs(30),
218        )),
219    );
220
221    // Start the eth subscription indexer
222    Ok(IndexerBuilder::new(
223        indexer_name,
224        datasource,
225        EthDataMapper { metrics },
226        datastore.clone(),
227    ))
228}
229
230async fn get_eth_bridge_contract_addresses(
231    config: &IndexerConfig,
232) -> Result<Vec<EthAddress>, anyhow::Error> {
233    let bridge_address = EthAddress::from_str(&config.eth_sui_bridge_contract_address)?;
234    let eth_provider = get_eth_provider(&config.eth_rpc_url)?;
235    let bridge_addresses = get_eth_contract_addresses(bridge_address, eth_provider).await?;
236    Ok(vec![
237        bridge_address,
238        bridge_addresses.0,
239        bridge_addresses.1,
240        bridge_addresses.2,
241        bridge_addresses.3,
242    ])
243}
244
245// inline old sui-indexer-builder
246
247mod indexer_builder;
248mod progress;
/// Target checkpoint assigned to live tasks: `i64::MAX` acts as an
/// effectively unbounded (open-ended) target.
const LIVE_TASK_TARGET_CHECKPOINT: i64 = i64::MAX;
250
/// A unit of indexing work over a checkpoint range.
#[derive(Clone, Debug)]
pub struct Task {
    pub task_name: String,
    pub start_checkpoint: u64,
    pub target_checkpoint: u64,
    pub timestamp: u64,
    pub is_live_task: bool,
}

impl Task {
    // TODO: deriving identity from the name string is fragile; fix the task
    // naming scheme and storage schema ASAP.
    /// The segment of `task_name` before the first space character.
    pub fn name_prefix(&self) -> &str {
        let mut segments = self.task_name.split(' ');
        segments.next().unwrap_or("Unknown")
    }

    /// `"live"` or `"backfill"`, depending on `is_live_task`.
    pub fn type_str(&self) -> &str {
        match self.is_live_task {
            true => "live",
            false => "backfill",
        }
    }
}
274
/// The set of tasks for one indexer: at most one live task plus any number
/// of backfill tasks (invariant enforced by [`Tasks::new`]).
#[derive(Clone, Debug)]
pub struct Tasks {
    live_task: Option<Task>,
    backfill_tasks: Vec<Task>,
}
280
281impl Tasks {
282    pub fn new(tasks: Vec<Task>) -> anyhow::Result<Self> {
283        let mut live_tasks = vec![];
284        let mut backfill_tasks = vec![];
285        for task in tasks {
286            if task.is_live_task {
287                live_tasks.push(task);
288            } else {
289                backfill_tasks.push(task);
290            }
291        }
292        if live_tasks.len() > 1 {
293            anyhow::bail!("More than one live task found: {:?}", live_tasks);
294        }
295        Ok(Self {
296            live_task: live_tasks.pop(),
297            backfill_tasks,
298        })
299    }
300
301    pub fn live_task(&self) -> Option<Task> {
302        self.live_task.clone()
303    }
304
305    pub fn backfill_tasks_ordered_desc(&self) -> Vec<Task> {
306        let mut tasks = self.backfill_tasks.clone();
307        tasks.sort_by(|t1, t2| t2.start_checkpoint.cmp(&t1.start_checkpoint));
308        tasks
309    }
310}