1use crate::abi::EthBridgeEvent;
5use crate::error::{BridgeError, BridgeResult};
6use crate::metered_eth_provider::new_metered_eth_provider;
7use crate::metrics::BridgeMetrics;
8use crate::types::{BridgeAction, EthLog, RawEthLog};
9use crate::utils::EthProvider;
10use alloy::primitives::{Address as EthAddress, TxHash};
11use alloy::providers::Provider;
12use alloy::rpc::types::{Block, Filter, Log};
13use std::collections::HashSet;
14use std::sync::Arc;
15use tap::TapFallible;
16
17#[cfg(test)]
18use crate::eth_mock_provider::EthMockService;
19
/// Client that wraps an [`EthProvider`] and exposes the Ethereum queries
/// implemented in this file (chain id, finalized block, bridge events).
pub struct EthClient {
    /// Provider handle through which every RPC request of this client is sent.
    provider: EthProvider,
    /// Addresses of the contracts whose logs are accepted as bridge events.
    /// `get_finalized_bridge_action_maybe` rejects a log emitted by any
    /// address that is not in this set.
    contract_addresses: HashSet<EthAddress>,
}
24
25impl EthClient {
26 pub async fn new(
27 provider_url: &str,
28 contract_addresses: HashSet<EthAddress>,
29 metrics: Arc<BridgeMetrics>,
30 ) -> anyhow::Result<Self> {
31 let provider = new_metered_eth_provider(provider_url, metrics)?;
32 let self_ = Self {
33 provider,
34 contract_addresses,
35 };
36 self_.describe().await?;
37 Ok(self_)
38 }
39
40 pub async fn from_provider(
43 provider: EthProvider,
44 contract_addresses: HashSet<EthAddress>,
45 ) -> anyhow::Result<Self> {
46 let self_ = Self {
47 provider,
48 contract_addresses,
49 };
50 self_.describe().await?;
51 Ok(self_)
52 }
53
54 pub fn provider(&self) -> EthProvider {
55 self.provider.clone()
56 }
57}
58
#[cfg(test)]
impl EthClient {
    /// Test-only constructor backed by an [`EthMockService`].
    ///
    /// Unlike the production constructors this one is synchronous and issues
    /// no request while constructing the client, so no mock responses need to
    /// be registered beforehand.
    pub fn new_mocked(
        mock_service: EthMockService,
        contract_addresses: HashSet<EthAddress>,
    ) -> Self {
        Self {
            provider: mock_service.as_provider(),
            contract_addresses,
        }
    }
}
72
73impl EthClient {
74 pub async fn get_chain_id(&self) -> Result<u64, anyhow::Error> {
75 let chain_id = self.provider.get_chain_id().await?;
76 Ok(chain_id)
77 }
78
79 async fn describe(&self) -> anyhow::Result<()> {
81 let chain_id = self.get_chain_id().await?;
82 let block_number = self.provider.get_block_number().await?;
83 tracing::info!(
84 "EthClient is connected to chain {chain_id}, current block number: {block_number}"
85 );
86 Ok(())
87 }
88
89 pub async fn get_finalized_bridge_action_maybe(
93 &self,
94 tx_hash: TxHash,
95 event_idx: u16,
96 ) -> BridgeResult<BridgeAction> {
97 let last_finalized_block_id = self.get_last_finalized_block_id().await?;
101
102 let receipt = self
103 .provider
104 .get_transaction_receipt(tx_hash)
105 .await
106 .map_err(BridgeError::from)?
107 .ok_or(BridgeError::TxNotFound)?;
108 let receipt_block_num = receipt.block_number.ok_or(BridgeError::ProviderError(
109 "Provider returns log without block_number".into(),
110 ))?;
111 if receipt_block_num > last_finalized_block_id {
112 return Err(BridgeError::TxNotFinalized);
113 }
114
115 let log = receipt
116 .logs()
117 .get(event_idx as usize)
118 .ok_or(BridgeError::NoBridgeEventsInTxPosition)?;
119
120 if !self.contract_addresses.contains(&log.address()) {
122 return Err(BridgeError::BridgeEventInUnrecognizedEthContract);
123 }
124
125 let eth_log = EthLog {
126 block_number: receipt_block_num,
127 tx_hash,
128 log_index_in_tx: event_idx,
129 log: log.clone(),
130 };
131 let bridge_event = EthBridgeEvent::try_from_eth_log(ð_log)
132 .ok_or(BridgeError::NoBridgeEventsInTxPosition)?;
133 bridge_event
134 .try_into_bridge_action(tx_hash, event_idx)?
135 .ok_or(BridgeError::BridgeEventNotActionable)
136 }
137
138 pub async fn get_last_finalized_block_id(&self) -> BridgeResult<u64> {
139 let block: Option<Block> = self
140 .provider
141 .raw_request("eth_getBlockByNumber".into(), ("finalized", false))
142 .await?;
143 let block = block.ok_or(BridgeError::TransientProviderError(
144 "Provider fails to return last finalized block".into(),
145 ))?;
146 Ok(block.number())
147 }
148
149 pub async fn get_events_in_range(
152 &self,
153 address: alloy::primitives::Address,
154 start_block: u64,
155 end_block: u64,
156 ) -> BridgeResult<Vec<EthLog>> {
157 let filter = Filter::new()
158 .from_block(start_block)
159 .to_block(end_block)
160 .address(address);
161 let logs = self
162 .provider
163 .get_logs(&filter)
165 .await
166 .map_err(BridgeError::from)
167 .tap_err(|e| {
168 tracing::error!(
169 "get_events_in_range failed. Filter: {:?}. Error {:?}",
170 filter,
171 e
172 )
173 })?;
174
175 if logs.iter().any(|log| log.address() != address) {
177 return Err(BridgeError::ProviderError(format!(
178 "Provider returns logs from different contract address (expected: {:?}): {:?}",
179 address, logs
180 )));
181 }
182 if logs.is_empty() {
183 return Ok(vec![]);
184 }
185
186 let tasks = logs.into_iter().map(|log| self.get_log_tx_details(log));
187 futures::future::join_all(tasks)
188 .await
189 .into_iter()
190 .collect::<Result<Vec<_>, _>>()
191 .tap_err(|e| {
192 tracing::error!(
193 "get_log_tx_details failed. Filter: {:?}. Error {:?}",
194 filter,
195 e
196 )
197 })
198 }
199
200 pub async fn get_raw_events_in_range(
203 &self,
204 addresses: Vec<EthAddress>,
205 start_block: u64,
206 end_block: u64,
207 ) -> BridgeResult<Vec<RawEthLog>> {
208 let filter = Filter::new()
209 .from_block(start_block)
210 .to_block(end_block)
211 .address(addresses.clone());
212 let logs = self
213 .provider
214 .get_logs(&filter)
215 .await
216 .map_err(BridgeError::from)
217 .tap_err(|e| {
218 tracing::error!(
219 "get_events_in_range failed. Filter: {:?}. Error {:?}",
220 filter,
221 e
222 )
223 })?;
224 logs.into_iter().map(
226 |log| {
227 if !addresses.contains(&log.address()) {
228 return Err(BridgeError::ProviderError(format!("Provider returns logs from different contract address (expected: {:?}): {:?}", addresses, log)));
229 }
230 Ok(RawEthLog {
231 block_number: log.block_number.ok_or(BridgeError::ProviderError("Provider returns log without block_number".into()))?,
232 tx_hash: log.transaction_hash.ok_or(BridgeError::ProviderError("Provider returns log without transaction_hash".into()))?,
233 log,
234 })}
235 ).collect::<Result<Vec<_>, _>>()
236 }
237
238 async fn get_log_tx_details(&self, log: Log) -> BridgeResult<EthLog> {
242 let block_number = log.block_number.ok_or(BridgeError::ProviderError(
243 "Provider returns log without block_number".into(),
244 ))?;
245 let tx_hash = log.transaction_hash.ok_or(BridgeError::ProviderError(
246 "Provider returns log without transaction_hash".into(),
247 ))?;
248 let log_index = log.log_index.ok_or(BridgeError::ProviderError(
250 "Provider returns log without log_index".into(),
251 ))?;
252
253 let receipt = self
257 .provider
258 .get_transaction_receipt(tx_hash)
259 .await
260 .map_err(BridgeError::from)?
261 .ok_or(BridgeError::ProviderError(format!(
262 "Provide cannot find eth transaction for log: {:?})",
263 log
264 )))?;
265
266 let receipt_block_num = receipt.block_number.ok_or(BridgeError::ProviderError(
267 "Provider returns log without block_number".into(),
268 ))?;
269 if receipt_block_num != block_number {
270 return Err(BridgeError::ProviderError(format!(
271 "Provider returns receipt with different block number from log. Receipt: {:?}, Log: {:?}",
272 receipt, log
273 )));
274 }
275
276 let mut log_index_in_tx = None;
278 for (idx, receipt_log) in receipt.logs().iter().enumerate() {
279 if receipt_log.log_index == Some(log_index) {
281 if receipt_log.topics() != log.topics() || receipt_log.data() != log.data() {
283 return Err(BridgeError::ProviderError(format!(
284 "Provider returns receipt with different log from log. Receipt: {:?}, Log: {:?}",
285 receipt, log
286 )));
287 }
288 log_index_in_tx = Some(idx);
289 }
290 }
291 let log_index_in_tx = log_index_in_tx.ok_or(BridgeError::ProviderError(format!(
292 "Couldn't find matching log: {:?} in transaction {}",
293 log, tx_hash
294 )))?;
295
296 Ok(EthLog {
297 block_number,
298 tx_hash,
299 log_index_in_tx: log_index_in_tx as u16,
300 log,
301 })
302 }
303}
304
#[cfg(test)]
mod tests {
    use alloy::rpc::types::TransactionReceipt;
    use prometheus::Registry;

