sui_bridge/
eth_syncer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! The EthSyncer module is responsible for synchronizing Events emitted on Ethereum blockchain from
5//! concerned contracts. Each contract is associated with a start block number, and the syncer will
6//! only query from that block number onwards. The syncer also keeps track of the last finalized
7//! block on Ethereum and will only query for events up to that block number.
8
9use crate::error::BridgeResult;
10use crate::eth_client::EthClient;
11use crate::metrics::BridgeMetrics;
12use crate::retry_with_max_elapsed_time;
13use crate::types::EthLog;
14use alloy::primitives::Address as EthAddress;
15use mysten_metrics::spawn_logged_monitored_task;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio::sync::watch;
19use tokio::task::JoinHandle;
20use tokio::time::{self, Duration, Instant};
21use tracing::{error, warn};
22
/// Upper bound on the number of blocks covered by a single log query.
const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1000;
/// Capacity of the metered channel that carries fetched events to the consumer.
const ETH_EVENTS_CHANNEL_SIZE: usize = 1000;
/// How often the finalized-block refresh task polls the Ethereum client.
const FINALIZED_BLOCK_QUERY_INTERVAL: Duration = Duration::from_secs(5);
26
27pub struct EthSyncer {
28    eth_client: Arc<EthClient>,
29    contract_addresses: EthTargetAddresses,
30}
31
32/// Map from contract address to their start block.
33pub type EthTargetAddresses = HashMap<EthAddress, u64>;
34
35#[allow(clippy::new_without_default)]
36impl EthSyncer {
37    pub fn new(eth_client: Arc<EthClient>, contract_addresses: EthTargetAddresses) -> Self {
38        Self {
39            eth_client,
40            contract_addresses,
41        }
42    }
43
44    pub async fn run(
45        self,
46        metrics: Arc<BridgeMetrics>,
47    ) -> BridgeResult<(
48        Vec<JoinHandle<()>>,
49        mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
50        watch::Receiver<u64>,
51    )> {
52        let (eth_evnets_tx, eth_events_rx) = mysten_metrics::metered_channel::channel(
53            ETH_EVENTS_CHANNEL_SIZE,
54            &mysten_metrics::get_metrics()
55                .unwrap()
56                .channel_inflight
57                .with_label_values(&["eth_events_queue"]),
58        );
59        let last_finalized_block = self.eth_client.get_last_finalized_block_id().await?;
60        let (last_finalized_block_tx, last_finalized_block_rx) =
61            watch::channel(last_finalized_block);
62        let mut task_handles = vec![];
63        let eth_client_clone = self.eth_client.clone();
64        let metrics_clone = metrics.clone();
65        task_handles.push(spawn_logged_monitored_task!(
66            Self::run_finalized_block_refresh_task(
67                last_finalized_block_tx,
68                eth_client_clone,
69                metrics_clone
70            )
71        ));
72        for (contract_address, start_block) in self.contract_addresses {
73            let eth_evnets_tx_clone = eth_evnets_tx.clone();
74            let last_finalized_block_rx_clone = last_finalized_block_rx.clone();
75            let eth_client_clone = self.eth_client.clone();
76            let metrics_clone = metrics.clone();
77            task_handles.push(spawn_logged_monitored_task!(
78                Self::run_event_listening_task(
79                    contract_address,
80                    start_block,
81                    last_finalized_block_rx_clone,
82                    eth_evnets_tx_clone,
83                    eth_client_clone,
84                    metrics_clone,
85                )
86            ));
87        }
88        Ok((task_handles, eth_events_rx, last_finalized_block_rx))
89    }
90
91    async fn run_finalized_block_refresh_task(
92        last_finalized_block_sender: watch::Sender<u64>,
93        eth_client: Arc<EthClient>,
94        metrics: Arc<BridgeMetrics>,
95    ) {
96        tracing::info!("Starting finalized block refresh task.");
97        let mut last_block_number = 0;
98        let mut interval = time::interval(FINALIZED_BLOCK_QUERY_INTERVAL);
99        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
100        loop {
101            interval.tick().await;
102            // TODO: allow to pass custom initial interval
103            let Ok(Ok(new_value)) = retry_with_max_elapsed_time!(
104                eth_client.get_last_finalized_block_id(),
105                time::Duration::from_secs(600)
106            ) else {
107                error!("Failed to get last finalized block from eth client after retry");
108                continue;
109            };
110            tracing::debug!("Last finalized block: {}", new_value);
111            metrics.last_finalized_eth_block.set(new_value as i64);
112
113            if new_value > last_block_number {
114                last_finalized_block_sender
115                    .send(new_value)
116                    .expect("last_finalized_block channel receiver is closed");
117                tracing::info!("Observed new finalized eth block: {}", new_value);
118                last_block_number = new_value;
119            }
120        }
121    }
122
123    // TODO: define a type for block number for readability
124    // TODO: add a metrics for current start block
125    async fn run_event_listening_task(
126        contract_address: EthAddress,
127        mut start_block: u64,
128        mut last_finalized_block_receiver: watch::Receiver<u64>,
129        events_sender: mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
130        eth_client: Arc<EthClient>,
131        metrics: Arc<BridgeMetrics>,
132    ) {
133        tracing::info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}");
134        let contract_address_str = contract_address.to_string();
135        let mut more_blocks = false;
136        // Tracks the current query window; shrinks when RPC returns -32005.
137        let mut query_range = ETH_LOG_QUERY_MAX_BLOCK_RANGE;
138        loop {
139            // If no more known blocks, wait for the next finalized block.
140            if !more_blocks {
141                last_finalized_block_receiver
142                    .changed()
143                    .await
144                    .expect("last_finalized_block channel sender is closed");
145            }
146            let new_finalized_block = *last_finalized_block_receiver.borrow();
147            if new_finalized_block < start_block {
148                tracing::info!(
149                    contract_address=?contract_address,
150                    "New finalized block {} is smaller than start block {}, ignore",
151                    new_finalized_block,
152                    start_block,
153                );
154                continue;
155            }
156            // Each query does at most `query_range` blocks (may be smaller than
157            // ETH_LOG_QUERY_MAX_BLOCK_RANGE after a -32005 shrink).
158            let end_block = std::cmp::min(start_block + query_range - 1, new_finalized_block);
159            more_blocks = end_block < new_finalized_block;
160            let timer = Instant::now();
161            let events = match eth_client
162                .get_events_in_range(contract_address, start_block, end_block)
163                .await
164            {
165                Ok(events) => events,
166                Err(e) => {
167                    let err_str = format!("{:?}", e);
168                    if err_str.contains("query returned more than")
169                        || err_str.contains("-32005")
170                        || err_str.contains("32005")
171                    {
172                        // RPC limit exceeded (-32005): halve the range and retry immediately.
173                        let new_range = (query_range / 2).max(1);
174                        if new_range == query_range {
175                            error!(
176                                contract_address=?contract_address,
177                                "Block query range is already 1 but RPC still returns -32005 \
178                                 for block {}. Retrying with standard backoff.",
179                                start_block
180                            );
181                        } else {
182                            warn!(
183                                contract_address=?contract_address,
184                                "RPC returned -32005 (too many results) for block range {}-{} \
185                                 (window={}). Shrinking window to {} blocks and retrying.",
186                                start_block,
187                                end_block,
188                                query_range,
189                                new_range,
190                            );
191                            query_range = new_range;
192                        }
193                        // Retry immediately with the new (smaller) range; more_blocks stays true
194                        // so we don't wait for a new finalized block notification.
195                        more_blocks = true;
196                        continue;
197                    }
198                    // Not a range-overflow error — use standard backoff retry.
199                    let Ok(Ok(events)) = retry_with_max_elapsed_time!(
200                        eth_client.get_events_in_range(contract_address, start_block, end_block),
201                        Duration::from_secs(600)
202                    ) else {
203                        error!("Failed to get events from eth client after retry");
204                        continue;
205                    };
206                    events
207                }
208            };
209            tracing::debug!(
210                ?contract_address,
211                start_block,
212                end_block,
213                "Querying eth events took {:?}",
214                timer.elapsed()
215            );
216            let len = events.len();
217            let last_block = events.last().map(|e| e.block_number);
218
219            // Note 1: we always events to the channel even when it is empty. This is because of
220            // how `eth_getLogs` api is designed - we want cursor to move forward continuously.
221
222            // Note 2: it's extremely critical to make sure the Logs we send via this channel
223            // are complete per block height. Namely, we should never send a partial list
224            // of events for a block. Otherwise, we may end up missing events.
225            events_sender
226                .send((contract_address, end_block, events))
227                .await
228                .expect("All Eth event channel receivers are closed");
229            if len != 0 {
230                tracing::info!(
231                    ?contract_address,
232                    start_block,
233                    end_block,
234                    "Observed {len} new Eth events",
235                );
236            }
237            metrics
238                .last_synced_eth_blocks
239                .with_label_values(&[&contract_address_str])
240                .set(last_block.unwrap_or(end_block) as i64);
241            start_block = end_block + 1;
242        }
243    }
244}
245
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        eth_mock_provider::EthMockService,
        test_utils::{mock_get_logs, mock_last_finalized_block},
    };
    use alloy::{primitives::TxHash, rpc::types::Log};
    use prometheus::Registry;
    use std::{collections::HashSet, str::FromStr};
    use tokio::sync::mpsc::error::TryRecvError;

    /// Builds a raw log at `block_number` with a random transaction hash.
    fn new_log(block_number: u64, log_index: u64) -> Log {
        Log {
            transaction_hash: Some(TxHash::random()),
            block_number: Some(block_number),
            log_index: Some(log_index),
            ..Default::default()
        }
    }

    /// Builds the `EthLog` the tests expect on the events channel for `log`.
    fn to_eth_log(log: &Log) -> EthLog {
        EthLog {
            block_number: log.block_number.unwrap(),
            tx_hash: log.transaction_hash.unwrap(),
            log_index_in_tx: 0,
            log: log.clone(),
        }
    }

    /// Events are fetched once per finalized-block advance, up to that block.
    #[tokio::test]
    async fn test_last_finalized_block() -> anyhow::Result<()> {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        mysten_metrics::init_metrics(&registry);
        let mock_service = EthMockService::new();
        mock_last_finalized_block(&mock_service, 777);
        let client = EthClient::new_mocked(
            mock_service.clone(),
            HashSet::from_iter(vec![EthAddress::default()]),
        );
        let finalized = client.get_last_finalized_block_id().await.unwrap();
        assert_eq!(finalized, 777);

        let addresses = HashMap::from_iter(vec![(EthAddress::default(), 100)]);
        let raw_log = new_log(777, 3);
        let expected_log = to_eth_log(&raw_log);
        mock_get_logs(
            &mock_service,
            EthAddress::default(),
            100,
            777,
            vec![raw_log.clone()],
        );
        let (_handles, mut logs_rx, mut finalized_block_rx) =
            EthSyncer::new(Arc::new(client), addresses)
                .run(Arc::new(BridgeMetrics::new_for_testing()))
                .await
                .unwrap();

        // Finalized block is still 777: exactly one batch covering [100, 777] arrives.
        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), 777);
        let (address, batch_end, batch) = logs_rx.recv().await.unwrap();
        assert_eq!(address, EthAddress::default());
        assert_eq!(batch_end, 777);
        assert_eq!(batch, vec![expected_log.clone()]);
        assert!(matches!(logs_rx.try_recv(), Err(TryRecvError::Empty)));

        mock_get_logs(
            &mock_service,
            EthAddress::default(),
            778,
            888,
            vec![raw_log.clone()],
        );
        // Finalized block moves to 888: the listener queries the next range.
        mock_last_finalized_block(&mock_service, 888);
        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), 888);
        let (address, batch_end, batch) = logs_rx.recv().await.unwrap();
        assert_eq!(address, EthAddress::default());
        assert_eq!(batch_end, 888);
        assert_eq!(batch, vec![expected_log]);
        assert!(matches!(logs_rx.try_recv(), Err(TryRecvError::Empty)));

        Ok(())
    }

    /// A contract is not queried until the finalized block reaches its start block.
    #[tokio::test]
    async fn test_multiple_addresses() -> anyhow::Result<()> {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        mysten_metrics::init_metrics(&registry);

        let mock_service = EthMockService::new();
        mock_last_finalized_block(&mock_service, 198);

        let another_address =
            EthAddress::from_str("0x00000000219ab540356cbb839cbe05303d7705fa").unwrap();
        let client = EthClient::new_mocked(
            mock_service.clone(),
            HashSet::from_iter(vec![another_address]),
        );

        let addresses =
            HashMap::from_iter(vec![(EthAddress::default(), 100), (another_address, 200)]);

        let first_default_log = new_log(101, 5);
        let first_default_eth_log = to_eth_log(&first_default_log);
        mock_get_logs(
            &mock_service,
            EthAddress::default(),
            100,
            198,
            vec![first_default_log.clone()],
        );
        let unexpected_log = Log {
            inner: alloy::primitives::Log {
                address: another_address,
                ..Default::default()
            },
            ..new_log(201, 6)
        };
        // Mocked even though another_address must not be queried yet; this log is not
        // expected to show up on the channel.
        mock_get_logs(
            &mock_service,
            another_address,
            200,
            198,
            vec![unexpected_log.clone()],
        );

        let (_handles, mut logs_rx, mut finalized_block_rx) =
            EthSyncer::new(Arc::new(client), addresses)
                .run(Arc::new(BridgeMetrics::new_for_testing()))
                .await
                .unwrap();

        // Finalized block is still 198.
        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), 198);
        let (_address, batch_end, batch) = logs_rx.recv().await.unwrap();
        assert_eq!(batch_end, 198);
        assert_eq!(batch, vec![first_default_eth_log.clone()]);
        // Nothing for another_address: its start block (200) is past the finalized block.
        assert!(matches!(logs_rx.try_recv(), Err(TryRecvError::Empty)));

        let second_default_log = Log {
            block_number: Some(200),
            transaction_hash: Some(TxHash::random()),
            log_index: Some(7),
            ..Default::default()
        };
        let second_default_eth_log = to_eth_log(&second_default_log);
        mock_get_logs(
            &mock_service,
            EthAddress::default(),
            199,
            400,
            vec![second_default_log.clone()],
        );
        let other_log = Log {
            inner: alloy::primitives::Log {
                address: another_address,
                ..Default::default()
            },
            ..new_log(201, 9)
        };
        let other_eth_log = to_eth_log(&other_log);
        mock_get_logs(
            &mock_service,
            another_address,
            200,
            400,
            vec![other_log.clone()],
        );
        mock_last_finalized_block(&mock_service, 400);

        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), 400);
        // Both contracts report now; arrival order between them is not fixed.
        let mut seen_logs = HashSet::new();
        for _ in 0..2 {
            let (_, _, batch) = logs_rx.recv().await.unwrap();
            for log in batch {
                seen_logs.insert(format!("{:?}", log));
            }
        }
        assert_eq!(
            seen_logs,
            HashSet::from_iter(vec![
                format!("{:?}", second_default_eth_log),
                format!("{:?}", other_eth_log)
            ])
        );
        // No more finalized block change, no more logs.
        assert!(matches!(logs_rx.try_recv(), Err(TryRecvError::Empty)));
        Ok(())
    }

    /// A range wider than the maximum query window is split into several queries.
    #[tokio::test]
    async fn test_paginated_eth_log_query() -> anyhow::Result<()> {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        mysten_metrics::init_metrics(&registry);
        let mock_service = EthMockService::new();
        let start_block = 100;
        // One block more than two full-size-minus-one windows can cover in a single
        // query, so two queries are required.
        let last_finalized_block = start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE + 1;
        let first_window_end = start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1;
        mock_last_finalized_block(&mock_service, last_finalized_block);
        let client = EthClient::new_mocked(
            mock_service.clone(),
            HashSet::from_iter(vec![EthAddress::default()]),
        );
        let finalized = client.get_last_finalized_block_id().await.unwrap();
        assert_eq!(finalized, last_finalized_block);

        let addresses = HashMap::from_iter(vec![(EthAddress::default(), start_block)]);
        let first_log = new_log(start_block, 3);
        let second_log = new_log(last_finalized_block, 3);
        let first_eth_log = to_eth_log(&first_log);
        let second_eth_log = to_eth_log(&second_log);
        // First query handles [start, start + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1]
        mock_get_logs(
            &mock_service,
            EthAddress::default(),
            start_block,
            first_window_end,
            vec![first_log.clone()],
        );
        // Second query handles [start + ETH_LOG_QUERY_MAX_BLOCK_RANGE, last_finalized_block]
        mock_get_logs(
            &mock_service,
            EthAddress::default(),
            start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE,
            last_finalized_block,
            vec![second_log.clone()],
        );

        let (_handles, mut logs_rx, mut finalized_block_rx) =
            EthSyncer::new(Arc::new(client), addresses)
                .run(Arc::new(BridgeMetrics::new_for_testing()))
                .await
                .unwrap();

        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), last_finalized_block);
        let (address, batch_end, batch) = logs_rx.recv().await.unwrap();
        assert_eq!(address, EthAddress::default());
        assert_eq!(batch_end, first_window_end);
        assert_eq!(batch, vec![first_eth_log.clone()]);
        let (address, batch_end, batch) = logs_rx.recv().await.unwrap();
        assert_eq!(address, EthAddress::default());
        assert_eq!(batch_end, last_finalized_block);
        assert_eq!(batch, vec![second_eth_log.clone()]);
        Ok(())
    }
}