sui_bridge_indexer/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::config::IndexerConfig;
5use crate::eth_bridge_indexer::{
6    EthDataMapper, EthFinalizedSyncDatasource, EthSubscriptionDatasource,
7};
8use crate::metrics::BridgeIndexerMetrics;
9use crate::postgres_manager::PgPool;
10use crate::storage::PgBridgePersistent;
11use alloy::primitives::Address as EthAddress;
12use std::str::FromStr;
13use std::sync::Arc;
14use sui_bridge::eth_client::EthClient;
15use sui_bridge::metrics::BridgeMetrics;
16use sui_bridge::utils::{get_eth_contract_addresses, get_eth_provider};
17use sui_bridge_schema::models::{
18    BridgeDataSource, GovernanceAction as DBGovernanceAction, TokenTransferStatus,
19};
20use sui_bridge_schema::models::{GovernanceActionType, TokenTransferData as DBTokenTransferData};
21use sui_bridge_schema::models::{SuiErrorTransactions, TokenTransfer as DBTokenTransfer};
22use sui_types::base_types::{SuiAddress, TransactionDigest};
23
24pub mod config;
25pub mod metrics;
26pub mod postgres_manager;
27mod storage;
28
29mod eth_bridge_indexer;
30
31use indexer_builder::{BackfillStrategy, Datasource, Indexer, IndexerBuilder};
32use metrics::IndexerMetricProvider;
33use progress::{ProgressSavingPolicy, SaveAfterDurationPolicy};
34
/// A bridge-related record produced by the indexer pipeline, carrying one of
/// the three payload types that have `to_db` conversions below.
#[derive(Clone)]
pub enum ProcessedTxnData {
    /// A bridge token transfer (Sui or Eth — see `BridgeDataSource` field).
    TokenTransfer(TokenTransfer),
    /// A bridge governance action.
    GovernanceAction(GovernanceAction),
    /// A failed Sui transaction recorded for diagnostics.
    Error(SuiTxnError),
}
41
/// A failed Sui transaction captured by the indexer; converted to the
/// `SuiErrorTransactions` DB row via [`SuiTxnError::to_db`].
#[derive(Clone)]
pub struct SuiTxnError {
    // Digest of the failed transaction.
    tx_digest: TransactionDigest,
    // Sender address of the failed transaction.
    sender: SuiAddress,
    // Transaction timestamp in milliseconds (per field name — stored as i64 in DB).
    timestamp_ms: u64,
    // Failure status/reason as a string.
    failure_status: String,
    // NOTE(review): presumably the index of the failing command within the
    // programmable transaction, when available — confirm against producer.
    cmd_idx: Option<u64>,
}
50
/// A bridge token-transfer event; converted to the `DBTokenTransfer` row via
/// [`TokenTransfer::to_db`], with optional detail payload via
/// [`TokenTransfer::to_data_maybe`].
#[derive(Clone)]
pub struct TokenTransfer {
    // Chain the event was observed on (widened to i32 for the DB).
    chain_id: u8,
    // Bridge nonce of the transfer.
    nonce: u64,
    // Block/checkpoint height of the event.
    block_height: u64,
    // Event timestamp in milliseconds (per field name).
    timestamp_ms: u64,
    // Raw transaction hash bytes.
    txn_hash: Vec<u8>,
    // Raw sender bytes of the transaction.
    txn_sender: Vec<u8>,
    // Transfer status (deposited / approved / claimed, per schema enum).
    status: TokenTransferStatus,
    // Gas used, already in DB-native i64.
    gas_usage: i64,
    // Which chain/source produced this record.
    data_source: BridgeDataSource,
    // Optional transfer detail (sender/recipient/token/amount); not all
    // events carry it.
    data: Option<TokenTransferData>,
    // Whether the event comes from a finalized block.
    is_finalized: bool,
}
65
/// A bridge governance action; converted to the `DBGovernanceAction` row via
/// [`GovernanceAction::to_db`].
#[derive(Clone)]
pub struct GovernanceAction {
    // Action nonce, when the action carries one.
    nonce: Option<u64>,
    // Which chain/source produced this record.
    data_source: BridgeDataSource,
    // Raw transaction digest/hash bytes.
    tx_digest: Vec<u8>,
    // Raw sender bytes.
    sender: Vec<u8>,
    // Timestamp in milliseconds (per field name).
    timestamp_ms: u64,
    // Kind of governance action (schema enum).
    action: GovernanceActionType,
    // Action payload, stored as free-form JSON.
    data: serde_json::Value,
}
76
/// Detail payload of a token transfer (present only on events that carry the
/// full transfer body); flattened into `DBTokenTransferData` together with the
/// parent [`TokenTransfer`]'s identifying fields.
#[derive(Clone)]
pub struct TokenTransferData {
    // Raw sender address bytes on the source chain.
    sender_address: Vec<u8>,
    // Destination chain id (widened to i32 for the DB).
    destination_chain: u8,
    // Raw recipient address bytes on the destination chain.
    recipient_address: Vec<u8>,
    // Bridge token id (widened to i32 for the DB).
    token_id: u8,
    // Transfer amount; cast to i64 for the DB.
    amount: u64,
    // Whether the detail event comes from a finalized block.
    is_finalized: bool,
}
86
87impl TokenTransfer {
88    fn to_db(&self) -> DBTokenTransfer {
89        DBTokenTransfer {
90            chain_id: self.chain_id as i32,
91            nonce: self.nonce as i64,
92            block_height: self.block_height as i64,
93            timestamp_ms: self.timestamp_ms as i64,
94            txn_hash: self.txn_hash.clone(),
95            txn_sender: self.txn_sender.clone(),
96            status: self.status,
97            gas_usage: self.gas_usage,
98            data_source: self.data_source,
99            is_finalized: self.is_finalized,
100        }
101    }
102
103    fn to_data_maybe(&self) -> Option<DBTokenTransferData> {
104        self.data.as_ref().map(|data| DBTokenTransferData {
105            chain_id: self.chain_id as i32,
106            nonce: self.nonce as i64,
107            block_height: self.block_height as i64,
108            timestamp_ms: self.timestamp_ms as i64,
109            txn_hash: self.txn_hash.clone(),
110            sender_address: data.sender_address.clone(),
111            destination_chain: data.destination_chain as i32,
112            recipient_address: data.recipient_address.clone(),
113            token_id: data.token_id as i32,
114            amount: data.amount as i64,
115            is_finalized: data.is_finalized,
116        })
117    }
118}
119
120impl SuiTxnError {
121    fn to_db(&self) -> SuiErrorTransactions {
122        SuiErrorTransactions {
123            txn_digest: self.tx_digest.inner().to_vec(),
124            sender_address: self.sender.to_vec(),
125            timestamp_ms: self.timestamp_ms as i64,
126            failure_status: self.failure_status.clone(),
127            cmd_idx: self.cmd_idx.map(|idx| idx as i64),
128        }
129    }
130}
131
132impl GovernanceAction {
133    fn to_db(&self) -> DBGovernanceAction {
134        DBGovernanceAction {
135            nonce: self.nonce.map(|nonce| nonce as i64),
136            data_source: self.data_source,
137            txn_digest: self.tx_digest.clone(),
138            sender_address: self.sender.to_vec(),
139            timestamp_ms: self.timestamp_ms as i64,
140            action: self.action,
141            data: self.data.clone(),
142        }
143    }
144}
145
146pub async fn create_eth_sync_indexer(
147    pool: PgPool,
148    metrics: BridgeIndexerMetrics,
149    bridge_metrics: Arc<BridgeMetrics>,
150    config: &IndexerConfig,
151    eth_client: Arc<EthClient>,
152) -> Result<Indexer<PgBridgePersistent, EthFinalizedSyncDatasource, EthDataMapper>, anyhow::Error> {
153    let bridge_addresses = get_eth_bridge_contract_addresses(config).await?;
154    // Start the eth sync data source
155    let eth_sync_datasource = EthFinalizedSyncDatasource::new(
156        bridge_addresses,
157        eth_client.clone(),
158        config.eth_rpc_url.clone(),
159        metrics.clone().boxed(),
160        bridge_metrics.clone(),
161        config.eth_bridge_genesis_block,
162    )
163    .await?;
164    Ok(create_eth_indexer_builder(
165        pool,
166        metrics,
167        eth_sync_datasource,
168        "EthBridgeFinalizedSyncIndexer",
169    )
170    .await?
171    .with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 })
172    .build())
173}
174
175pub async fn create_eth_subscription_indexer(
176    pool: PgPool,
177    metrics: BridgeIndexerMetrics,
178    config: &IndexerConfig,
179    eth_client: Arc<EthClient>,
180) -> Result<Indexer<PgBridgePersistent, EthSubscriptionDatasource, EthDataMapper>, anyhow::Error> {
181    // Start the eth subscription indexer
182    let bridge_addresses = get_eth_bridge_contract_addresses(config).await?;
183    // Start the eth subscription indexer
184    let eth_subscription_datasource = EthSubscriptionDatasource::new(
185        bridge_addresses.clone(),
186        eth_client.clone(),
187        config.eth_ws_url.clone(),
188        metrics.clone().boxed(),
189        config.eth_bridge_genesis_block,
190    )
191    .await?;
192
193    Ok(create_eth_indexer_builder(
194        pool,
195        metrics,
196        eth_subscription_datasource,
197        "EthBridgeSubscriptionIndexer",
198    )
199    .await?
200    .with_backfill_strategy(BackfillStrategy::Disabled)
201    .build())
202}
203
204async fn create_eth_indexer_builder<T: Send, D: Datasource<T>>(
205    pool: PgPool,
206    metrics: BridgeIndexerMetrics,
207    datasource: D,
208    indexer_name: &str,
209) -> Result<IndexerBuilder<D, EthDataMapper, PgBridgePersistent>, anyhow::Error> {
210    let datastore = PgBridgePersistent::new(
211        pool,
212        ProgressSavingPolicy::SaveAfterDuration(SaveAfterDurationPolicy::new(
213            tokio::time::Duration::from_secs(30),
214        )),
215    );
216
217    // Start the eth subscription indexer
218    Ok(IndexerBuilder::new(
219        indexer_name,
220        datasource,
221        EthDataMapper { metrics },
222        datastore.clone(),
223    ))
224}
225
226async fn get_eth_bridge_contract_addresses(
227    config: &IndexerConfig,
228) -> Result<Vec<EthAddress>, anyhow::Error> {
229    let bridge_address = EthAddress::from_str(&config.eth_sui_bridge_contract_address)?;
230    let eth_provider = get_eth_provider(&config.eth_rpc_url)?;
231    let bridge_addresses = get_eth_contract_addresses(bridge_address, eth_provider).await?;
232    Ok(vec![
233        bridge_address,
234        bridge_addresses.0,
235        bridge_addresses.1,
236        bridge_addresses.2,
237        bridge_addresses.3,
238    ])
239}
240
241// inline old sui-indexer-builder
242
243mod indexer_builder;
244mod progress;
// Sentinel target checkpoint for live tasks: i64::MAX means "no upper bound".
// NOTE(review): not referenced in this file — presumably consumed by the
// inlined `indexer_builder`/`progress` modules; confirm before removing.
const LIVE_TASK_TARGET_CHECKPOINT: i64 = i64::MAX;
246
/// A unit of indexing work over a checkpoint range, either a live (tailing)
/// task or a bounded backfill task.
#[derive(Clone, Debug)]
pub struct Task {
    // Task identifier; the prefix before the first space is treated as the
    // task "type" by `name_prefix` (fragile — see TODO in the impl).
    pub task_name: String,
    // First checkpoint this task covers.
    pub start_checkpoint: u64,
    // Last checkpoint this task should reach.
    pub target_checkpoint: u64,
    // NOTE(review): unit (ms vs s) not evident from this file — confirm.
    pub timestamp: u64,
    // True for the single live task; false for backfill tasks.
    pub is_live_task: bool,
}
255
256impl Task {
257    // TODO: this is really fragile and we should fix the task naming thing and storage schema asasp
258    pub fn name_prefix(&self) -> &str {
259        self.task_name.split(' ').next().unwrap_or("Unknown")
260    }
261
262    pub fn type_str(&self) -> &str {
263        if self.is_live_task {
264            "live"
265        } else {
266            "backfill"
267        }
268    }
269}
270
/// A validated collection of tasks: at most one live task plus any number of
/// backfill tasks. Construct via [`Tasks::new`], which enforces the
/// single-live-task invariant.
#[derive(Clone, Debug)]
pub struct Tasks {
    // The single live (tailing) task, if any.
    live_task: Option<Task>,
    // Bounded backfill tasks, in insertion order.
    backfill_tasks: Vec<Task>,
}
276
277impl Tasks {
278    pub fn new(tasks: Vec<Task>) -> anyhow::Result<Self> {
279        let mut live_tasks = vec![];
280        let mut backfill_tasks = vec![];
281        for task in tasks {
282            if task.is_live_task {
283                live_tasks.push(task);
284            } else {
285                backfill_tasks.push(task);
286            }
287        }
288        if live_tasks.len() > 1 {
289            anyhow::bail!("More than one live task found: {:?}", live_tasks);
290        }
291        Ok(Self {
292            live_task: live_tasks.pop(),
293            backfill_tasks,
294        })
295    }
296
297    pub fn live_task(&self) -> Option<Task> {
298        self.live_task.clone()
299    }
300
301    pub fn backfill_tasks_ordered_desc(&self) -> Vec<Task> {
302        let mut tasks = self.backfill_tasks.clone();
303        tasks.sort_by(|t1, t2| t2.start_checkpoint.cmp(&t1.start_checkpoint));
304        tasks
305    }
306}