    use super::*;
    use crate::test_utils::{
        get_test_log_and_action, make_transaction_receipt, mock_last_finalized_block,
    };

    /// Walks `get_finalized_bridge_action_maybe` through its outcomes for one
    /// transaction: not finalized, out-of-range index, non-bridge log, and
    /// finally a successfully decoded action.
    #[tokio::test]
    async fn test_get_finalized_bridge_action_maybe() {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        mysten_metrics::init_metrics(&registry);
        let mock_service = EthMockService::new();
        mock_last_finalized_block(&mock_service, 777);

        let client = EthClient::new_mocked(
            mock_service.clone(),
            HashSet::from_iter(vec![EthAddress::default()]),
        );
        let result = client.get_last_finalized_block_id().await.unwrap();
        assert_eq!(result, 777);

        let eth_tx_hash = TxHash::random();
        // A default (non-bridge) log in block 778, one past the finalized block.
        let log = Log {
            transaction_hash: Some(eth_tx_hash),
            block_number: Some(778),
            ..Default::default()
        };
        let (good_log, bridge_action) =
            get_test_log_and_action(EthAddress::default(), eth_tx_hash, 1);
        // Receipt layout: index 0 is the non-bridge log, index 1 the bridge log.
        mock_service
            .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
                "eth_getTransactionReceipt",
                [log.transaction_hash.unwrap()],
                make_transaction_receipt(
                    EthAddress::default(),
                    log.block_number,
                    vec![log.clone(), good_log],
                ),
            )
            .unwrap();

