1use std::collections::HashSet;
5use std::sync::Arc;
6
7use crate::abi::EthBridgeEvent;
8use crate::error::{BridgeError, BridgeResult};
9use crate::metered_eth_provider::{MeteredEthHttpProvider, new_metered_eth_provider};
10use crate::metrics::BridgeMetrics;
11use crate::types::{BridgeAction, EthLog, RawEthLog};
12use ethers::providers::{JsonRpcClient, Middleware, Provider};
13use ethers::types::TxHash;
14use ethers::types::{Block, Filter};
15use tap::TapFallible;
16
17#[cfg(test)]
18use crate::eth_mock_provider::EthMockProvider;
19use ethers::types::Address as EthAddress;
20pub struct EthClient<P> {
21 provider: Provider<P>,
22 contract_addresses: HashSet<EthAddress>,
23}
24
25impl EthClient<MeteredEthHttpProvider> {
26 pub async fn new(
27 provider_url: &str,
28 contract_addresses: HashSet<EthAddress>,
29 metrics: Arc<BridgeMetrics>,
30 ) -> anyhow::Result<Self> {
31 let provider = new_metered_eth_provider(provider_url, metrics)?;
32 let self_ = Self {
33 provider,
34 contract_addresses,
35 };
36 self_.describe().await?;
37 Ok(self_)
38 }
39
40 pub fn provider(&self) -> Arc<Provider<MeteredEthHttpProvider>> {
41 Arc::new(self.provider.clone())
42 }
43}
44
#[cfg(test)]
impl EthClient<EthMockProvider> {
    /// Test-only constructor: wraps the given mock transport in a
    /// `Provider`. Unlike `new`, no request is issued during construction.
    pub fn new_mocked(provider: EthMockProvider, contract_addresses: HashSet<EthAddress>) -> Self {
        Self {
            provider: Provider::new(provider),
            contract_addresses,
        }
    }
}
55
56impl<P> EthClient<P>
57where
58 P: JsonRpcClient,
59{
60 pub async fn get_chain_id(&self) -> Result<u64, anyhow::Error> {
61 let chain_id = self.provider.get_chainid().await?;
62 Ok(chain_id.as_u64())
63 }
64
65 async fn describe(&self) -> anyhow::Result<()> {
67 let chain_id = self.get_chain_id().await?;
68 let block_number = self.provider.get_block_number().await?;
69 tracing::info!(
70 "EthClient is connected to chain {chain_id}, current block number: {block_number}"
71 );
72 Ok(())
73 }
74
75 pub async fn get_finalized_bridge_action_maybe(
79 &self,
80 tx_hash: TxHash,
81 event_idx: u16,
82 ) -> BridgeResult<BridgeAction> {
83 let receipt = self
84 .provider
85 .get_transaction_receipt(tx_hash)
86 .await
87 .map_err(BridgeError::from)?
88 .ok_or(BridgeError::TxNotFound)?;
89 let receipt_block_num = receipt.block_number.ok_or(BridgeError::ProviderError(
90 "Provider returns log without block_number".into(),
91 ))?;
92 let last_finalized_block_id = self.get_last_finalized_block_id().await?;
94 if receipt_block_num.as_u64() > last_finalized_block_id {
95 return Err(BridgeError::TxNotFinalized);
96 }
97 let log = receipt
98 .logs
99 .get(event_idx as usize)
100 .ok_or(BridgeError::NoBridgeEventsInTxPosition)?;
101
102 if !self.contract_addresses.contains(&log.address) {
104 return Err(BridgeError::BridgeEventInUnrecognizedEthContract);
105 }
106
107 let eth_log = EthLog {
108 block_number: receipt_block_num.as_u64(),
109 tx_hash,
110 log_index_in_tx: event_idx,
111 log: log.clone(),
112 };
113 let bridge_event = EthBridgeEvent::try_from_eth_log(ð_log)
114 .ok_or(BridgeError::NoBridgeEventsInTxPosition)?;
115 bridge_event
116 .try_into_bridge_action(tx_hash, event_idx)?
117 .ok_or(BridgeError::BridgeEventNotActionable)
118 }
119
120 pub async fn get_last_finalized_block_id(&self) -> BridgeResult<u64> {
121 let block: Result<Option<Block<ethers::types::TxHash>>, ethers::prelude::ProviderError> =
122 self.provider
123 .request("eth_getBlockByNumber", ("finalized", false))
124 .await;
125 let block = block?.ok_or(BridgeError::TransientProviderError(
126 "Provider fails to return last finalized block".into(),
127 ))?;
128 let number = block.number.ok_or(BridgeError::TransientProviderError(
129 "Provider returns block without number".into(),
130 ))?;
131 Ok(number.as_u64())
132 }
133
134 pub async fn get_events_in_range(
137 &self,
138 address: ethers::types::Address,
139 start_block: u64,
140 end_block: u64,
141 ) -> BridgeResult<Vec<EthLog>> {
142 let filter = Filter::new()
143 .from_block(start_block)
144 .to_block(end_block)
145 .address(address);
146 let logs = self
147 .provider
148 .get_logs(&filter)
150 .await
151 .map_err(BridgeError::from)
152 .tap_err(|e| {
153 tracing::error!(
154 "get_events_in_range failed. Filter: {:?}. Error {:?}",
155 filter,
156 e
157 )
158 })?;
159
160 if logs.iter().any(|log| log.address != address) {
162 return Err(BridgeError::ProviderError(format!(
163 "Provider returns logs from different contract address (expected: {:?}): {:?}",
164 address, logs
165 )));
166 }
167 if logs.is_empty() {
168 return Ok(vec![]);
169 }
170
171 let tasks = logs.into_iter().map(|log| self.get_log_tx_details(log));
172 futures::future::join_all(tasks)
173 .await
174 .into_iter()
175 .collect::<Result<Vec<_>, _>>()
176 .tap_err(|e| {
177 tracing::error!(
178 "get_log_tx_details failed. Filter: {:?}. Error {:?}",
179 filter,
180 e
181 )
182 })
183 }
184
185 pub async fn get_raw_events_in_range(
188 &self,
189 addresses: Vec<ethers::types::Address>,
190 start_block: u64,
191 end_block: u64,
192 ) -> BridgeResult<Vec<RawEthLog>> {
193 let filter = Filter::new()
194 .from_block(start_block)
195 .to_block(end_block)
196 .address(addresses.clone());
197 let logs = self
198 .provider
199 .get_logs(&filter)
200 .await
201 .map_err(BridgeError::from)
202 .tap_err(|e| {
203 tracing::error!(
204 "get_events_in_range failed. Filter: {:?}. Error {:?}",
205 filter,
206 e
207 )
208 })?;
209 logs.into_iter().map(
211 |log| {
212 if !addresses.contains(&log.address) {
213 return Err(BridgeError::ProviderError(format!("Provider returns logs from different contract address (expected: {:?}): {:?}", addresses, log)));
214 }
215 Ok(RawEthLog {
216 block_number: log.block_number.ok_or(BridgeError::ProviderError("Provider returns log without block_number".into()))?.as_u64(),
217 tx_hash: log.transaction_hash.ok_or(BridgeError::ProviderError("Provider returns log without transaction_hash".into()))?,
218 log,
219 })}
220 ).collect::<Result<Vec<_>, _>>()
221 }
222
223 async fn get_log_tx_details(&self, log: ethers::types::Log) -> BridgeResult<EthLog> {
227 let block_number = log
228 .block_number
229 .ok_or(BridgeError::ProviderError(
230 "Provider returns log without block_number".into(),
231 ))?
232 .as_u64();
233 let tx_hash = log.transaction_hash.ok_or(BridgeError::ProviderError(
234 "Provider returns log without transaction_hash".into(),
235 ))?;
236 let log_index = log.log_index.ok_or(BridgeError::ProviderError(
238 "Provider returns log without log_index".into(),
239 ))?;
240
241 let receipt = self
245 .provider
246 .get_transaction_receipt(tx_hash)
247 .await
248 .map_err(BridgeError::from)?
249 .ok_or(BridgeError::ProviderError(format!(
250 "Provide cannot find eth transaction for log: {:?})",
251 log
252 )))?;
253
254 let receipt_block_num = receipt.block_number.ok_or(BridgeError::ProviderError(
255 "Provider returns log without block_number".into(),
256 ))?;
257 if receipt_block_num.as_u64() != block_number {
258 return Err(BridgeError::ProviderError(format!(
259 "Provider returns receipt with different block number from log. Receipt: {:?}, Log: {:?}",
260 receipt, log
261 )));
262 }
263
264 let mut log_index_in_tx = None;
266 for (idx, receipt_log) in receipt.logs.iter().enumerate() {
267 if receipt_log.log_index == Some(log_index) {
269 if receipt_log.topics != log.topics || receipt_log.data != log.data {
271 return Err(BridgeError::ProviderError(format!(
272 "Provider returns receipt with different log from log. Receipt: {:?}, Log: {:?}",
273 receipt, log
274 )));
275 }
276 log_index_in_tx = Some(idx);
277 }
278 }
279 let log_index_in_tx = log_index_in_tx.ok_or(BridgeError::ProviderError(format!(
280 "Couldn't find matching log: {:?} in transaction {}",
281 log, tx_hash
282 )))?;
283
284 Ok(EthLog {
285 block_number,
286 tx_hash,
287 log_index_in_tx: log_index_in_tx as u16,
288 log,
289 })
290 }
291}
292
293#[cfg(test)]
294mod tests {
295 use ethers::types::{Address as EthAddress, Log, TransactionReceipt, U64};
296 use prometheus::Registry;
297
298 use super::*;
299 use crate::test_utils::{get_test_log_and_action, mock_last_finalized_block};
300
301 #[tokio::test]
302 async fn test_get_finalized_bridge_action_maybe() {
303 telemetry_subscribers::init_for_testing();
304 let registry = Registry::new();
305 mysten_metrics::init_metrics(®istry);
306 let mock_provider = EthMockProvider::new();
307 mock_last_finalized_block(&mock_provider, 777);
308
309 let client = EthClient::new_mocked(
310 mock_provider.clone(),
311 HashSet::from_iter(vec![EthAddress::zero()]),
312 );
313 let result = client.get_last_finalized_block_id().await.unwrap();
314 assert_eq!(result, 777);
315
316 let eth_tx_hash = TxHash::random();
317 let log = Log {
318 transaction_hash: Some(eth_tx_hash),
319 block_number: Some(U64::from(778)),
320 ..Default::default()
321 };
322 let (good_log, bridge_action) = get_test_log_and_action(EthAddress::zero(), eth_tx_hash, 1);
323 mock_provider
325 .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
326 "eth_getTransactionReceipt",
327 [log.transaction_hash.unwrap()],
328 TransactionReceipt {
329 block_number: log.block_number,
330 logs: vec![log, good_log],
331 ..Default::default()
332 },
333 )
334 .unwrap();
335
336 let error = client
337 .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
338 .await
339 .unwrap_err();
340 match error {
341 BridgeError::TxNotFinalized => {}
342 _ => panic!("expected TxNotFinalized"),
343 };
344
345 mock_last_finalized_block(&mock_provider, 778);
347
348 let error = client
349 .get_finalized_bridge_action_maybe(eth_tx_hash, 2)
350 .await
351 .unwrap_err();
352 match error {
354 BridgeError::NoBridgeEventsInTxPosition => {}
355 _ => panic!("expected NoBridgeEventsInTxPosition"),
356 };
357
358 let error = client
359 .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
360 .await
361 .unwrap_err();
362 match error {
364 BridgeError::NoBridgeEventsInTxPosition => {}
365 _ => panic!("expected NoBridgeEventsInTxPosition"),
366 };
367
368 let action = client
369 .get_finalized_bridge_action_maybe(eth_tx_hash, 1)
370 .await
371 .unwrap();
372 assert_eq!(action, bridge_action);
373 }
374
375 #[tokio::test]
376 async fn test_get_finalized_bridge_action_maybe_unrecognized_contract() {
377 telemetry_subscribers::init_for_testing();
378 let registry = Registry::new();
379 mysten_metrics::init_metrics(®istry);
380 let mock_provider = EthMockProvider::new();
381 mock_last_finalized_block(&mock_provider, 777);
382
383 let client = EthClient::new_mocked(
384 mock_provider.clone(),
385 HashSet::from_iter(vec![
386 EthAddress::repeat_byte(5),
387 EthAddress::repeat_byte(6),
388 EthAddress::repeat_byte(7),
389 ]),
390 );
391 let result = client.get_last_finalized_block_id().await.unwrap();
392 assert_eq!(result, 777);
393
394 let eth_tx_hash = TxHash::random();
395 let (log, _bridge_action) =
397 get_test_log_and_action(EthAddress::repeat_byte(4), eth_tx_hash, 0);
398 mock_provider
399 .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
400 "eth_getTransactionReceipt",
401 [log.transaction_hash.unwrap()],
402 TransactionReceipt {
403 block_number: log.block_number,
404 logs: vec![log],
405 ..Default::default()
406 },
407 )
408 .unwrap();
409
410 let error = client
411 .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
412 .await
413 .unwrap_err();
414 match error {
415 BridgeError::BridgeEventInUnrecognizedEthContract => {}
416 _ => panic!("expected TxNotFinalized"),
417 };
418
419 let (log, bridge_action) =
421 get_test_log_and_action(EthAddress::repeat_byte(6), eth_tx_hash, 0);
422 mock_provider
423 .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
424 "eth_getTransactionReceipt",
425 [log.transaction_hash.unwrap()],
426 TransactionReceipt {
427 block_number: log.block_number,
428 logs: vec![log],
429 ..Default::default()
430 },
431 )
432 .unwrap();
433 let action = client
434 .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
435 .await
436 .unwrap();
437 assert_eq!(action, bridge_action);
438 }
439}