1use crate::config::IndexerConfig;
5use crate::eth_bridge_indexer::{
6 EthDataMapper, EthFinalizedSyncDatasource, EthSubscriptionDatasource,
7};
8use crate::metrics::BridgeIndexerMetrics;
9use crate::postgres_manager::PgPool;
10use crate::storage::PgBridgePersistent;
11use alloy::primitives::Address as EthAddress;
12use std::str::FromStr;
13use std::sync::Arc;
14use sui_bridge::eth_client::EthClient;
15use sui_bridge::metrics::BridgeMetrics;
16use sui_bridge::utils::{get_eth_contract_addresses, get_eth_provider};
17use sui_bridge_schema::models::{
18 BridgeDataSource, GovernanceAction as DBGovernanceAction, TokenTransferStatus,
19};
20use sui_bridge_schema::models::{GovernanceActionType, TokenTransferData as DBTokenTransferData};
21use sui_bridge_schema::models::{SuiErrorTransactions, TokenTransfer as DBTokenTransfer};
22use sui_types::base_types::{SuiAddress, TransactionDigest};
23
24pub mod config;
25pub mod metrics;
26pub mod postgres_manager;
27mod storage;
28
29mod eth_bridge_indexer;
30
31use indexer_builder::{BackfillStrategy, Datasource, Indexer, IndexerBuilder};
32use metrics::IndexerMetricProvider;
33use progress::{ProgressSavingPolicy, SaveAfterDurationPolicy};
34
35#[derive(Clone)]
36pub enum ProcessedTxnData {
37 TokenTransfer(TokenTransfer),
38 GovernanceAction(GovernanceAction),
39 Error(SuiTxnError),
40}
41
42#[derive(Clone)]
43pub struct SuiTxnError {
44 tx_digest: TransactionDigest,
45 sender: SuiAddress,
46 timestamp_ms: u64,
47 failure_status: String,
48 cmd_idx: Option<u64>,
49}
50
51#[derive(Clone)]
52pub struct TokenTransfer {
53 chain_id: u8,
54 nonce: u64,
55 block_height: u64,
56 timestamp_ms: u64,
57 txn_hash: Vec<u8>,
58 txn_sender: Vec<u8>,
59 status: TokenTransferStatus,
60 gas_usage: i64,
61 data_source: BridgeDataSource,
62 data: Option<TokenTransferData>,
63 is_finalized: bool,
64}
65
66#[derive(Clone)]
67pub struct GovernanceAction {
68 nonce: Option<u64>,
69 data_source: BridgeDataSource,
70 tx_digest: Vec<u8>,
71 sender: Vec<u8>,
72 timestamp_ms: u64,
73 action: GovernanceActionType,
74 data: serde_json::Value,
75}
76
77#[derive(Clone)]
78pub struct TokenTransferData {
79 sender_address: Vec<u8>,
80 destination_chain: u8,
81 recipient_address: Vec<u8>,
82 token_id: u8,
83 amount: u64,
84 is_finalized: bool,
85}
86
87impl TokenTransfer {
88 fn to_db(&self) -> DBTokenTransfer {
89 DBTokenTransfer {
90 chain_id: self.chain_id as i32,
91 nonce: self.nonce as i64,
92 block_height: self.block_height as i64,
93 timestamp_ms: self.timestamp_ms as i64,
94 txn_hash: self.txn_hash.clone(),
95 txn_sender: self.txn_sender.clone(),
96 status: self.status,
97 gas_usage: self.gas_usage,
98 data_source: self.data_source,
99 is_finalized: self.is_finalized,
100 }
101 }
102
103 fn to_data_maybe(&self) -> Option<DBTokenTransferData> {
104 self.data.as_ref().map(|data| DBTokenTransferData {
105 chain_id: self.chain_id as i32,
106 nonce: self.nonce as i64,
107 block_height: self.block_height as i64,
108 timestamp_ms: self.timestamp_ms as i64,
109 txn_hash: self.txn_hash.clone(),
110 sender_address: data.sender_address.clone(),
111 destination_chain: data.destination_chain as i32,
112 recipient_address: data.recipient_address.clone(),
113 token_id: data.token_id as i32,
114 amount: data.amount as i64,
115 is_finalized: data.is_finalized,
116 })
117 }
118}
119
120impl SuiTxnError {
121 fn to_db(&self) -> SuiErrorTransactions {
122 SuiErrorTransactions {
123 txn_digest: self.tx_digest.inner().to_vec(),
124 sender_address: self.sender.to_vec(),
125 timestamp_ms: self.timestamp_ms as i64,
126 failure_status: self.failure_status.clone(),
127 cmd_idx: self.cmd_idx.map(|idx| idx as i64),
128 }
129 }
130}
131
132impl GovernanceAction {
133 fn to_db(&self) -> DBGovernanceAction {
134 DBGovernanceAction {
135 nonce: self.nonce.map(|nonce| nonce as i64),
136 data_source: self.data_source,
137 txn_digest: self.tx_digest.clone(),
138 sender_address: self.sender.to_vec(),
139 timestamp_ms: self.timestamp_ms as i64,
140 action: self.action,
141 data: self.data.clone(),
142 }
143 }
144}
145
146pub async fn create_eth_sync_indexer(
147 pool: PgPool,
148 metrics: BridgeIndexerMetrics,
149 bridge_metrics: Arc<BridgeMetrics>,
150 config: &IndexerConfig,
151 eth_client: Arc<EthClient>,
152) -> Result<Indexer<PgBridgePersistent, EthFinalizedSyncDatasource, EthDataMapper>, anyhow::Error> {
153 let bridge_addresses = get_eth_bridge_contract_addresses(config).await?;
154 let eth_sync_datasource = EthFinalizedSyncDatasource::new(
156 bridge_addresses,
157 eth_client.clone(),
158 config.eth_rpc_url.clone(),
159 metrics.clone().boxed(),
160 bridge_metrics.clone(),
161 config.eth_bridge_genesis_block,
162 )
163 .await?;
164 Ok(create_eth_indexer_builder(
165 pool,
166 metrics,
167 eth_sync_datasource,
168 "EthBridgeFinalizedSyncIndexer",
169 )
170 .await?
171 .with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 })
172 .build())
173}
174
175pub async fn create_eth_subscription_indexer(
176 pool: PgPool,
177 metrics: BridgeIndexerMetrics,
178 config: &IndexerConfig,
179 eth_client: Arc<EthClient>,
180) -> Result<Indexer<PgBridgePersistent, EthSubscriptionDatasource, EthDataMapper>, anyhow::Error> {
181 let bridge_addresses = get_eth_bridge_contract_addresses(config).await?;
183 let eth_subscription_datasource = EthSubscriptionDatasource::new(
185 bridge_addresses.clone(),
186 eth_client.clone(),
187 config.eth_ws_url.clone(),
188 metrics.clone().boxed(),
189 config.eth_bridge_genesis_block,
190 )
191 .await?;
192
193 Ok(create_eth_indexer_builder(
194 pool,
195 metrics,
196 eth_subscription_datasource,
197 "EthBridgeSubscriptionIndexer",
198 )
199 .await?
200 .with_backfill_strategy(BackfillStrategy::Disabled)
201 .build())
202}
203
204async fn create_eth_indexer_builder<T: Send, D: Datasource<T>>(
205 pool: PgPool,
206 metrics: BridgeIndexerMetrics,
207 datasource: D,
208 indexer_name: &str,
209) -> Result<IndexerBuilder<D, EthDataMapper, PgBridgePersistent>, anyhow::Error> {
210 let datastore = PgBridgePersistent::new(
211 pool,
212 ProgressSavingPolicy::SaveAfterDuration(SaveAfterDurationPolicy::new(
213 tokio::time::Duration::from_secs(30),
214 )),
215 );
216
217 Ok(IndexerBuilder::new(
219 indexer_name,
220 datasource,
221 EthDataMapper { metrics },
222 datastore.clone(),
223 ))
224}
225
226async fn get_eth_bridge_contract_addresses(
227 config: &IndexerConfig,
228) -> Result<Vec<EthAddress>, anyhow::Error> {
229 let bridge_address = EthAddress::from_str(&config.eth_sui_bridge_contract_address)?;
230 let eth_provider = get_eth_provider(&config.eth_rpc_url)?;
231 let bridge_addresses = get_eth_contract_addresses(bridge_address, eth_provider).await?;
232 Ok(vec![
233 bridge_address,
234 bridge_addresses.0,
235 bridge_addresses.1,
236 bridge_addresses.2,
237 bridge_addresses.3,
238 ])
239}
240
241mod indexer_builder;
244mod progress;
245const LIVE_TASK_TARGET_CHECKPOINT: i64 = i64::MAX;
246
247#[derive(Clone, Debug)]
248pub struct Task {
249 pub task_name: String,
250 pub start_checkpoint: u64,
251 pub target_checkpoint: u64,
252 pub timestamp: u64,
253 pub is_live_task: bool,
254}
255
256impl Task {
257 pub fn name_prefix(&self) -> &str {
259 self.task_name.split(' ').next().unwrap_or("Unknown")
260 }
261
262 pub fn type_str(&self) -> &str {
263 if self.is_live_task {
264 "live"
265 } else {
266 "backfill"
267 }
268 }
269}
270
271#[derive(Clone, Debug)]
272pub struct Tasks {
273 live_task: Option<Task>,
274 backfill_tasks: Vec<Task>,
275}
276
277impl Tasks {
278 pub fn new(tasks: Vec<Task>) -> anyhow::Result<Self> {
279 let mut live_tasks = vec![];
280 let mut backfill_tasks = vec![];
281 for task in tasks {
282 if task.is_live_task {
283 live_tasks.push(task);
284 } else {
285 backfill_tasks.push(task);
286 }
287 }
288 if live_tasks.len() > 1 {
289 anyhow::bail!("More than one live task found: {:?}", live_tasks);
290 }
291 Ok(Self {
292 live_task: live_tasks.pop(),
293 backfill_tasks,
294 })
295 }
296
297 pub fn live_task(&self) -> Option<Task> {
298 self.live_task.clone()
299 }
300
301 pub fn backfill_tasks_ordered_desc(&self) -> Vec<Task> {
302 let mut tasks = self.backfill_tasks.clone();
303 tasks.sort_by(|t1, t2| t2.start_checkpoint.cmp(&t1.start_checkpoint));
304 tasks
305 }
306}