        // Block 778 > finalized 777.
        let error = client
            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
            .await
            .unwrap_err();
        match error {
            BridgeError::TxNotFinalized => {}
            other => panic!("expected TxNotFinalized, got {other:?}"),
        };

        // Advance the finalized block so the transaction becomes finalized.
        mock_last_finalized_block(&mock_service, 778);

        mock_service
            .add_response(
                "eth_getBlockByNumber",
                (format!("0x{:x}", 778), false),
                make_transaction_receipt(EthAddress::default(), Some(778), vec![log.clone()]),
            )
            .unwrap();

        // Index 2 is past the end of the receipt's two logs.
        let error = client
            .get_finalized_bridge_action_maybe(eth_tx_hash, 2)
            .await
            .unwrap_err();
        match error {
            BridgeError::NoBridgeEventsInTxPosition => {}
            other => panic!("expected NoBridgeEventsInTxPosition, got {other:?}"),
        };

        // Index 0 exists but is not a bridge event.
        let error = client
            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
            .await
            .unwrap_err();
        match error {
            BridgeError::NoBridgeEventsInTxPosition => {}
            other => panic!("expected NoBridgeEventsInTxPosition, got {other:?}"),
        };

        // Index 1 is the bridge log.
        let action = client
            .get_finalized_bridge_action_maybe(eth_tx_hash, 1)
            .await
            .unwrap();
        assert_eq!(action, bridge_action);
    }

    /// A bridge log emitted by an address outside the client's configured set
    /// is rejected; the same lookup succeeds once the log comes from a
    /// configured address.
    #[tokio::test]
    async fn test_get_finalized_bridge_action_maybe_unrecognized_contract() {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        mysten_metrics::init_metrics(&registry);
        let mock_service = EthMockService::new();
        mock_last_finalized_block(&mock_service, 777);

        let client = EthClient::new_mocked(
            mock_service.clone(),
            HashSet::from_iter(vec![
                EthAddress::repeat_byte(5),
                EthAddress::repeat_byte(6),
                EthAddress::repeat_byte(7),
            ]),
        );
        let result = client.get_last_finalized_block_id().await.unwrap();
        assert_eq!(result, 777);

        let eth_tx_hash = TxHash::random();
        // Address 0x04.. is not among the configured contracts.
        let (log, _bridge_action) =
            get_test_log_and_action(EthAddress::repeat_byte(4), eth_tx_hash, 0);
        mock_service
            .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
                "eth_getTransactionReceipt",
                [log.transaction_hash.unwrap()],
                make_transaction_receipt(
                    EthAddress::default(),
                    log.block_number,
                    vec![log.clone()],
                ),
            )
            .unwrap();
        mock_service
            .add_response(
                "eth_getBlockByNumber",
                (format!("0x{:x}", 777), false),
                make_transaction_receipt(EthAddress::default(), Some(777), vec![log.clone()]),
            )
            .unwrap();

        let error = client
            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
            .await
            .unwrap_err();
        match error {
            BridgeError::BridgeEventInUnrecognizedEthContract => {}
            other => panic!("expected BridgeEventInUnrecognizedEthContract, got {other:?}"),
        };

        // Address 0x06.. is configured, so the same lookup now succeeds.
        let (log, bridge_action) =
            get_test_log_and_action(EthAddress::repeat_byte(6), eth_tx_hash, 0);
        mock_service
            .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
                "eth_getTransactionReceipt",
                [log.transaction_hash.unwrap()],
                make_transaction_receipt(
                    EthAddress::default(),
                    log.block_number,
                    vec![log.clone()],
                ),
            )
            .unwrap();
        mock_service
            .add_response(
                "eth_getBlockByNumber",
                (format!("0x{:x}", 777), false),
                make_transaction_receipt(EthAddress::default(), Some(777), vec![log.clone()]),
            )
            .unwrap();
        let action = client
            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
            .await
            .unwrap();
        assert_eq!(action, bridge_action);
    }
}
476}