1use crate::abi::EthBridgeEvent;
5use crate::error::{BridgeError, BridgeResult};
6use crate::metered_eth_provider::new_metered_eth_provider;
7use crate::metrics::BridgeMetrics;
8use crate::types::{BridgeAction, EthLog, RawEthLog};
9use crate::utils::EthProvider;
10use alloy::primitives::{Address as EthAddress, TxHash};
11use alloy::providers::Provider;
12use alloy::rpc::types::{Block, Filter, Log};
13use std::collections::HashSet;
14use std::sync::Arc;
15use tap::TapFallible;
16
17#[cfg(test)]
18use crate::eth_mock_provider::EthMockService;
19
20pub struct EthClient {
21 provider: EthProvider,
22 contract_addresses: HashSet<EthAddress>,
23}
24
25impl EthClient {
26 pub async fn new(
27 provider_url: &str,
28 contract_addresses: HashSet<EthAddress>,
29 metrics: Arc<BridgeMetrics>,
30 ) -> anyhow::Result<Self> {
31 let provider = new_metered_eth_provider(provider_url, metrics)?;
32 let self_ = Self {
33 provider,
34 contract_addresses,
35 };
36 self_.describe().await?;
37 Ok(self_)
38 }
39
40 pub fn provider(&self) -> EthProvider {
41 self.provider.clone()
42 }
43}
44
45#[cfg(test)]
46impl EthClient {
47 pub fn new_mocked(
48 mock_service: EthMockService,
49 contract_addresses: HashSet<EthAddress>,
50 ) -> Self {
51 let provider = mock_service.as_provider();
52 Self {
53 provider,
54 contract_addresses,
55 }
56 }
57}
58
59impl EthClient {
60 pub async fn get_chain_id(&self) -> Result<u64, anyhow::Error> {
61 let chain_id = self.provider.get_chain_id().await?;
62 Ok(chain_id)
63 }
64
65 async fn describe(&self) -> anyhow::Result<()> {
67 let chain_id = self.get_chain_id().await?;
68 let block_number = self.provider.get_block_number().await?;
69 tracing::info!(
70 "EthClient is connected to chain {chain_id}, current block number: {block_number}"
71 );
72 Ok(())
73 }
74
75 pub async fn get_finalized_bridge_action_maybe(
79 &self,
80 tx_hash: TxHash,
81 event_idx: u16,
82 ) -> BridgeResult<BridgeAction> {
83 let receipt = self
84 .provider
85 .get_transaction_receipt(tx_hash)
86 .await
87 .map_err(BridgeError::from)?
88 .ok_or(BridgeError::TxNotFound)?;
89 let receipt_block_num = receipt.block_number.ok_or(BridgeError::ProviderError(
90 "Provider returns log without block_number".into(),
91 ))?;
92 let last_finalized_block_id = self.get_last_finalized_block_id().await?;
94 if receipt_block_num > last_finalized_block_id {
95 return Err(BridgeError::TxNotFinalized);
96 }
97
98 let log = receipt
99 .logs()
100 .get(event_idx as usize)
101 .ok_or(BridgeError::NoBridgeEventsInTxPosition)?;
102
103 if !self.contract_addresses.contains(&log.address()) {
105 return Err(BridgeError::BridgeEventInUnrecognizedEthContract);
106 }
107
108 let eth_log = EthLog {
109 block_number: receipt_block_num,
110 tx_hash,
111 log_index_in_tx: event_idx,
112 log: log.clone(),
113 };
114 let bridge_event = EthBridgeEvent::try_from_eth_log(ð_log)
115 .ok_or(BridgeError::NoBridgeEventsInTxPosition)?;
116 bridge_event
117 .try_into_bridge_action(tx_hash, event_idx)?
118 .ok_or(BridgeError::BridgeEventNotActionable)
119 }
120
121 pub async fn get_last_finalized_block_id(&self) -> BridgeResult<u64> {
122 let block: Option<Block> = self
123 .provider
124 .raw_request("eth_getBlockByNumber".into(), ("finalized", false))
125 .await?;
126 let block = block.ok_or(BridgeError::TransientProviderError(
127 "Provider fails to return last finalized block".into(),
128 ))?;
129 Ok(block.number())
130 }
131
132 pub async fn get_events_in_range(
135 &self,
136 address: alloy::primitives::Address,
137 start_block: u64,
138 end_block: u64,
139 ) -> BridgeResult<Vec<EthLog>> {
140 let filter = Filter::new()
141 .from_block(start_block)
142 .to_block(end_block)
143 .address(address);
144 let logs = self
145 .provider
146 .get_logs(&filter)
148 .await
149 .map_err(BridgeError::from)
150 .tap_err(|e| {
151 tracing::error!(
152 "get_events_in_range failed. Filter: {:?}. Error {:?}",
153 filter,
154 e
155 )
156 })?;
157
158 if logs.iter().any(|log| log.address() != address) {
160 return Err(BridgeError::ProviderError(format!(
161 "Provider returns logs from different contract address (expected: {:?}): {:?}",
162 address, logs
163 )));
164 }
165 if logs.is_empty() {
166 return Ok(vec![]);
167 }
168
169 let tasks = logs.into_iter().map(|log| self.get_log_tx_details(log));
170 futures::future::join_all(tasks)
171 .await
172 .into_iter()
173 .collect::<Result<Vec<_>, _>>()
174 .tap_err(|e| {
175 tracing::error!(
176 "get_log_tx_details failed. Filter: {:?}. Error {:?}",
177 filter,
178 e
179 )
180 })
181 }
182
183 pub async fn get_raw_events_in_range(
186 &self,
187 addresses: Vec<EthAddress>,
188 start_block: u64,
189 end_block: u64,
190 ) -> BridgeResult<Vec<RawEthLog>> {
191 let filter = Filter::new()
192 .from_block(start_block)
193 .to_block(end_block)
194 .address(addresses.clone());
195 let logs = self
196 .provider
197 .get_logs(&filter)
198 .await
199 .map_err(BridgeError::from)
200 .tap_err(|e| {
201 tracing::error!(
202 "get_events_in_range failed. Filter: {:?}. Error {:?}",
203 filter,
204 e
205 )
206 })?;
207 logs.into_iter().map(
209 |log| {
210 if !addresses.contains(&log.address()) {
211 return Err(BridgeError::ProviderError(format!("Provider returns logs from different contract address (expected: {:?}): {:?}", addresses, log)));
212 }
213 Ok(RawEthLog {
214 block_number: log.block_number.ok_or(BridgeError::ProviderError("Provider returns log without block_number".into()))?,
215 tx_hash: log.transaction_hash.ok_or(BridgeError::ProviderError("Provider returns log without transaction_hash".into()))?,
216 log,
217 })}
218 ).collect::<Result<Vec<_>, _>>()
219 }
220
221 async fn get_log_tx_details(&self, log: Log) -> BridgeResult<EthLog> {
225 let block_number = log.block_number.ok_or(BridgeError::ProviderError(
226 "Provider returns log without block_number".into(),
227 ))?;
228 let tx_hash = log.transaction_hash.ok_or(BridgeError::ProviderError(
229 "Provider returns log without transaction_hash".into(),
230 ))?;
231 let log_index = log.log_index.ok_or(BridgeError::ProviderError(
233 "Provider returns log without log_index".into(),
234 ))?;
235
236 let receipt = self
240 .provider
241 .get_transaction_receipt(tx_hash)
242 .await
243 .map_err(BridgeError::from)?
244 .ok_or(BridgeError::ProviderError(format!(
245 "Provide cannot find eth transaction for log: {:?})",
246 log
247 )))?;
248
249 let receipt_block_num = receipt.block_number.ok_or(BridgeError::ProviderError(
250 "Provider returns log without block_number".into(),
251 ))?;
252 if receipt_block_num != block_number {
253 return Err(BridgeError::ProviderError(format!(
254 "Provider returns receipt with different block number from log. Receipt: {:?}, Log: {:?}",
255 receipt, log
256 )));
257 }
258
259 let mut log_index_in_tx = None;
261 for (idx, receipt_log) in receipt.logs().iter().enumerate() {
262 if receipt_log.log_index == Some(log_index) {
264 if receipt_log.topics() != log.topics() || receipt_log.data() != log.data() {
266 return Err(BridgeError::ProviderError(format!(
267 "Provider returns receipt with different log from log. Receipt: {:?}, Log: {:?}",
268 receipt, log
269 )));
270 }
271 log_index_in_tx = Some(idx);
272 }
273 }
274 let log_index_in_tx = log_index_in_tx.ok_or(BridgeError::ProviderError(format!(
275 "Couldn't find matching log: {:?} in transaction {}",
276 log, tx_hash
277 )))?;
278
279 Ok(EthLog {
280 block_number,
281 tx_hash,
282 log_index_in_tx: log_index_in_tx as u16,
283 log,
284 })
285 }
286}
287
288#[cfg(test)]
289mod tests {
290 use alloy::rpc::types::TransactionReceipt;
291 use prometheus::Registry;
292
293 use super::*;
294 use crate::test_utils::{
295 get_test_log_and_action, make_transaction_receipt, mock_last_finalized_block,
296 };
297
298 #[tokio::test]
299 async fn test_get_finalized_bridge_action_maybe() {
300 telemetry_subscribers::init_for_testing();
301 let registry = Registry::new();
302 mysten_metrics::init_metrics(®istry);
303 let mock_service = EthMockService::new();
304 mock_last_finalized_block(&mock_service, 777);
305
306 let client = EthClient::new_mocked(
307 mock_service.clone(),
308 HashSet::from_iter(vec![EthAddress::default()]),
309 );
310 let result = client.get_last_finalized_block_id().await.unwrap();
311 assert_eq!(result, 777);
312
313 let eth_tx_hash = TxHash::random();
314 let log = Log {
315 transaction_hash: Some(eth_tx_hash),
316 block_number: Some(778),
317 ..Default::default()
318 };
319 let (good_log, bridge_action) =
320 get_test_log_and_action(EthAddress::default(), eth_tx_hash, 1);
321 mock_service
323 .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
324 "eth_getTransactionReceipt",
325 [log.transaction_hash.unwrap()],
326 make_transaction_receipt(
327 EthAddress::default(),
328 log.block_number,
329 vec![log.clone(), good_log],
330 ),
331 )
332 .unwrap();
333
334 let error = client
335 .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
336 .await
337 .unwrap_err();
338 match error {
339 BridgeError::TxNotFinalized => {}
340 _ => panic!("expected TxNotFinalized"),
341 };
342
343 mock_last_finalized_block(&mock_service, 778);
345
346 mock_service
347 .add_response(
348 "eth_getBlockByNumber",
349 (format!("0x{:x}", 778), false),
350 make_transaction_receipt(EthAddress::default(), Some(778), vec![log.clone()]),
351 )
352 .unwrap();
353
354 let error = client
355 .get_finalized_bridge_action_maybe(eth_tx_hash, 2)
356 .await
357 .unwrap_err();
358 match error {
360 BridgeError::NoBridgeEventsInTxPosition => {}
361 _ => panic!("expected NoBridgeEventsInTxPosition"),
362 };
363
364 let error = client
365 .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
366 .await
367 .unwrap_err();
368 match error {
370 BridgeError::NoBridgeEventsInTxPosition => {}
371 _ => panic!("expected NoBridgeEventsInTxPosition"),
372 };
373
374 let action = client
375 .get_finalized_bridge_action_maybe(eth_tx_hash, 1)
376 .await
377 .unwrap();
378 assert_eq!(action, bridge_action);
379 }
380
381 #[tokio::test]
382 async fn test_get_finalized_bridge_action_maybe_unrecognized_contract() {
383 telemetry_subscribers::init_for_testing();
384 let registry = Registry::new();
385 mysten_metrics::init_metrics(®istry);
386 let mock_service = EthMockService::new();
387 mock_last_finalized_block(&mock_service, 777);
388
389 let client = EthClient::new_mocked(
390 mock_service.clone(),
391 HashSet::from_iter(vec![
392 EthAddress::repeat_byte(5),
393 EthAddress::repeat_byte(6),
394 EthAddress::repeat_byte(7),
395 ]),
396 );
397 let result = client.get_last_finalized_block_id().await.unwrap();
398 assert_eq!(result, 777);
399
400 let eth_tx_hash = TxHash::random();
401 let (log, _bridge_action) =
403 get_test_log_and_action(EthAddress::repeat_byte(4), eth_tx_hash, 0);
404 mock_service
405 .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
406 "eth_getTransactionReceipt",
407 [log.transaction_hash.unwrap()],
408 make_transaction_receipt(
409 EthAddress::default(),
410 log.block_number,
411 vec![log.clone()],
412 ),
413 )
414 .unwrap();
415 mock_service
416 .add_response(
417 "eth_getBlockByNumber",
418 (format!("0x{:x}", 777), false),
419 make_transaction_receipt(EthAddress::default(), Some(777), vec![log.clone()]),
420 )
421 .unwrap();
422
423 let error = client
424 .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
425 .await
426 .unwrap_err();
427 match error {
428 BridgeError::BridgeEventInUnrecognizedEthContract => {}
429 _ => panic!("expected TxNotFinalized"),
430 };
431
432 let (log, bridge_action) =
434 get_test_log_and_action(EthAddress::repeat_byte(6), eth_tx_hash, 0);
435 mock_service
436 .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
437 "eth_getTransactionReceipt",
438 [log.transaction_hash.unwrap()],
439 make_transaction_receipt(
440 EthAddress::default(),
441 log.block_number,
442 vec![log.clone()],
443 ),
444 )
445 .unwrap();
446 mock_service
447 .add_response(
448 "eth_getBlockByNumber",
449 (format!("0x{:x}", 777), false),
450 make_transaction_receipt(EthAddress::default(), Some(777), vec![log.clone()]),
451 )
452 .unwrap();
453 let action = client
454 .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
455 .await
456 .unwrap();
457 assert_eq!(action, bridge_action);
458 }
459}