sui_bridge/
eth_client.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::abi::EthBridgeEvent;
5use crate::error::{BridgeError, BridgeResult};
6use crate::metered_eth_provider::new_metered_eth_provider;
7use crate::metrics::BridgeMetrics;
8use crate::types::{BridgeAction, EthLog, RawEthLog};
9use crate::utils::EthProvider;
10use alloy::primitives::{Address as EthAddress, TxHash};
11use alloy::providers::Provider;
12use alloy::rpc::types::{Block, Filter, Log};
13use std::collections::HashSet;
14use std::sync::Arc;
15use tap::TapFallible;
16
17#[cfg(test)]
18use crate::eth_mock_provider::EthMockService;
19
20pub struct EthClient {
21    provider: EthProvider,
22    contract_addresses: HashSet<EthAddress>,
23}
24
25impl EthClient {
26    pub async fn new(
27        provider_url: &str,
28        contract_addresses: HashSet<EthAddress>,
29        metrics: Arc<BridgeMetrics>,
30    ) -> anyhow::Result<Self> {
31        let provider = new_metered_eth_provider(provider_url, metrics)?;
32        let self_ = Self {
33            provider,
34            contract_addresses,
35        };
36        self_.describe().await?;
37        Ok(self_)
38    }
39
40    pub fn provider(&self) -> EthProvider {
41        self.provider.clone()
42    }
43}
44
#[cfg(test)]
impl EthClient {
    /// Test-only constructor: builds a client on top of the provider exposed
    /// by `mock_service`, without performing any connectivity probe.
    pub fn new_mocked(
        mock_service: EthMockService,
        contract_addresses: HashSet<EthAddress>,
    ) -> Self {
        Self {
            provider: mock_service.as_provider(),
            contract_addresses,
        }
    }
}
58
59impl EthClient {
60    pub async fn get_chain_id(&self) -> Result<u64, anyhow::Error> {
61        let chain_id = self.provider.get_chain_id().await?;
62        Ok(chain_id)
63    }
64
65    // TODO assert chain identifier
66    async fn describe(&self) -> anyhow::Result<()> {
67        let chain_id = self.get_chain_id().await?;
68        let block_number = self.provider.get_block_number().await?;
69        tracing::info!(
70            "EthClient is connected to chain {chain_id}, current block number: {block_number}"
71        );
72        Ok(())
73    }
74
75    /// Returns BridgeAction from an Eth Transaction with transaction hash
76    /// and the event index. If event is declared in an unrecognized
77    /// contract, return error.
78    pub async fn get_finalized_bridge_action_maybe(
79        &self,
80        tx_hash: TxHash,
81        event_idx: u16,
82    ) -> BridgeResult<BridgeAction> {
83        let receipt = self
84            .provider
85            .get_transaction_receipt(tx_hash)
86            .await
87            .map_err(BridgeError::from)?
88            .ok_or(BridgeError::TxNotFound)?;
89        let receipt_block_num = receipt.block_number.ok_or(BridgeError::ProviderError(
90            "Provider returns log without block_number".into(),
91        ))?;
92        // TODO: save the latest finalized block id so we don't have to query it every time
93        let last_finalized_block_id = self.get_last_finalized_block_id().await?;
94        if receipt_block_num > last_finalized_block_id {
95            return Err(BridgeError::TxNotFinalized);
96        }
97
98        let log = receipt
99            .logs()
100            .get(event_idx as usize)
101            .ok_or(BridgeError::NoBridgeEventsInTxPosition)?;
102
103        // Ignore events emitted from unrecognized contracts
104        if !self.contract_addresses.contains(&log.address()) {
105            return Err(BridgeError::BridgeEventInUnrecognizedEthContract);
106        }
107
108        let eth_log = EthLog {
109            block_number: receipt_block_num,
110            tx_hash,
111            log_index_in_tx: event_idx,
112            log: log.clone(),
113        };
114        let bridge_event = EthBridgeEvent::try_from_eth_log(&eth_log)
115            .ok_or(BridgeError::NoBridgeEventsInTxPosition)?;
116        bridge_event
117            .try_into_bridge_action(tx_hash, event_idx)?
118            .ok_or(BridgeError::BridgeEventNotActionable)
119    }
120
121    pub async fn get_last_finalized_block_id(&self) -> BridgeResult<u64> {
122        let block: Option<Block> = self
123            .provider
124            .raw_request("eth_getBlockByNumber".into(), ("finalized", false))
125            .await?;
126        let block = block.ok_or(BridgeError::TransientProviderError(
127            "Provider fails to return last finalized block".into(),
128        ))?;
129        Ok(block.number())
130    }
131
132    // Note: query may fail if range is too big. Callsite is responsible
133    // for chunking the query.
134    pub async fn get_events_in_range(
135        &self,
136        address: alloy::primitives::Address,
137        start_block: u64,
138        end_block: u64,
139    ) -> BridgeResult<Vec<EthLog>> {
140        let filter = Filter::new()
141            .from_block(start_block)
142            .to_block(end_block)
143            .address(address);
144        let logs = self
145            .provider
146            // TODO use get_logs_paginated?
147            .get_logs(&filter)
148            .await
149            .map_err(BridgeError::from)
150            .tap_err(|e| {
151                tracing::error!(
152                    "get_events_in_range failed. Filter: {:?}. Error {:?}",
153                    filter,
154                    e
155                )
156            })?;
157
158        // Safeguard check that all events are emitted from requested contract address
159        if logs.iter().any(|log| log.address() != address) {
160            return Err(BridgeError::ProviderError(format!(
161                "Provider returns logs from different contract address (expected: {:?}): {:?}",
162                address, logs
163            )));
164        }
165        if logs.is_empty() {
166            return Ok(vec![]);
167        }
168
169        let tasks = logs.into_iter().map(|log| self.get_log_tx_details(log));
170        futures::future::join_all(tasks)
171            .await
172            .into_iter()
173            .collect::<Result<Vec<_>, _>>()
174            .tap_err(|e| {
175                tracing::error!(
176                    "get_log_tx_details failed. Filter: {:?}. Error {:?}",
177                    filter,
178                    e
179                )
180            })
181    }
182
183    // Note: query may fail if range is too big. Callsite is responsible
184    // for chunking the query.
185    pub async fn get_raw_events_in_range(
186        &self,
187        addresses: Vec<EthAddress>,
188        start_block: u64,
189        end_block: u64,
190    ) -> BridgeResult<Vec<RawEthLog>> {
191        let filter = Filter::new()
192            .from_block(start_block)
193            .to_block(end_block)
194            .address(addresses.clone());
195        let logs = self
196            .provider
197            .get_logs(&filter)
198            .await
199            .map_err(BridgeError::from)
200            .tap_err(|e| {
201                tracing::error!(
202                    "get_events_in_range failed. Filter: {:?}. Error {:?}",
203                    filter,
204                    e
205                )
206            })?;
207        // Safeguard check that all events are emitted from requested contract addresses
208        logs.into_iter().map(
209            |log| {
210                if !addresses.contains(&log.address()) {
211                    return Err(BridgeError::ProviderError(format!("Provider returns logs from different contract address (expected: {:?}): {:?}", addresses, log)));
212                }
213                Ok(RawEthLog {
214                block_number: log.block_number.ok_or(BridgeError::ProviderError("Provider returns log without block_number".into()))?,
215                tx_hash: log.transaction_hash.ok_or(BridgeError::ProviderError("Provider returns log without transaction_hash".into()))?,
216                log,
217            })}
218        ).collect::<Result<Vec<_>, _>>()
219    }
220
221    /// This function converts a `Log` to `EthLog`, to make sure the `block_num`, `tx_hash` and `log_index_in_tx`
222    /// are available for downstream.
223    // It's frustratingly ugly because of the nulliability of many fields in `Log`.
224    async fn get_log_tx_details(&self, log: Log) -> BridgeResult<EthLog> {
225        let block_number = log.block_number.ok_or(BridgeError::ProviderError(
226            "Provider returns log without block_number".into(),
227        ))?;
228        let tx_hash = log.transaction_hash.ok_or(BridgeError::ProviderError(
229            "Provider returns log without transaction_hash".into(),
230        ))?;
231        // This is the log index in the block, rather than transaction.
232        let log_index = log.log_index.ok_or(BridgeError::ProviderError(
233            "Provider returns log without log_index".into(),
234        ))?;
235
236        // Now get the log's index in the transaction. There is `transaction_log_index` field in
237        // `Log`, but I never saw it populated.
238
239        let receipt = self
240            .provider
241            .get_transaction_receipt(tx_hash)
242            .await
243            .map_err(BridgeError::from)?
244            .ok_or(BridgeError::ProviderError(format!(
245                "Provide cannot find eth transaction for log: {:?})",
246                log
247            )))?;
248
249        let receipt_block_num = receipt.block_number.ok_or(BridgeError::ProviderError(
250            "Provider returns log without block_number".into(),
251        ))?;
252        if receipt_block_num != block_number {
253            return Err(BridgeError::ProviderError(format!(
254                "Provider returns receipt with different block number from log. Receipt: {:?}, Log: {:?}",
255                receipt, log
256            )));
257        }
258
259        // Find the log index in the transaction
260        let mut log_index_in_tx = None;
261        for (idx, receipt_log) in receipt.logs().iter().enumerate() {
262            // match log index (in the block)
263            if receipt_log.log_index == Some(log_index) {
264                // make sure the topics and data match
265                if receipt_log.topics() != log.topics() || receipt_log.data() != log.data() {
266                    return Err(BridgeError::ProviderError(format!(
267                        "Provider returns receipt with different log from log. Receipt: {:?}, Log: {:?}",
268                        receipt, log
269                    )));
270                }
271                log_index_in_tx = Some(idx);
272            }
273        }
274        let log_index_in_tx = log_index_in_tx.ok_or(BridgeError::ProviderError(format!(
275            "Couldn't find matching log: {:?} in transaction {}",
276            log, tx_hash
277        )))?;
278
279        Ok(EthLog {
280            block_number,
281            tx_hash,
282            log_index_in_tx: log_index_in_tx as u16,
283            log,
284        })
285    }
286}
287
#[cfg(test)]
mod tests {
    use alloy::rpc::types::TransactionReceipt;
    use prometheus::Registry;

