1use crate::config::IndexerConfig;
5use crate::eth_bridge_indexer::{
6 EthDataMapper, EthFinalizedSyncDatasource, EthSubscriptionDatasource,
7};
8use crate::metrics::BridgeIndexerMetrics;
9use crate::postgres_manager::PgPool;
10use crate::storage::PgBridgePersistent;
11use alloy::primitives::Address as EthAddress;
12use std::str::FromStr;
13use std::sync::Arc;
14use sui_bridge::eth_client::EthClient;
15use sui_bridge::metrics::BridgeMetrics;
16use sui_bridge::utils::{get_eth_contract_addresses, get_eth_provider};
17use sui_bridge_schema::models::{
18 BridgeDataSource, GovernanceAction as DBGovernanceAction, TokenTransferStatus,
19};
20use sui_bridge_schema::models::{GovernanceActionType, TokenTransferData as DBTokenTransferData};
21use sui_bridge_schema::models::{SuiErrorTransactions, TokenTransfer as DBTokenTransfer};
22use sui_types::base_types::{SuiAddress, TransactionDigest};
23
24pub mod config;
25pub mod metrics;
26pub mod postgres_manager;
27mod storage;
28
29mod eth_bridge_indexer;
30
31use indexer_builder::{BackfillStrategy, Datasource, Indexer, IndexerBuilder};
32use metrics::IndexerMetricProvider;
33use progress::{ProgressSavingPolicy, SaveAfterDurationPolicy};
34
35#[derive(Clone)]
36pub enum ProcessedTxnData {
37 TokenTransfer(TokenTransfer),
38 GovernanceAction(GovernanceAction),
39 Error(SuiTxnError),
40}
41
42#[derive(Clone)]
43pub struct SuiTxnError {
44 tx_digest: TransactionDigest,
45 sender: SuiAddress,
46 timestamp_ms: u64,
47 failure_status: String,
48 cmd_idx: Option<u64>,
49}
50
51#[derive(Clone)]
52pub struct TokenTransfer {
53 chain_id: u8,
54 nonce: u64,
55 block_height: u64,
56 timestamp_ms: u64,
57 txn_hash: Vec<u8>,
58 txn_sender: Vec<u8>,
59 status: TokenTransferStatus,
60 gas_usage: i64,
61 data_source: BridgeDataSource,
62 data: Option<TokenTransferData>,
63 is_finalized: bool,
64}
65
66#[derive(Clone)]
67pub struct GovernanceAction {
68 nonce: Option<u64>,
69 data_source: BridgeDataSource,
70 tx_digest: Vec<u8>,
71 sender: Vec<u8>,
72 timestamp_ms: u64,
73 action: GovernanceActionType,
74 data: serde_json::Value,
75}
76
77#[derive(Clone)]
78pub struct TokenTransferData {
79 sender_address: Vec<u8>,
80 destination_chain: u8,
81 recipient_address: Vec<u8>,
82 token_id: u8,
83 amount: u64,
84 is_finalized: bool,
85 message_timestamp_ms: Option<u64>,
88}
89
90impl TokenTransfer {
91 fn to_db(&self) -> DBTokenTransfer {
92 DBTokenTransfer {
93 chain_id: self.chain_id as i32,
94 nonce: self.nonce as i64,
95 block_height: self.block_height as i64,
96 timestamp_ms: self.timestamp_ms as i64,
97 txn_hash: self.txn_hash.clone(),
98 txn_sender: self.txn_sender.clone(),
99 status: self.status,
100 gas_usage: self.gas_usage,
101 data_source: self.data_source,
102 is_finalized: self.is_finalized,
103 }
104 }
105
106 fn to_data_maybe(&self) -> Option<DBTokenTransferData> {
107 self.data.as_ref().map(|data| DBTokenTransferData {
108 chain_id: self.chain_id as i32,
109 nonce: self.nonce as i64,
110 block_height: self.block_height as i64,
111 timestamp_ms: self.timestamp_ms as i64,
112 txn_hash: self.txn_hash.clone(),
113 sender_address: data.sender_address.clone(),
114 destination_chain: data.destination_chain as i32,
115 recipient_address: data.recipient_address.clone(),
116 token_id: data.token_id as i32,
117 amount: data.amount as i64,
118 is_finalized: data.is_finalized,
119 message_timestamp_ms: data.message_timestamp_ms.map(|ts| ts as i64),
120 })
121 }
122}
123
124impl SuiTxnError {
125 fn to_db(&self) -> SuiErrorTransactions {
126 SuiErrorTransactions {
127 txn_digest: self.tx_digest.inner().to_vec(),
128 sender_address: self.sender.to_vec(),
129 timestamp_ms: self.timestamp_ms as i64,
130 failure_status: self.failure_status.clone(),
131 cmd_idx: self.cmd_idx.map(|idx| idx as i64),
132 }
133 }
134}
135
136impl GovernanceAction {
137 fn to_db(&self) -> DBGovernanceAction {
138 DBGovernanceAction {
139 nonce: self.nonce.map(|nonce| nonce as i64),
140 data_source: self.data_source,
141 txn_digest: self.tx_digest.clone(),
142 sender_address: self.sender.to_vec(),
143 timestamp_ms: self.timestamp_ms as i64,
144 action: self.action,
145 data: self.data.clone(),
146 }
147 }
148}
149
150pub async fn create_eth_sync_indexer(
151 pool: PgPool,
152 metrics: BridgeIndexerMetrics,
153 bridge_metrics: Arc<BridgeMetrics>,
154 config: &IndexerConfig,
155 eth_client: Arc<EthClient>,
156) -> Result<Indexer<PgBridgePersistent, EthFinalizedSyncDatasource, EthDataMapper>, anyhow::Error> {
157 let bridge_addresses = get_eth_bridge_contract_addresses(config).await?;
158 let eth_sync_datasource = EthFinalizedSyncDatasource::new(
160 bridge_addresses,
161 eth_client.clone(),
162 config.eth_rpc_url.clone(),
163 metrics.clone().boxed(),
164 bridge_metrics.clone(),
165 config.eth_bridge_genesis_block,
166 )
167 .await?;
168 Ok(create_eth_indexer_builder(
169 pool,
170 metrics,
171 eth_sync_datasource,
172 "EthBridgeFinalizedSyncIndexer",
173 )
174 .await?
175 .with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 })
176 .build())
177}
178
179pub async fn create_eth_subscription_indexer(
180 pool: PgPool,
181 metrics: BridgeIndexerMetrics,
182 config: &IndexerConfig,
183 eth_client: Arc<EthClient>,
184) -> Result<Indexer<PgBridgePersistent, EthSubscriptionDatasource, EthDataMapper>, anyhow::Error> {
185 let bridge_addresses = get_eth_bridge_contract_addresses(config).await?;
187 let eth_subscription_datasource = EthSubscriptionDatasource::new(
189 bridge_addresses.clone(),
190 eth_client.clone(),
191 config.eth_ws_url.clone(),
192 metrics.clone().boxed(),
193 config.eth_bridge_genesis_block,
194 )
195 .await?;
196
197 Ok(create_eth_indexer_builder(
198 pool,
199 metrics,
200 eth_subscription_datasource,
201 "EthBridgeSubscriptionIndexer",
202 )
203 .await?
204 .with_backfill_strategy(BackfillStrategy::Disabled)
205 .build())
206}
207
208async fn create_eth_indexer_builder<T: Send, D: Datasource<T>>(
209 pool: PgPool,
210 metrics: BridgeIndexerMetrics,
211 datasource: D,
212 indexer_name: &str,
213) -> Result<IndexerBuilder<D, EthDataMapper, PgBridgePersistent>, anyhow::Error> {
214 let datastore = PgBridgePersistent::new(
215 pool,
216 ProgressSavingPolicy::SaveAfterDuration(SaveAfterDurationPolicy::new(
217 tokio::time::Duration::from_secs(30),
218 )),
219 );
220
221 Ok(IndexerBuilder::new(
223 indexer_name,
224 datasource,
225 EthDataMapper { metrics },
226 datastore.clone(),
227 ))
228}
229
230async fn get_eth_bridge_contract_addresses(
231 config: &IndexerConfig,
232) -> Result<Vec<EthAddress>, anyhow::Error> {
233 let bridge_address = EthAddress::from_str(&config.eth_sui_bridge_contract_address)?;
234 let eth_provider = get_eth_provider(&config.eth_rpc_url)?;
235 let bridge_addresses = get_eth_contract_addresses(bridge_address, eth_provider).await?;
236 Ok(vec![
237 bridge_address,
238 bridge_addresses.0,
239 bridge_addresses.1,
240 bridge_addresses.2,
241 bridge_addresses.3,
242 ])
243}
244
245mod indexer_builder;
248mod progress;
249const LIVE_TASK_TARGET_CHECKPOINT: i64 = i64::MAX;
250
251#[derive(Clone, Debug)]
252pub struct Task {
253 pub task_name: String,
254 pub start_checkpoint: u64,
255 pub target_checkpoint: u64,
256 pub timestamp: u64,
257 pub is_live_task: bool,
258}
259
260impl Task {
261 pub fn name_prefix(&self) -> &str {
263 self.task_name.split(' ').next().unwrap_or("Unknown")
264 }
265
266 pub fn type_str(&self) -> &str {
267 if self.is_live_task {
268 "live"
269 } else {
270 "backfill"
271 }
272 }
273}
274
275#[derive(Clone, Debug)]
276pub struct Tasks {
277 live_task: Option<Task>,
278 backfill_tasks: Vec<Task>,
279}
280
281impl Tasks {
282 pub fn new(tasks: Vec<Task>) -> anyhow::Result<Self> {
283 let mut live_tasks = vec![];
284 let mut backfill_tasks = vec![];
285 for task in tasks {
286 if task.is_live_task {
287 live_tasks.push(task);
288 } else {
289 backfill_tasks.push(task);
290 }
291 }
292 if live_tasks.len() > 1 {
293 anyhow::bail!("More than one live task found: {:?}", live_tasks);
294 }
295 Ok(Self {
296 live_task: live_tasks.pop(),
297 backfill_tasks,
298 })
299 }
300
301 pub fn live_task(&self) -> Option<Task> {
302 self.live_task.clone()
303 }
304
305 pub fn backfill_tasks_ordered_desc(&self) -> Vec<Task> {
306 let mut tasks = self.backfill_tasks.clone();
307 tasks.sort_by(|t1, t2| t2.start_checkpoint.cmp(&t1.start_checkpoint));
308 tasks
309 }
310}