1use crate::config::IndexerConfig;
5use crate::eth_bridge_indexer::{
6 EthDataMapper, EthFinalizedSyncDatasource, EthSubscriptionDatasource,
7};
8use crate::metrics::BridgeIndexerMetrics;
9use crate::postgres_manager::PgPool;
10use crate::storage::PgBridgePersistent;
11use crate::sui_bridge_indexer::SuiBridgeDataMapper;
12use ethers::providers::{Http, Provider};
13use ethers::types::Address as EthAddress;
14use std::str::FromStr;
15use std::sync::Arc;
16use sui_bridge::eth_client::EthClient;
17use sui_bridge::metered_eth_provider::MeteredEthHttpProvier;
18use sui_bridge::metrics::BridgeMetrics;
19use sui_bridge::utils::get_eth_contract_addresses;
20use sui_bridge_schema::models::{
21 BridgeDataSource, GovernanceAction as DBGovernanceAction, TokenTransferStatus,
22};
23use sui_bridge_schema::models::{GovernanceActionType, TokenTransferData as DBTokenTransferData};
24use sui_bridge_schema::models::{SuiErrorTransactions, TokenTransfer as DBTokenTransfer};
25use sui_data_ingestion_core::DataIngestionMetrics;
26use sui_indexer_builder::indexer_builder::{BackfillStrategy, Datasource, Indexer, IndexerBuilder};
27use sui_indexer_builder::metrics::IndexerMetricProvider;
28use sui_indexer_builder::progress::{
29 OutOfOrderSaveAfterDurationPolicy, ProgressSavingPolicy, SaveAfterDurationPolicy,
30};
31use sui_indexer_builder::sui_datasource::SuiCheckpointDatasource;
32use sui_sdk::SuiClientBuilder;
33use sui_types::base_types::{SuiAddress, TransactionDigest};
34
35pub mod config;
36pub mod metrics;
37pub mod postgres_manager;
38pub mod storage;
39pub mod sui_transaction_handler;
40pub mod sui_transaction_queries;
41pub mod types;
42
43pub mod eth_bridge_indexer;
44pub mod sui_bridge_indexer;
45
46#[derive(Clone)]
47pub enum ProcessedTxnData {
48 TokenTransfer(TokenTransfer),
49 GovernanceAction(GovernanceAction),
50 Error(SuiTxnError),
51}
52
53#[derive(Clone)]
54pub struct SuiTxnError {
55 tx_digest: TransactionDigest,
56 sender: SuiAddress,
57 timestamp_ms: u64,
58 failure_status: String,
59 cmd_idx: Option<u64>,
60}
61
62#[derive(Clone)]
63pub struct TokenTransfer {
64 chain_id: u8,
65 nonce: u64,
66 block_height: u64,
67 timestamp_ms: u64,
68 txn_hash: Vec<u8>,
69 txn_sender: Vec<u8>,
70 status: TokenTransferStatus,
71 gas_usage: i64,
72 data_source: BridgeDataSource,
73 data: Option<TokenTransferData>,
74 is_finalized: bool,
75}
76
77#[derive(Clone)]
78pub struct GovernanceAction {
79 nonce: Option<u64>,
80 data_source: BridgeDataSource,
81 tx_digest: Vec<u8>,
82 sender: Vec<u8>,
83 timestamp_ms: u64,
84 action: GovernanceActionType,
85 data: serde_json::Value,
86}
87
88#[derive(Clone)]
89pub struct TokenTransferData {
90 sender_address: Vec<u8>,
91 destination_chain: u8,
92 recipient_address: Vec<u8>,
93 token_id: u8,
94 amount: u64,
95 is_finalized: bool,
96}
97
98impl TokenTransfer {
99 fn to_db(&self) -> DBTokenTransfer {
100 DBTokenTransfer {
101 chain_id: self.chain_id as i32,
102 nonce: self.nonce as i64,
103 block_height: self.block_height as i64,
104 timestamp_ms: self.timestamp_ms as i64,
105 txn_hash: self.txn_hash.clone(),
106 txn_sender: self.txn_sender.clone(),
107 status: self.status,
108 gas_usage: self.gas_usage,
109 data_source: self.data_source,
110 is_finalized: self.is_finalized,
111 }
112 }
113
114 fn to_data_maybe(&self) -> Option<DBTokenTransferData> {
115 self.data.as_ref().map(|data| DBTokenTransferData {
116 chain_id: self.chain_id as i32,
117 nonce: self.nonce as i64,
118 block_height: self.block_height as i64,
119 timestamp_ms: self.timestamp_ms as i64,
120 txn_hash: self.txn_hash.clone(),
121 sender_address: data.sender_address.clone(),
122 destination_chain: data.destination_chain as i32,
123 recipient_address: data.recipient_address.clone(),
124 token_id: data.token_id as i32,
125 amount: data.amount as i64,
126 is_finalized: data.is_finalized,
127 })
128 }
129}
130
131impl SuiTxnError {
132 fn to_db(&self) -> SuiErrorTransactions {
133 SuiErrorTransactions {
134 txn_digest: self.tx_digest.inner().to_vec(),
135 sender_address: self.sender.to_vec(),
136 timestamp_ms: self.timestamp_ms as i64,
137 failure_status: self.failure_status.clone(),
138 cmd_idx: self.cmd_idx.map(|idx| idx as i64),
139 }
140 }
141}
142
143impl GovernanceAction {
144 fn to_db(&self) -> DBGovernanceAction {
145 DBGovernanceAction {
146 nonce: self.nonce.map(|nonce| nonce as i64),
147 data_source: self.data_source,
148 txn_digest: self.tx_digest.clone(),
149 sender_address: self.sender.to_vec(),
150 timestamp_ms: self.timestamp_ms as i64,
151 action: self.action,
152 data: self.data.clone(),
153 }
154 }
155}
156
157pub async fn create_sui_indexer(
158 pool: PgPool,
159 metrics: BridgeIndexerMetrics,
160 ingestion_metrics: DataIngestionMetrics,
161 config: &IndexerConfig,
162) -> anyhow::Result<
163 Indexer<PgBridgePersistent, SuiCheckpointDatasource, SuiBridgeDataMapper>,
164 anyhow::Error,
165> {
166 let datastore_with_out_of_order_source = PgBridgePersistent::new(
167 pool,
168 ProgressSavingPolicy::OutOfOrderSaveAfterDuration(OutOfOrderSaveAfterDurationPolicy::new(
169 tokio::time::Duration::from_secs(30),
170 )),
171 );
172
173 let sui_client = Arc::new(
174 SuiClientBuilder::default()
175 .build(config.sui_rpc_url.clone())
176 .await?,
177 );
178
179 let sui_checkpoint_datasource = SuiCheckpointDatasource::new(
180 config.remote_store_url.clone(),
181 sui_client,
182 config.concurrency as usize,
183 config
184 .checkpoints_path
185 .clone()
186 .map(|p| p.into())
187 .unwrap_or(tempfile::tempdir()?.keep()),
188 config.sui_bridge_genesis_checkpoint,
189 ingestion_metrics,
190 metrics.clone().boxed(),
191 );
192
193 Ok(IndexerBuilder::new(
194 "SuiBridgeIndexer",
195 sui_checkpoint_datasource,
196 SuiBridgeDataMapper { metrics },
197 datastore_with_out_of_order_source,
198 )
199 .build())
200}
201
202pub async fn create_eth_sync_indexer(
203 pool: PgPool,
204 metrics: BridgeIndexerMetrics,
205 bridge_metrics: Arc<BridgeMetrics>,
206 config: &IndexerConfig,
207 eth_client: Arc<EthClient<MeteredEthHttpProvier>>,
208) -> Result<Indexer<PgBridgePersistent, EthFinalizedSyncDatasource, EthDataMapper>, anyhow::Error> {
209 let bridge_addresses = get_eth_bridge_contract_addresses(config).await?;
210 let eth_sync_datasource = EthFinalizedSyncDatasource::new(
212 bridge_addresses,
213 eth_client.clone(),
214 config.eth_rpc_url.clone(),
215 metrics.clone().boxed(),
216 bridge_metrics.clone(),
217 config.eth_bridge_genesis_block,
218 )
219 .await?;
220 Ok(create_eth_indexer_builder(
221 pool,
222 metrics,
223 eth_sync_datasource,
224 "EthBridgeFinalizedSyncIndexer",
225 )
226 .await?
227 .with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 })
228 .build())
229}
230
231pub async fn create_eth_subscription_indexer(
232 pool: PgPool,
233 metrics: BridgeIndexerMetrics,
234 config: &IndexerConfig,
235 eth_client: Arc<EthClient<MeteredEthHttpProvier>>,
236) -> Result<Indexer<PgBridgePersistent, EthSubscriptionDatasource, EthDataMapper>, anyhow::Error> {
237 let bridge_addresses = get_eth_bridge_contract_addresses(config).await?;
239 let eth_subscription_datasource = EthSubscriptionDatasource::new(
241 bridge_addresses.clone(),
242 eth_client.clone(),
243 config.eth_ws_url.clone(),
244 metrics.clone().boxed(),
245 config.eth_bridge_genesis_block,
246 )
247 .await?;
248
249 Ok(create_eth_indexer_builder(
250 pool,
251 metrics,
252 eth_subscription_datasource,
253 "EthBridgeSubscriptionIndexer",
254 )
255 .await?
256 .with_backfill_strategy(BackfillStrategy::Disabled)
257 .build())
258}
259
260async fn create_eth_indexer_builder<T: Send, D: Datasource<T>>(
261 pool: PgPool,
262 metrics: BridgeIndexerMetrics,
263 datasource: D,
264 indexer_name: &str,
265) -> Result<IndexerBuilder<D, EthDataMapper, PgBridgePersistent>, anyhow::Error> {
266 let datastore = PgBridgePersistent::new(
267 pool,
268 ProgressSavingPolicy::SaveAfterDuration(SaveAfterDurationPolicy::new(
269 tokio::time::Duration::from_secs(30),
270 )),
271 );
272
273 Ok(IndexerBuilder::new(
275 indexer_name,
276 datasource,
277 EthDataMapper { metrics },
278 datastore.clone(),
279 ))
280}
281
282async fn get_eth_bridge_contract_addresses(
283 config: &IndexerConfig,
284) -> Result<Vec<EthAddress>, anyhow::Error> {
285 let bridge_address = EthAddress::from_str(&config.eth_sui_bridge_contract_address)?;
286 let provider = Arc::new(
287 Provider::<Http>::try_from(&config.eth_rpc_url)?
288 .interval(std::time::Duration::from_millis(2000)),
289 );
290 let bridge_addresses = get_eth_contract_addresses(bridge_address, &provider).await?;
291 Ok(vec![
292 bridge_address,
293 bridge_addresses.0,
294 bridge_addresses.1,
295 bridge_addresses.2,
296 bridge_addresses.3,
297 ])
298}