    use super::*;
    use crate::test_utils::{
        get_test_log_and_action, make_transaction_receipt, mock_last_finalized_block,
    };

    /// Walks `get_finalized_bridge_action_maybe` through: not-yet-finalized
    /// block, out-of-range event index, non-bridge log, and finally a valid
    /// bridge log.
    #[tokio::test]
    async fn test_get_finalized_bridge_action_maybe() {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        mysten_metrics::init_metrics(&registry);
        let mock_service = EthMockService::new();
        mock_last_finalized_block(&mock_service, 777);

        let client = EthClient::new_mocked(
            mock_service.clone(),
            HashSet::from_iter(vec![EthAddress::default()]),
        );
        let result = client.get_last_finalized_block_id().await.unwrap();
        assert_eq!(result, 777);

        let eth_tx_hash = TxHash::random();
        let log = Log {
            transaction_hash: Some(eth_tx_hash),
            block_number: Some(778),
            ..Default::default()
        };
        let (good_log, bridge_action) =
            get_test_log_and_action(EthAddress::default(), eth_tx_hash, 1);
        // Mocks `eth_getTransactionReceipt` to return `log` and `good_log` in order
        mock_service
            .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
                "eth_getTransactionReceipt",
                [log.transaction_hash.unwrap()],
                make_transaction_receipt(
                    EthAddress::default(),
                    log.block_number,
                    vec![log.clone(), good_log],
                ),
            )
            .unwrap();

