sui_bridge/
eth_syncer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! The EthSyncer module is responsible for synchronizing Events emitted on Ethereum blockchain from
5//! concerned contracts. Each contract is associated with a start block number, and the syncer will
6//! only query from that block number onwards. The syncer also keeps track of the last finalized
7//! block on Ethereum and will only query for events up to that block number.
8
9use crate::error::BridgeResult;
10use crate::eth_client::EthClient;
11use crate::metrics::BridgeMetrics;
12use crate::retry_with_max_elapsed_time;
13use crate::types::EthLog;
14use alloy::primitives::Address as EthAddress;
15use mysten_metrics::spawn_logged_monitored_task;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio::sync::watch;
19use tokio::task::JoinHandle;
20use tokio::time::{self, Duration, Instant};
21use tracing::error;
22
23const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1000;
24const ETH_EVENTS_CHANNEL_SIZE: usize = 1000;
25const FINALIZED_BLOCK_QUERY_INTERVAL: Duration = Duration::from_secs(5);
26
27pub struct EthSyncer {
28    eth_client: Arc<EthClient>,
29    contract_addresses: EthTargetAddresses,
30}
31
32/// Map from contract address to their start block.
33pub type EthTargetAddresses = HashMap<EthAddress, u64>;
34
35#[allow(clippy::new_without_default)]
36impl EthSyncer {
37    pub fn new(eth_client: Arc<EthClient>, contract_addresses: EthTargetAddresses) -> Self {
38        Self {
39            eth_client,
40            contract_addresses,
41        }
42    }
43
44    pub async fn run(
45        self,
46        metrics: Arc<BridgeMetrics>,
47    ) -> BridgeResult<(
48        Vec<JoinHandle<()>>,
49        mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
50        watch::Receiver<u64>,
51    )> {
52        let (eth_evnets_tx, eth_events_rx) = mysten_metrics::metered_channel::channel(
53            ETH_EVENTS_CHANNEL_SIZE,
54            &mysten_metrics::get_metrics()
55                .unwrap()
56                .channel_inflight
57                .with_label_values(&["eth_events_queue"]),
58        );
59        let last_finalized_block = self.eth_client.get_last_finalized_block_id().await?;
60        let (last_finalized_block_tx, last_finalized_block_rx) =
61            watch::channel(last_finalized_block);
62        let mut task_handles = vec![];
63        let eth_client_clone = self.eth_client.clone();
64        let metrics_clone = metrics.clone();
65        task_handles.push(spawn_logged_monitored_task!(
66            Self::run_finalized_block_refresh_task(
67                last_finalized_block_tx,
68                eth_client_clone,
69                metrics_clone
70            )
71        ));
72        for (contract_address, start_block) in self.contract_addresses {
73            let eth_evnets_tx_clone = eth_evnets_tx.clone();
74            let last_finalized_block_rx_clone = last_finalized_block_rx.clone();
75            let eth_client_clone = self.eth_client.clone();
76            let metrics_clone = metrics.clone();
77            task_handles.push(spawn_logged_monitored_task!(
78                Self::run_event_listening_task(
79                    contract_address,
80                    start_block,
81                    last_finalized_block_rx_clone,
82                    eth_evnets_tx_clone,
83                    eth_client_clone,
84                    metrics_clone,
85                )
86            ));
87        }
88        Ok((task_handles, eth_events_rx, last_finalized_block_rx))
89    }
90
91    async fn run_finalized_block_refresh_task(
92        last_finalized_block_sender: watch::Sender<u64>,
93        eth_client: Arc<EthClient>,
94        metrics: Arc<BridgeMetrics>,
95    ) {
96        tracing::info!("Starting finalized block refresh task.");
97        let mut last_block_number = 0;
98        let mut interval = time::interval(FINALIZED_BLOCK_QUERY_INTERVAL);
99        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
100        loop {
101            interval.tick().await;
102            // TODO: allow to pass custom initial interval
103            let Ok(Ok(new_value)) = retry_with_max_elapsed_time!(
104                eth_client.get_last_finalized_block_id(),
105                time::Duration::from_secs(600)
106            ) else {
107                error!("Failed to get last finalized block from eth client after retry");
108                continue;
109            };
110            tracing::debug!("Last finalized block: {}", new_value);
111            metrics.last_finalized_eth_block.set(new_value as i64);
112
113            if new_value > last_block_number {
114                last_finalized_block_sender
115                    .send(new_value)
116                    .expect("last_finalized_block channel receiver is closed");
117                tracing::info!("Observed new finalized eth block: {}", new_value);
118                last_block_number = new_value;
119            }
120        }
121    }
122
123    // TODO: define a type for block number for readability
124    // TODO: add a metrics for current start block
125    async fn run_event_listening_task(
126        contract_address: EthAddress,
127        mut start_block: u64,
128        mut last_finalized_block_receiver: watch::Receiver<u64>,
129        events_sender: mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
130        eth_client: Arc<EthClient>,
131        metrics: Arc<BridgeMetrics>,
132    ) {
133        tracing::info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}");
134        let contract_address_str = contract_address.to_string();
135        let mut more_blocks = false;
136        loop {
137            // If no more known blocks, wait for the next finalized block.
138            if !more_blocks {
139                last_finalized_block_receiver
140                    .changed()
141                    .await
142                    .expect("last_finalized_block channel sender is closed");
143            }
144            let new_finalized_block = *last_finalized_block_receiver.borrow();
145            if new_finalized_block < start_block {
146                tracing::info!(
147                    contract_address=?contract_address,
148                    "New finalized block {} is smaller than start block {}, ignore",
149                    new_finalized_block,
150                    start_block,
151                );
152                continue;
153            }
154            // Each query does at most ETH_LOG_QUERY_MAX_BLOCK_RANGE blocks.
155            let end_block = std::cmp::min(
156                start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1,
157                new_finalized_block,
158            );
159            more_blocks = end_block < new_finalized_block;
160            let timer = Instant::now();
161            let Ok(Ok(events)) = retry_with_max_elapsed_time!(
162                eth_client.get_events_in_range(contract_address, start_block, end_block),
163                Duration::from_secs(600)
164            ) else {
165                error!("Failed to get events from eth client after retry");
166                continue;
167            };
168            tracing::debug!(
169                ?contract_address,
170                start_block,
171                end_block,
172                "Querying eth events took {:?}",
173                timer.elapsed()
174            );
175            let len = events.len();
176            let last_block = events.last().map(|e| e.block_number);
177
178            // Note 1: we always events to the channel even when it is empty. This is because of
179            // how `eth_getLogs` api is designed - we want cursor to move forward continuously.
180
181            // Note 2: it's extremely critical to make sure the Logs we send via this channel
182            // are complete per block height. Namely, we should never send a partial list
183            // of events for a block. Otherwise, we may end up missing events.
184            events_sender
185                .send((contract_address, end_block, events))
186                .await
187                .expect("All Eth event channel receivers are closed");
188            if len != 0 {
189                tracing::info!(
190                    ?contract_address,
191                    start_block,
192                    end_block,
193                    "Observed {len} new Eth events",
194                );
195            }
196            metrics
197                .last_synced_eth_blocks
198                .with_label_values(&[&contract_address_str])
199                .set(last_block.unwrap_or(end_block) as i64);
200            start_block = end_block + 1;
201        }
202    }
203}
204
205#[cfg(test)]
206mod tests {
207    use super::*;
208    use crate::{
209        eth_mock_provider::EthMockService,
210        test_utils::{mock_get_logs, mock_last_finalized_block},
211    };
212    use alloy::{primitives::TxHash, rpc::types::Log};
213    use prometheus::Registry;
214    use std::{collections::HashSet, str::FromStr};
215    use tokio::sync::mpsc::error::TryRecvError;
216
217    #[tokio::test]
218    async fn test_last_finalized_block() -> anyhow::Result<()> {
219        telemetry_subscribers::init_for_testing();
220        let registry = Registry::new();
221        mysten_metrics::init_metrics(&registry);
222        let mock_service = EthMockService::new();
223        mock_last_finalized_block(&mock_service, 777);
224        let client = EthClient::new_mocked(
225            mock_service.clone(),
226            HashSet::from_iter(vec![EthAddress::default()]),
227        );
228        let result = client.get_last_finalized_block_id().await.unwrap();
229        assert_eq!(result, 777);
230
231        let addresses = HashMap::from_iter(vec![(EthAddress::default(), 100)]);
232        let log = Log {
233            transaction_hash: Some(TxHash::random()),
234            block_number: Some(777),
235            log_index: Some(3),
236            ..Default::default()
237        };
238        let eth_log = EthLog {
239            block_number: 777,
240            tx_hash: log.transaction_hash.unwrap(),
241            log_index_in_tx: 0,
242            log: log.clone(),
243        };
244        mock_get_logs(
245            &mock_service,
246            EthAddress::default(),
247            100,
248            777,
249            vec![log.clone()],
250        );
251        let (_handles, mut logs_rx, mut finalized_block_rx) =
252            EthSyncer::new(Arc::new(client), addresses)
253                .run(Arc::new(BridgeMetrics::new_for_testing()))
254                .await
255                .unwrap();
256
257        // The latest finalized block stays at 777, event listener should not query again.
258        finalized_block_rx.changed().await.unwrap();
259        assert_eq!(*finalized_block_rx.borrow(), 777);
260        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
261        assert_eq!(contract_address, EthAddress::default());
262        assert_eq!(end_block, 777);
263        assert_eq!(received_logs, vec![eth_log.clone()]);
264        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
265
266        mock_get_logs(
267            &mock_service,
268            EthAddress::default(),
269            778,
270            888,
271            vec![log.clone()],
272        );
273        // The latest finalized block is updated to 888, event listener should query again.
274        mock_last_finalized_block(&mock_service, 888);
275        finalized_block_rx.changed().await.unwrap();
276        assert_eq!(*finalized_block_rx.borrow(), 888);
277        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
278        assert_eq!(contract_address, EthAddress::default());
279        assert_eq!(end_block, 888);
280        assert_eq!(received_logs, vec![eth_log]);
281        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
282
283        Ok(())
284    }
285
286    #[tokio::test]
287    async fn test_multiple_addresses() -> anyhow::Result<()> {
288        telemetry_subscribers::init_for_testing();
289        let registry = Registry::new();
290        mysten_metrics::init_metrics(&registry);
291
292        let mock_service = EthMockService::new();
293        mock_last_finalized_block(&mock_service, 198);
294
295        let another_address =
296            EthAddress::from_str("0x00000000219ab540356cbb839cbe05303d7705fa").unwrap();
297        let client = EthClient::new_mocked(
298            mock_service.clone(),
299            HashSet::from_iter(vec![another_address]),
300        );
301
302        let addresses =
303            HashMap::from_iter(vec![(EthAddress::default(), 100), (another_address, 200)]);
304
305        let log1 = Log {
306            transaction_hash: Some(TxHash::random()),
307            block_number: Some(101),
308            log_index: Some(5),
309            ..Default::default()
310        };
311        let eth_log1 = EthLog {
312            block_number: log1.block_number.unwrap(),
313            tx_hash: log1.transaction_hash.unwrap(),
314            log_index_in_tx: 0,
315            log: log1.clone(),
316        };
317        mock_get_logs(
318            &mock_service,
319            EthAddress::default(),
320            100,
321            198,
322            vec![log1.clone()],
323        );
324        let log2 = Log {
325            inner: alloy::primitives::Log {
326                address: another_address,
327                ..Default::default()
328            },
329            transaction_hash: Some(TxHash::random()),
330            block_number: Some(201),
331            log_index: Some(6),
332            ..Default::default()
333        };
334        // Mock logs for another_address although it shouldn't be queried. We don't expect to
335        // see log2 in the logs channel later on.
336        mock_get_logs(&mock_service, another_address, 200, 198, vec![log2.clone()]);
337
338        let (_handles, mut logs_rx, mut finalized_block_rx) =
339            EthSyncer::new(Arc::new(client), addresses)
340                .run(Arc::new(BridgeMetrics::new_for_testing()))
341                .await
342                .unwrap();
343
344        // The latest finalized block stays at 198.
345        finalized_block_rx.changed().await.unwrap();
346        assert_eq!(*finalized_block_rx.borrow(), 198);
347        let (_contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
348        assert_eq!(end_block, 198);
349        assert_eq!(received_logs, vec![eth_log1.clone()]);
350        // log2 should not be received as another_address's start block is 200.
351        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
352
353        let log1 = Log {
354            block_number: Some(200),
355            transaction_hash: Some(TxHash::random()),
356            log_index: Some(7),
357            ..Default::default()
358        };
359        let eth_log1 = EthLog {
360            block_number: log1.block_number.unwrap(),
361            tx_hash: log1.transaction_hash.unwrap(),
362            log_index_in_tx: 0,
363            log: log1.clone(),
364        };
365        mock_get_logs(
366            &mock_service,
367            EthAddress::default(),
368            199,
369            400,
370            vec![log1.clone()],
371        );
372        let log2 = Log {
373            inner: alloy::primitives::Log {
374                address: another_address,
375                ..Default::default()
376            },
377            transaction_hash: Some(TxHash::random()),
378            block_number: Some(201),
379            log_index: Some(9),
380            ..Default::default()
381        };
382        let eth_log2 = EthLog {
383            block_number: log2.block_number.unwrap(),
384            tx_hash: log2.transaction_hash.unwrap(),
385            log_index_in_tx: 0,
386            log: log2.clone(),
387        };
388        mock_get_logs(&mock_service, another_address, 200, 400, vec![log2.clone()]);
389        mock_last_finalized_block(&mock_service, 400);
390
391        finalized_block_rx.changed().await.unwrap();
392        assert_eq!(*finalized_block_rx.borrow(), 400);
393        let mut logs_set = HashSet::new();
394        logs_rx.recv().await.unwrap().2.into_iter().for_each(|log| {
395            logs_set.insert(format!("{:?}", log));
396        });
397        logs_rx.recv().await.unwrap().2.into_iter().for_each(|log| {
398            logs_set.insert(format!("{:?}", log));
399        });
400        assert_eq!(
401            logs_set,
402            HashSet::from_iter(vec![format!("{:?}", eth_log1), format!("{:?}", eth_log2)])
403        );
404        // No more finalized block change, no more logs.
405        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
406        Ok(())
407    }
408
409    /// Test that the syncer will query for logs in multiple queries if the range is too big.
410    #[tokio::test]
411    async fn test_paginated_eth_log_query() -> anyhow::Result<()> {
412        telemetry_subscribers::init_for_testing();
413        let registry = Registry::new();
414        mysten_metrics::init_metrics(&registry);
415        let mock_service = EthMockService::new();
416        let start_block = 100;
417        // range too big, we need two queries
418        let last_finalized_block = start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE + 1;
419        mock_last_finalized_block(&mock_service, last_finalized_block);
420        let client = EthClient::new_mocked(
421            mock_service.clone(),
422            HashSet::from_iter(vec![EthAddress::default()]),
423        );
424        let result = client.get_last_finalized_block_id().await.unwrap();
425        assert_eq!(result, last_finalized_block);
426
427        let addresses = HashMap::from_iter(vec![(EthAddress::default(), start_block)]);
428        let log = Log {
429            transaction_hash: Some(TxHash::random()),
430            block_number: Some(start_block),
431            log_index: Some(3),
432            ..Default::default()
433        };
434        let log2 = Log {
435            transaction_hash: Some(TxHash::random()),
436            block_number: Some(last_finalized_block),
437            log_index: Some(3),
438            ..Default::default()
439        };
440        let eth_log = EthLog {
441            block_number: start_block,
442            tx_hash: log.transaction_hash.unwrap(),
443            log_index_in_tx: 0,
444            log: log.clone(),
445        };
446        let eth_log2 = EthLog {
447            block_number: last_finalized_block,
448            tx_hash: log2.transaction_hash.unwrap(),
449            log_index_in_tx: 0,
450            log: log2.clone(),
451        };
452        // First query handles [start, start + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1]
453        mock_get_logs(
454            &mock_service,
455            EthAddress::default(),
456            start_block,
457            start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1,
458            vec![log.clone()],
459        );
460        // Second query handles [start + ETH_LOG_QUERY_MAX_BLOCK_RANGE, last_finalized_block]
461        mock_get_logs(
462            &mock_service,
463            EthAddress::default(),
464            start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE,
465            last_finalized_block,
466            vec![log2.clone()],
467        );
468
469        let (_handles, mut logs_rx, mut finalized_block_rx) =
470            EthSyncer::new(Arc::new(client), addresses)
471                .run(Arc::new(BridgeMetrics::new_for_testing()))
472                .await
473                .unwrap();
474
475        finalized_block_rx.changed().await.unwrap();
476        assert_eq!(*finalized_block_rx.borrow(), last_finalized_block);
477        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
478        assert_eq!(contract_address, EthAddress::default());
479        assert_eq!(end_block, start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1);
480        assert_eq!(received_logs, vec![eth_log.clone()]);
481        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
482        assert_eq!(contract_address, EthAddress::default());
483        assert_eq!(end_block, last_finalized_block);
484        assert_eq!(received_logs, vec![eth_log2.clone()]);
485        Ok(())
486    }
487}