1use std::collections::HashSet;
5use std::sync::Arc;
6
7use crate::abi::EthBridgeEvent;
8use crate::error::{BridgeError, BridgeResult};
9use crate::metered_eth_provider::{MeteredEthHttpProvier, new_metered_eth_provider};
10use crate::metrics::BridgeMetrics;
11use crate::types::{BridgeAction, EthLog, RawEthLog};
12use ethers::providers::{JsonRpcClient, Middleware, Provider};
13use ethers::types::TxHash;
14use ethers::types::{Block, Filter};
15use tap::TapFallible;
16
17#[cfg(test)]
18use crate::eth_mock_provider::EthMockProvider;
19use ethers::types::Address as EthAddress;
/// Client that talks to an Ethereum JSON-RPC endpoint through an `ethers`
/// [`Provider`], generic over the provider's transport `P`.
pub struct EthClient<P> {
    /// Provider through which every request made by this client is sent.
    provider: Provider<P>,
    /// Contract addresses accepted by `get_finalized_bridge_action_maybe`;
    /// a log emitted by any other address is rejected there.
    contract_addresses: HashSet<EthAddress>,
}
24
25impl EthClient<MeteredEthHttpProvier> {
26    pub async fn new(
27        provider_url: &str,
28        contract_addresses: HashSet<EthAddress>,
29        metrics: Arc<BridgeMetrics>,
30    ) -> anyhow::Result<Self> {
31        let provider = new_metered_eth_provider(provider_url, metrics)?;
32        let self_ = Self {
33            provider,
34            contract_addresses,
35        };
36        self_.describe().await?;
37        Ok(self_)
38    }
39
40    pub fn provider(&self) -> Arc<Provider<MeteredEthHttpProvier>> {
41        Arc::new(self.provider.clone())
42    }
43}
44
#[cfg(test)]
impl EthClient<EthMockProvider> {
    /// Test-only constructor that wraps `provider` in an `ethers` `Provider`.
    ///
    /// Unlike `new`, it does not call `describe`.
    pub fn new_mocked(provider: EthMockProvider, contract_addresses: HashSet<EthAddress>) -> Self {
        Self {
            provider: Provider::new(provider),
            contract_addresses,
        }
    }
}
55
56impl<P> EthClient<P>
57where
58    P: JsonRpcClient,
59{
60    pub async fn get_chain_id(&self) -> Result<u64, anyhow::Error> {
61        let chain_id = self.provider.get_chainid().await?;
62        Ok(chain_id.as_u64())
63    }
64
65    async fn describe(&self) -> anyhow::Result<()> {
67        let chain_id = self.get_chain_id().await?;
68        let block_number = self.provider.get_block_number().await?;
69        tracing::info!(
70            "EthClient is connected to chain {chain_id}, current block number: {block_number}"
71        );
72        Ok(())
73    }
74
75    pub async fn get_finalized_bridge_action_maybe(
79        &self,
80        tx_hash: TxHash,
81        event_idx: u16,
82    ) -> BridgeResult<BridgeAction> {
83        let receipt = self
84            .provider
85            .get_transaction_receipt(tx_hash)
86            .await
87            .map_err(BridgeError::from)?
88            .ok_or(BridgeError::TxNotFound)?;
89        let receipt_block_num = receipt.block_number.ok_or(BridgeError::ProviderError(
90            "Provider returns log without block_number".into(),
91        ))?;
92        let last_finalized_block_id = self.get_last_finalized_block_id().await?;
94        if receipt_block_num.as_u64() > last_finalized_block_id {
95            return Err(BridgeError::TxNotFinalized);
96        }
97        let log = receipt
98            .logs
99            .get(event_idx as usize)
100            .ok_or(BridgeError::NoBridgeEventsInTxPosition)?;
101
102        if !self.contract_addresses.contains(&log.address) {
104            return Err(BridgeError::BridgeEventInUnrecognizedEthContract);
105        }
106
107        let eth_log = EthLog {
108            block_number: receipt_block_num.as_u64(),
109            tx_hash,
110            log_index_in_tx: event_idx,
111            log: log.clone(),
112        };
113        let bridge_event = EthBridgeEvent::try_from_eth_log(ð_log)
114            .ok_or(BridgeError::NoBridgeEventsInTxPosition)?;
115        bridge_event
116            .try_into_bridge_action(tx_hash, event_idx)?
117            .ok_or(BridgeError::BridgeEventNotActionable)
118    }
119
120    pub async fn get_last_finalized_block_id(&self) -> BridgeResult<u64> {
121        let block: Result<Option<Block<ethers::types::TxHash>>, ethers::prelude::ProviderError> =
122            self.provider
123                .request("eth_getBlockByNumber", ("finalized", false))
124                .await;
125        let block = block?.ok_or(BridgeError::TransientProviderError(
126            "Provider fails to return last finalized block".into(),
127        ))?;
128        let number = block.number.ok_or(BridgeError::TransientProviderError(
129            "Provider returns block without number".into(),
130        ))?;
131        Ok(number.as_u64())
132    }
133
134    pub async fn get_events_in_range(
137        &self,
138        address: ethers::types::Address,
139        start_block: u64,
140        end_block: u64,
141    ) -> BridgeResult<Vec<EthLog>> {
142        let filter = Filter::new()
143            .from_block(start_block)
144            .to_block(end_block)
145            .address(address);
146        let logs = self
147            .provider
148            .get_logs(&filter)
150            .await
151            .map_err(BridgeError::from)
152            .tap_err(|e| {
153                tracing::error!(
154                    "get_events_in_range failed. Filter: {:?}. Error {:?}",
155                    filter,
156                    e
157                )
158            })?;
159
160        if logs.iter().any(|log| log.address != address) {
162            return Err(BridgeError::ProviderError(format!(
163                "Provider returns logs from different contract address (expected: {:?}): {:?}",
164                address, logs
165            )));
166        }
167        if logs.is_empty() {
168            return Ok(vec![]);
169        }
170
171        let tasks = logs.into_iter().map(|log| self.get_log_tx_details(log));
172        futures::future::join_all(tasks)
173            .await
174            .into_iter()
175            .collect::<Result<Vec<_>, _>>()
176            .tap_err(|e| {
177                tracing::error!(
178                    "get_log_tx_details failed. Filter: {:?}. Error {:?}",
179                    filter,
180                    e
181                )
182            })
183    }
184
185    pub async fn get_raw_events_in_range(
188        &self,
189        addresses: Vec<ethers::types::Address>,
190        start_block: u64,
191        end_block: u64,
192    ) -> BridgeResult<Vec<RawEthLog>> {
193        let filter = Filter::new()
194            .from_block(start_block)
195            .to_block(end_block)
196            .address(addresses.clone());
197        let logs = self
198            .provider
199            .get_logs(&filter)
200            .await
201            .map_err(BridgeError::from)
202            .tap_err(|e| {
203                tracing::error!(
204                    "get_events_in_range failed. Filter: {:?}. Error {:?}",
205                    filter,
206                    e
207                )
208            })?;
209        logs.into_iter().map(
211            |log| {
212                if !addresses.contains(&log.address) {
213                    return Err(BridgeError::ProviderError(format!("Provider returns logs from different contract address (expected: {:?}): {:?}", addresses, log)));
214                }
215                Ok(RawEthLog {
216                block_number: log.block_number.ok_or(BridgeError::ProviderError("Provider returns log without block_number".into()))?.as_u64(),
217                tx_hash: log.transaction_hash.ok_or(BridgeError::ProviderError("Provider returns log without transaction_hash".into()))?,
218                log,
219            })}
220        ).collect::<Result<Vec<_>, _>>()
221    }
222
223    async fn get_log_tx_details(&self, log: ethers::types::Log) -> BridgeResult<EthLog> {
227        let block_number = log
228            .block_number
229            .ok_or(BridgeError::ProviderError(
230                "Provider returns log without block_number".into(),
231            ))?
232            .as_u64();
233        let tx_hash = log.transaction_hash.ok_or(BridgeError::ProviderError(
234            "Provider returns log without transaction_hash".into(),
235        ))?;
236        let log_index = log.log_index.ok_or(BridgeError::ProviderError(
238            "Provider returns log without log_index".into(),
239        ))?;
240
241        let receipt = self
245            .provider
246            .get_transaction_receipt(tx_hash)
247            .await
248            .map_err(BridgeError::from)?
249            .ok_or(BridgeError::ProviderError(format!(
250                "Provide cannot find eth transaction for log: {:?})",
251                log
252            )))?;
253
254        let receipt_block_num = receipt.block_number.ok_or(BridgeError::ProviderError(
255            "Provider returns log without block_number".into(),
256        ))?;
257        if receipt_block_num.as_u64() != block_number {
258            return Err(BridgeError::ProviderError(format!(
259                "Provider returns receipt with different block number from log. Receipt: {:?}, Log: {:?}",
260                receipt, log
261            )));
262        }
263
264        let mut log_index_in_tx = None;
266        for (idx, receipt_log) in receipt.logs.iter().enumerate() {
267            if receipt_log.log_index == Some(log_index) {
269                if receipt_log.topics != log.topics || receipt_log.data != log.data {
271                    return Err(BridgeError::ProviderError(format!(
272                        "Provider returns receipt with different log from log. Receipt: {:?}, Log: {:?}",
273                        receipt, log
274                    )));
275                }
276                log_index_in_tx = Some(idx);
277            }
278        }
279        let log_index_in_tx = log_index_in_tx.ok_or(BridgeError::ProviderError(format!(
280            "Couldn't find matching log: {:?} in transaction {}",
281            log, tx_hash
282        )))?;
283
284        Ok(EthLog {
285            block_number,
286            tx_hash,
287            log_index_in_tx: log_index_in_tx as u16,
288            log,
289        })
290    }
291}
292
293#[cfg(test)]
294mod tests {
295    use ethers::types::{Address as EthAddress, Log, TransactionReceipt, U64};
296    use prometheus::Registry;
297
298    use super::*;
299    use crate::test_utils::{get_test_log_and_action, mock_last_finalized_block};
300
301    #[tokio::test]
302    async fn test_get_finalized_bridge_action_maybe() {
303        telemetry_subscribers::init_for_testing();
304        let registry = Registry::new();
305        mysten_metrics::init_metrics(®istry);
306        let mock_provider = EthMockProvider::new();
307        mock_last_finalized_block(&mock_provider, 777);
308
309        let client = EthClient::new_mocked(
310            mock_provider.clone(),
311            HashSet::from_iter(vec![EthAddress::zero()]),
312        );
313        let result = client.get_last_finalized_block_id().await.unwrap();
314        assert_eq!(result, 777);
315
316        let eth_tx_hash = TxHash::random();
317        let log = Log {
318            transaction_hash: Some(eth_tx_hash),
319            block_number: Some(U64::from(778)),
320            ..Default::default()
321        };
322        let (good_log, bridge_action) = get_test_log_and_action(EthAddress::zero(), eth_tx_hash, 1);
323        mock_provider
325            .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
326                "eth_getTransactionReceipt",
327                [log.transaction_hash.unwrap()],
328                TransactionReceipt {
329                    block_number: log.block_number,
330                    logs: vec![log, good_log],
331                    ..Default::default()
332                },
333            )
334            .unwrap();
335
336        let error = client
337            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
338            .await
339            .unwrap_err();
340        match error {
341            BridgeError::TxNotFinalized => {}
342            _ => panic!("expected TxNotFinalized"),
343        };
344
345        mock_last_finalized_block(&mock_provider, 778);
347
348        let error = client
349            .get_finalized_bridge_action_maybe(eth_tx_hash, 2)
350            .await
351            .unwrap_err();
352        match error {
354            BridgeError::NoBridgeEventsInTxPosition => {}
355            _ => panic!("expected NoBridgeEventsInTxPosition"),
356        };
357
358        let error = client
359            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
360            .await
361            .unwrap_err();
362        match error {
364            BridgeError::NoBridgeEventsInTxPosition => {}
365            _ => panic!("expected NoBridgeEventsInTxPosition"),
366        };
367
368        let action = client
369            .get_finalized_bridge_action_maybe(eth_tx_hash, 1)
370            .await
371            .unwrap();
372        assert_eq!(action, bridge_action);
373    }
374
375    #[tokio::test]
376    async fn test_get_finalized_bridge_action_maybe_unrecognized_contract() {
377        telemetry_subscribers::init_for_testing();
378        let registry = Registry::new();
379        mysten_metrics::init_metrics(®istry);
380        let mock_provider = EthMockProvider::new();
381        mock_last_finalized_block(&mock_provider, 777);
382
383        let client = EthClient::new_mocked(
384            mock_provider.clone(),
385            HashSet::from_iter(vec![
386                EthAddress::repeat_byte(5),
387                EthAddress::repeat_byte(6),
388                EthAddress::repeat_byte(7),
389            ]),
390        );
391        let result = client.get_last_finalized_block_id().await.unwrap();
392        assert_eq!(result, 777);
393
394        let eth_tx_hash = TxHash::random();
395        let (log, _bridge_action) =
397            get_test_log_and_action(EthAddress::repeat_byte(4), eth_tx_hash, 0);
398        mock_provider
399            .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
400                "eth_getTransactionReceipt",
401                [log.transaction_hash.unwrap()],
402                TransactionReceipt {
403                    block_number: log.block_number,
404                    logs: vec![log],
405                    ..Default::default()
406                },
407            )
408            .unwrap();
409
410        let error = client
411            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
412            .await
413            .unwrap_err();
414        match error {
415            BridgeError::BridgeEventInUnrecognizedEthContract => {}
416            _ => panic!("expected TxNotFinalized"),
417        };
418
419        let (log, bridge_action) =
421            get_test_log_and_action(EthAddress::repeat_byte(6), eth_tx_hash, 0);
422        mock_provider
423            .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
424                "eth_getTransactionReceipt",
425                [log.transaction_hash.unwrap()],
426                TransactionReceipt {
427                    block_number: log.block_number,
428                    logs: vec![log],
429                    ..Default::default()
430                },
431            )
432            .unwrap();
433        let action = client
434            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
435            .await
436            .unwrap();
437        assert_eq!(action, bridge_action);
438    }
439}