        // Block 778 is ahead of the finalized block 777.
        let error = client
            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
            .await
            .unwrap_err();
        assert!(
            matches!(error, BridgeError::TxNotFinalized),
            "expected TxNotFinalized"
        );

        // 778 is now finalized
        mock_last_finalized_block(&mock_service, 778);

        // NOTE(review): this registers a receipt as the response to
        // `eth_getBlockByNumber` at an explicit block number, while the client
        // only requests the `finalized` tag; it looks unused - confirm before
        // removing.
        mock_service
            .add_response(
                "eth_getBlockByNumber",
                (format!("0x{:x}", 778), false),
                make_transaction_receipt(EthAddress::default(), Some(778), vec![log.clone()]),
            )
            .unwrap();

        let error = client
            .get_finalized_bridge_action_maybe(eth_tx_hash, 2)
            .await
            .unwrap_err();
        // Receipt only has 2 logs
        assert!(
            matches!(error, BridgeError::NoBridgeEventsInTxPosition),
            "expected NoBridgeEventsInTxPosition"
        );

        let error = client
            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
            .await
            .unwrap_err();
        // Same, `log` is not a BridgeEvent
        assert!(
            matches!(error, BridgeError::NoBridgeEventsInTxPosition),
            "expected NoBridgeEventsInTxPosition"
        );

