sui_bridge/
eth_client.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::abi::EthBridgeEvent;
5use crate::error::{BridgeError, BridgeResult};
6use crate::metered_eth_provider::new_metered_eth_provider;
7use crate::metrics::BridgeMetrics;
8use crate::types::{BridgeAction, EthLog, RawEthLog};
9use crate::utils::EthProvider;
10use alloy::primitives::{Address as EthAddress, TxHash};
11use alloy::providers::Provider;
12use alloy::rpc::types::{Block, Filter, Log};
13use std::collections::HashSet;
14use std::sync::Arc;
15use tap::TapFallible;
16
17#[cfg(test)]
18use crate::eth_mock_provider::EthMockService;
19
20pub struct EthClient {
21    provider: EthProvider,
22    contract_addresses: HashSet<EthAddress>,
23}
24
25impl EthClient {
26    pub async fn new(
27        provider_url: &str,
28        contract_addresses: HashSet<EthAddress>,
29        metrics: Arc<BridgeMetrics>,
30    ) -> anyhow::Result<Self> {
31        let provider = new_metered_eth_provider(provider_url, metrics)?;
32        let self_ = Self {
33            provider,
34            contract_addresses,
35        };
36        self_.describe().await?;
37        Ok(self_)
38    }
39
40    /// Construct from a pre-built provider.
41    /// Use this when the provider has already been created, to avoid creating a new connection.
42    pub async fn from_provider(
43        provider: EthProvider,
44        contract_addresses: HashSet<EthAddress>,
45    ) -> anyhow::Result<Self> {
46        let self_ = Self {
47            provider,
48            contract_addresses,
49        };
50        self_.describe().await?;
51        Ok(self_)
52    }
53
54    pub fn provider(&self) -> EthProvider {
55        self.provider.clone()
56    }
57}
58
59#[cfg(test)]
60impl EthClient {
61    pub fn new_mocked(
62        mock_service: EthMockService,
63        contract_addresses: HashSet<EthAddress>,
64    ) -> Self {
65        let provider = mock_service.as_provider();
66        Self {
67            provider,
68            contract_addresses,
69        }
70    }
71}
72
73impl EthClient {
74    pub async fn get_chain_id(&self) -> Result<u64, anyhow::Error> {
75        let chain_id = self.provider.get_chain_id().await?;
76        Ok(chain_id)
77    }
78
79    // TODO assert chain identifier
80    async fn describe(&self) -> anyhow::Result<()> {
81        let chain_id = self.get_chain_id().await?;
82        let block_number = self.provider.get_block_number().await?;
83        tracing::info!(
84            "EthClient is connected to chain {chain_id}, current block number: {block_number}"
85        );
86        Ok(())
87    }
88
89    /// Returns BridgeAction from an Eth Transaction with transaction hash
90    /// and the event index. If event is declared in an unrecognized
91    /// contract, return error.
92    pub async fn get_finalized_bridge_action_maybe(
93        &self,
94        tx_hash: TxHash,
95        event_idx: u16,
96    ) -> BridgeResult<BridgeAction> {
97        // Query the finalized head first, then fetch receipt, to avoid a race where
98        // the receipt is observed before finalization and then reorged out.
99        // TODO: save the latest finalized block id so we don't have to query it every time
100        let last_finalized_block_id = self.get_last_finalized_block_id().await?;
101
102        let receipt = self
103            .provider
104            .get_transaction_receipt(tx_hash)
105            .await
106            .map_err(BridgeError::from)?
107            .ok_or(BridgeError::TxNotFound)?;
108        let receipt_block_num = receipt.block_number.ok_or(BridgeError::ProviderError(
109            "Provider returns log without block_number".into(),
110        ))?;
111        if receipt_block_num > last_finalized_block_id {
112            return Err(BridgeError::TxNotFinalized);
113        }
114
115        let log = receipt
116            .logs()
117            .get(event_idx as usize)
118            .ok_or(BridgeError::NoBridgeEventsInTxPosition)?;
119
120        // Ignore events emitted from unrecognized contracts
121        if !self.contract_addresses.contains(&log.address()) {
122            return Err(BridgeError::BridgeEventInUnrecognizedEthContract);
123        }
124
125        let eth_log = EthLog {
126            block_number: receipt_block_num,
127            tx_hash,
128            log_index_in_tx: event_idx,
129            log: log.clone(),
130        };
131        let bridge_event = EthBridgeEvent::try_from_eth_log(&eth_log)
132            .ok_or(BridgeError::NoBridgeEventsInTxPosition)?;
133        bridge_event
134            .try_into_bridge_action(tx_hash, event_idx)?
135            .ok_or(BridgeError::BridgeEventNotActionable)
136    }
137
138    pub async fn get_last_finalized_block_id(&self) -> BridgeResult<u64> {
139        let block: Option<Block> = self
140            .provider
141            .raw_request("eth_getBlockByNumber".into(), ("finalized", false))
142            .await?;
143        let block = block.ok_or(BridgeError::TransientProviderError(
144            "Provider fails to return last finalized block".into(),
145        ))?;
146        Ok(block.number())
147    }
148
149    // Note: query may fail if range is too big. Callsite is responsible
150    // for chunking the query.
151    pub async fn get_events_in_range(
152        &self,
153        address: alloy::primitives::Address,
154        start_block: u64,
155        end_block: u64,
156    ) -> BridgeResult<Vec<EthLog>> {
157        let filter = Filter::new()
158            .from_block(start_block)
159            .to_block(end_block)
160            .address(address);
161        let logs = self
162            .provider
163            // TODO use get_logs_paginated?
164            .get_logs(&filter)
165            .await
166            .map_err(BridgeError::from)
167            .tap_err(|e| {
168                tracing::error!(
169                    "get_events_in_range failed. Filter: {:?}. Error {:?}",
170                    filter,
171                    e
172                )
173            })?;
174
175        // Safeguard check that all events are emitted from requested contract address
176        if logs.iter().any(|log| log.address() != address) {
177            return Err(BridgeError::ProviderError(format!(
178                "Provider returns logs from different contract address (expected: {:?}): {:?}",
179                address, logs
180            )));
181        }
182        if logs.is_empty() {
183            return Ok(vec![]);
184        }
185
186        let tasks = logs.into_iter().map(|log| self.get_log_tx_details(log));
187        futures::future::join_all(tasks)
188            .await
189            .into_iter()
190            .collect::<Result<Vec<_>, _>>()
191            .tap_err(|e| {
192                tracing::error!(
193                    "get_log_tx_details failed. Filter: {:?}. Error {:?}",
194                    filter,
195                    e
196                )
197            })
198    }
199
200    // Note: query may fail if range is too big. Callsite is responsible
201    // for chunking the query.
202    pub async fn get_raw_events_in_range(
203        &self,
204        addresses: Vec<EthAddress>,
205        start_block: u64,
206        end_block: u64,
207    ) -> BridgeResult<Vec<RawEthLog>> {
208        let filter = Filter::new()
209            .from_block(start_block)
210            .to_block(end_block)
211            .address(addresses.clone());
212        let logs = self
213            .provider
214            .get_logs(&filter)
215            .await
216            .map_err(BridgeError::from)
217            .tap_err(|e| {
218                tracing::error!(
219                    "get_events_in_range failed. Filter: {:?}. Error {:?}",
220                    filter,
221                    e
222                )
223            })?;
224        // Safeguard check that all events are emitted from requested contract addresses
225        logs.into_iter().map(
226            |log| {
227                if !addresses.contains(&log.address()) {
228                    return Err(BridgeError::ProviderError(format!("Provider returns logs from different contract address (expected: {:?}): {:?}", addresses, log)));
229                }
230                Ok(RawEthLog {
231                block_number: log.block_number.ok_or(BridgeError::ProviderError("Provider returns log without block_number".into()))?,
232                tx_hash: log.transaction_hash.ok_or(BridgeError::ProviderError("Provider returns log without transaction_hash".into()))?,
233                log,
234            })}
235        ).collect::<Result<Vec<_>, _>>()
236    }
237
238    /// This function converts a `Log` to `EthLog`, to make sure the `block_num`, `tx_hash` and `log_index_in_tx`
239    /// are available for downstream.
240    // It's frustratingly ugly because of the nulliability of many fields in `Log`.
241    async fn get_log_tx_details(&self, log: Log) -> BridgeResult<EthLog> {
242        let block_number = log.block_number.ok_or(BridgeError::ProviderError(
243            "Provider returns log without block_number".into(),
244        ))?;
245        let tx_hash = log.transaction_hash.ok_or(BridgeError::ProviderError(
246            "Provider returns log without transaction_hash".into(),
247        ))?;
248        // This is the log index in the block, rather than transaction.
249        let log_index = log.log_index.ok_or(BridgeError::ProviderError(
250            "Provider returns log without log_index".into(),
251        ))?;
252
253        // Now get the log's index in the transaction. There is `transaction_log_index` field in
254        // `Log`, but I never saw it populated.
255
256        let receipt = self
257            .provider
258            .get_transaction_receipt(tx_hash)
259            .await
260            .map_err(BridgeError::from)?
261            .ok_or(BridgeError::ProviderError(format!(
262                "Provide cannot find eth transaction for log: {:?})",
263                log
264            )))?;
265
266        let receipt_block_num = receipt.block_number.ok_or(BridgeError::ProviderError(
267            "Provider returns log without block_number".into(),
268        ))?;
269        if receipt_block_num != block_number {
270            return Err(BridgeError::ProviderError(format!(
271                "Provider returns receipt with different block number from log. Receipt: {:?}, Log: {:?}",
272                receipt, log
273            )));
274        }
275
276        // Find the log index in the transaction
277        let mut log_index_in_tx = None;
278        for (idx, receipt_log) in receipt.logs().iter().enumerate() {
279            // match log index (in the block)
280            if receipt_log.log_index == Some(log_index) {
281                // make sure the topics and data match
282                if receipt_log.topics() != log.topics() || receipt_log.data() != log.data() {
283                    return Err(BridgeError::ProviderError(format!(
284                        "Provider returns receipt with different log from log. Receipt: {:?}, Log: {:?}",
285                        receipt, log
286                    )));
287                }
288                log_index_in_tx = Some(idx);
289            }
290        }
291        let log_index_in_tx = log_index_in_tx.ok_or(BridgeError::ProviderError(format!(
292            "Couldn't find matching log: {:?} in transaction {}",
293            log, tx_hash
294        )))?;
295
296        Ok(EthLog {
297            block_number,
298            tx_hash,
299            log_index_in_tx: log_index_in_tx as u16,
300            log,
301        })
302    }
303}
304
305#[cfg(test)]
306mod tests {
307    use alloy::rpc::types::TransactionReceipt;
308    use prometheus::Registry;
309
310    use super::*;
311    use crate::test_utils::{
312        get_test_log_and_action, make_transaction_receipt, mock_last_finalized_block,
313    };
314
315    #[tokio::test]
316    async fn test_get_finalized_bridge_action_maybe() {
317        telemetry_subscribers::init_for_testing();
318        let registry = Registry::new();
319        mysten_metrics::init_metrics(&registry);
320        let mock_service = EthMockService::new();
321        mock_last_finalized_block(&mock_service, 777);
322
323        let client = EthClient::new_mocked(
324            mock_service.clone(),
325            HashSet::from_iter(vec![EthAddress::default()]),
326        );
327        let result = client.get_last_finalized_block_id().await.unwrap();
328        assert_eq!(result, 777);
329
330        let eth_tx_hash = TxHash::random();
331        let log = Log {
332            transaction_hash: Some(eth_tx_hash),
333            block_number: Some(778),
334            ..Default::default()
335        };
336        let (good_log, bridge_action) =
337            get_test_log_and_action(EthAddress::default(), eth_tx_hash, 1);
338        // Mocks `eth_getTransactionReceipt` to return `log` and `good_log` in order
339        mock_service
340            .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
341                "eth_getTransactionReceipt",
342                [log.transaction_hash.unwrap()],
343                make_transaction_receipt(
344                    EthAddress::default(),
345                    log.block_number,
346                    vec![log.clone(), good_log],
347                ),
348            )
349            .unwrap();
350
351        let error = client
352            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
353            .await
354            .unwrap_err();
355        match error {
356            BridgeError::TxNotFinalized => {}
357            _ => panic!("expected TxNotFinalized"),
358        };
359
360        // 778 is now finalized
361        mock_last_finalized_block(&mock_service, 778);
362
363        mock_service
364            .add_response(
365                "eth_getBlockByNumber",
366                (format!("0x{:x}", 778), false),
367                make_transaction_receipt(EthAddress::default(), Some(778), vec![log.clone()]),
368            )
369            .unwrap();
370
371        let error = client
372            .get_finalized_bridge_action_maybe(eth_tx_hash, 2)
373            .await
374            .unwrap_err();
375        // Receipt only has 2 logs
376        match error {
377            BridgeError::NoBridgeEventsInTxPosition => {}
378            _ => panic!("expected NoBridgeEventsInTxPosition"),
379        };
380
381        let error = client
382            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
383            .await
384            .unwrap_err();
385        // Same, `log` is not a BridgeEvent
386        match error {
387            BridgeError::NoBridgeEventsInTxPosition => {}
388            _ => panic!("expected NoBridgeEventsInTxPosition"),
389        };
390
391        let action = client
392            .get_finalized_bridge_action_maybe(eth_tx_hash, 1)
393            .await
394            .unwrap();
395        assert_eq!(action, bridge_action);
396    }
397
398    #[tokio::test]
399    async fn test_get_finalized_bridge_action_maybe_unrecognized_contract() {
400        telemetry_subscribers::init_for_testing();
401        let registry = Registry::new();
402        mysten_metrics::init_metrics(&registry);
403        let mock_service = EthMockService::new();
404        mock_last_finalized_block(&mock_service, 777);
405
406        let client = EthClient::new_mocked(
407            mock_service.clone(),
408            HashSet::from_iter(vec![
409                EthAddress::repeat_byte(5),
410                EthAddress::repeat_byte(6),
411                EthAddress::repeat_byte(7),
412            ]),
413        );
414        let result = client.get_last_finalized_block_id().await.unwrap();
415        assert_eq!(result, 777);
416
417        let eth_tx_hash = TxHash::random();
418        // Event emitted from a different contract address
419        let (log, _bridge_action) =
420            get_test_log_and_action(EthAddress::repeat_byte(4), eth_tx_hash, 0);
421        mock_service
422            .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
423                "eth_getTransactionReceipt",
424                [log.transaction_hash.unwrap()],
425                make_transaction_receipt(
426                    EthAddress::default(),
427                    log.block_number,
428                    vec![log.clone()],
429                ),
430            )
431            .unwrap();
432        mock_service
433            .add_response(
434                "eth_getBlockByNumber",
435                (format!("0x{:x}", 777), false),
436                make_transaction_receipt(EthAddress::default(), Some(777), vec![log.clone()]),
437            )
438            .unwrap();
439
440        let error = client
441            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
442            .await
443            .unwrap_err();
444        match error {
445            BridgeError::BridgeEventInUnrecognizedEthContract => {}
446            _ => panic!("expected TxNotFinalized"),
447        };
448
449        // Ok if emitted from the right contract
450        let (log, bridge_action) =
451            get_test_log_and_action(EthAddress::repeat_byte(6), eth_tx_hash, 0);
452        mock_service
453            .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
454                "eth_getTransactionReceipt",
455                [log.transaction_hash.unwrap()],
456                make_transaction_receipt(
457                    EthAddress::default(),
458                    log.block_number,
459                    vec![log.clone()],
460                ),
461            )
462            .unwrap();
463        mock_service
464            .add_response(
465                "eth_getBlockByNumber",
466                (format!("0x{:x}", 777), false),
467                make_transaction_receipt(EthAddress::default(), Some(777), vec![log.clone()]),
468            )
469            .unwrap();
470        let action = client
471            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
472            .await
473            .unwrap();
474        assert_eq!(action, bridge_action);
475    }
476}