        let action = client
            .get_finalized_bridge_action_maybe(eth_tx_hash, 1)
            .await
            .unwrap();
        assert_eq!(action, bridge_action);
    }

    /// Checks that an event from an address outside the client's recognized
    /// set is rejected, and that the same event from a recognized address is
    /// accepted.
    #[tokio::test]
    async fn test_get_finalized_bridge_action_maybe_unrecognized_contract() {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        mysten_metrics::init_metrics(&registry);
        let mock_service = EthMockService::new();
        mock_last_finalized_block(&mock_service, 777);

        let client = EthClient::new_mocked(
            mock_service.clone(),
            HashSet::from_iter(vec![
                EthAddress::repeat_byte(5),
                EthAddress::repeat_byte(6),
                EthAddress::repeat_byte(7),
            ]),
        );
        let result = client.get_last_finalized_block_id().await.unwrap();
        assert_eq!(result, 777);

        let eth_tx_hash = TxHash::random();
        // Event emitted from a different contract address
        let (log, _bridge_action) =
            get_test_log_and_action(EthAddress::repeat_byte(4), eth_tx_hash, 0);
        mock_service
            .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
                "eth_getTransactionReceipt",
                [log.transaction_hash.unwrap()],
                make_transaction_receipt(
                    EthAddress::default(),
                    log.block_number,
                    vec![log.clone()],
                ),
            )
            .unwrap();
        // NOTE(review): as in the test above, this `eth_getBlockByNumber` mock
        // returns a receipt and looks unused by the client - confirm.
        mock_service
            .add_response(
                "eth_getBlockByNumber",
                (format!("0x{:x}", 777), false),
                make_transaction_receipt(EthAddress::default(), Some(777), vec![log.clone()]),
            )
            .unwrap();

        let error = client
            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
            .await
            .unwrap_err();
        // The failure message names the variant actually being asserted.
        assert!(
            matches!(error, BridgeError::BridgeEventInUnrecognizedEthContract),
            "expected BridgeEventInUnrecognizedEthContract"
        );

        // Ok if emitted from the right contract
        let (log, bridge_action) =
            get_test_log_and_action(EthAddress::repeat_byte(6), eth_tx_hash, 0);
        mock_service
            .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
                "eth_getTransactionReceipt",
                [log.transaction_hash.unwrap()],
                make_transaction_receipt(
                    EthAddress::default(),
                    log.block_number,
                    vec![log.clone()],
                ),
            )
            .unwrap();
        mock_service
            .add_response(
                "eth_getBlockByNumber",
                (format!("0x{:x}", 777), false),
                make_transaction_receipt(EthAddress::default(), Some(777), vec![log.clone()]),
            )
            .unwrap();
        let action = client
            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
            .await
            .unwrap();
        assert_eq!(action, bridge_action);
    }
}