1use crate::error::BridgeResult;
10use crate::eth_client::EthClient;
11use crate::metrics::BridgeMetrics;
12use crate::retry_with_max_elapsed_time;
13use crate::types::EthLog;
14use ethers::types::Address as EthAddress;
15use mysten_metrics::spawn_logged_monitored_task;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio::sync::watch;
19use tokio::task::JoinHandle;
20use tokio::time::{self, Duration, Instant};
21use tracing::error;
22
23const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1000;
24const ETH_EVENTS_CHANNEL_SIZE: usize = 1000;
25const FINALIZED_BLOCK_QUERY_INTERVAL: Duration = Duration::from_secs(5);
26
27pub struct EthSyncer<P> {
28    eth_client: Arc<EthClient<P>>,
29    contract_addresses: EthTargetAddresses,
30}
31
32pub type EthTargetAddresses = HashMap<EthAddress, u64>;
34
35#[allow(clippy::new_without_default)]
36impl<P> EthSyncer<P>
37where
38    P: ethers::providers::JsonRpcClient + 'static,
39{
40    pub fn new(eth_client: Arc<EthClient<P>>, contract_addresses: EthTargetAddresses) -> Self {
41        Self {
42            eth_client,
43            contract_addresses,
44        }
45    }
46
47    pub async fn run(
48        self,
49        metrics: Arc<BridgeMetrics>,
50    ) -> BridgeResult<(
51        Vec<JoinHandle<()>>,
52        mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
53        watch::Receiver<u64>,
54    )> {
55        let (eth_evnets_tx, eth_events_rx) = mysten_metrics::metered_channel::channel(
56            ETH_EVENTS_CHANNEL_SIZE,
57            &mysten_metrics::get_metrics()
58                .unwrap()
59                .channel_inflight
60                .with_label_values(&["eth_events_queue"]),
61        );
62        let last_finalized_block = self.eth_client.get_last_finalized_block_id().await?;
63        let (last_finalized_block_tx, last_finalized_block_rx) =
64            watch::channel(last_finalized_block);
65        let mut task_handles = vec![];
66        let eth_client_clone = self.eth_client.clone();
67        let metrics_clone = metrics.clone();
68        task_handles.push(spawn_logged_monitored_task!(
69            Self::run_finalized_block_refresh_task(
70                last_finalized_block_tx,
71                eth_client_clone,
72                metrics_clone
73            )
74        ));
75        for (contract_address, start_block) in self.contract_addresses {
76            let eth_evnets_tx_clone = eth_evnets_tx.clone();
77            let last_finalized_block_rx_clone = last_finalized_block_rx.clone();
78            let eth_client_clone = self.eth_client.clone();
79            let metrics_clone = metrics.clone();
80            task_handles.push(spawn_logged_monitored_task!(
81                Self::run_event_listening_task(
82                    contract_address,
83                    start_block,
84                    last_finalized_block_rx_clone,
85                    eth_evnets_tx_clone,
86                    eth_client_clone,
87                    metrics_clone,
88                )
89            ));
90        }
91        Ok((task_handles, eth_events_rx, last_finalized_block_rx))
92    }
93
94    async fn run_finalized_block_refresh_task(
95        last_finalized_block_sender: watch::Sender<u64>,
96        eth_client: Arc<EthClient<P>>,
97        metrics: Arc<BridgeMetrics>,
98    ) {
99        tracing::info!("Starting finalized block refresh task.");
100        let mut last_block_number = 0;
101        let mut interval = time::interval(FINALIZED_BLOCK_QUERY_INTERVAL);
102        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
103        loop {
104            interval.tick().await;
105            let Ok(Ok(new_value)) = retry_with_max_elapsed_time!(
107                eth_client.get_last_finalized_block_id(),
108                time::Duration::from_secs(600)
109            ) else {
110                error!("Failed to get last finalized block from eth client after retry");
111                continue;
112            };
113            tracing::debug!("Last finalized block: {}", new_value);
114            metrics.last_finalized_eth_block.set(new_value as i64);
115
116            if new_value > last_block_number {
117                last_finalized_block_sender
118                    .send(new_value)
119                    .expect("last_finalized_block channel receiver is closed");
120                tracing::info!("Observed new finalized eth block: {}", new_value);
121                last_block_number = new_value;
122            }
123        }
124    }
125
126    async fn run_event_listening_task(
129        contract_address: EthAddress,
130        mut start_block: u64,
131        mut last_finalized_block_receiver: watch::Receiver<u64>,
132        events_sender: mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
133        eth_client: Arc<EthClient<P>>,
134        metrics: Arc<BridgeMetrics>,
135    ) {
136        tracing::info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}");
137        let contract_address_str = contract_address.to_string();
138        let mut more_blocks = false;
139        loop {
140            if !more_blocks {
142                last_finalized_block_receiver
143                    .changed()
144                    .await
145                    .expect("last_finalized_block channel sender is closed");
146            }
147            let new_finalized_block = *last_finalized_block_receiver.borrow();
148            if new_finalized_block < start_block {
149                tracing::info!(
150                    contract_address=?contract_address,
151                    "New finalized block {} is smaller than start block {}, ignore",
152                    new_finalized_block,
153                    start_block,
154                );
155                continue;
156            }
157            let end_block = std::cmp::min(
159                start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1,
160                new_finalized_block,
161            );
162            more_blocks = end_block < new_finalized_block;
163            let timer = Instant::now();
164            let Ok(Ok(events)) = retry_with_max_elapsed_time!(
165                eth_client.get_events_in_range(contract_address, start_block, end_block),
166                Duration::from_secs(600)
167            ) else {
168                error!("Failed to get events from eth client after retry");
169                continue;
170            };
171            tracing::debug!(
172                ?contract_address,
173                start_block,
174                end_block,
175                "Querying eth events took {:?}",
176                timer.elapsed()
177            );
178            let len = events.len();
179            let last_block = events.last().map(|e| e.block_number);
180
181            events_sender
188                .send((contract_address, end_block, events))
189                .await
190                .expect("All Eth event channel receivers are closed");
191            if len != 0 {
192                tracing::info!(
193                    ?contract_address,
194                    start_block,
195                    end_block,
196                    "Observed {len} new Eth events",
197                );
198            }
199            metrics
200                .last_synced_eth_blocks
201                .with_label_values(&[&contract_address_str])
202                .set(last_block.unwrap_or(end_block) as i64);
203            start_block = end_block + 1;
204        }
205    }
206}
207
208#[cfg(test)]
209mod tests {
210    use std::{collections::HashSet, str::FromStr};
211
212    use ethers::types::{Log, U64, U256};
213    use prometheus::Registry;
214    use tokio::sync::mpsc::error::TryRecvError;
215
216    use crate::{
217        eth_mock_provider::EthMockProvider,
218        test_utils::{mock_get_logs, mock_last_finalized_block},
219    };
220
221    use super::*;
222    use ethers::types::TxHash;
223
224    #[tokio::test]
225    async fn test_last_finalized_block() -> anyhow::Result<()> {
226        telemetry_subscribers::init_for_testing();
227        let registry = Registry::new();
228        mysten_metrics::init_metrics(®istry);
229        let mock_provider = EthMockProvider::new();
230        mock_last_finalized_block(&mock_provider, 777);
231        let client = EthClient::new_mocked(
232            mock_provider.clone(),
233            HashSet::from_iter(vec![EthAddress::zero()]),
234        );
235        let result = client.get_last_finalized_block_id().await.unwrap();
236        assert_eq!(result, 777);
237
238        let addresses = HashMap::from_iter(vec![(EthAddress::zero(), 100)]);
239        let log = Log {
240            address: EthAddress::zero(),
241            transaction_hash: Some(TxHash::random()),
242            block_number: Some(U64::from(777)),
243            log_index: Some(U256::from(3)),
244            ..Default::default()
245        };
246        let eth_log = EthLog {
247            block_number: 777,
248            tx_hash: log.transaction_hash.unwrap(),
249            log_index_in_tx: 0,
250            log: log.clone(),
251        };
252        mock_get_logs(
253            &mock_provider,
254            EthAddress::zero(),
255            100,
256            777,
257            vec![log.clone()],
258        );
259        let (_handles, mut logs_rx, mut finalized_block_rx) =
260            EthSyncer::new(Arc::new(client), addresses)
261                .run(Arc::new(BridgeMetrics::new_for_testing()))
262                .await
263                .unwrap();
264
265        finalized_block_rx.changed().await.unwrap();
267        assert_eq!(*finalized_block_rx.borrow(), 777);
268        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
269        assert_eq!(contract_address, EthAddress::zero());
270        assert_eq!(end_block, 777);
271        assert_eq!(received_logs, vec![eth_log.clone()]);
272        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
273
274        mock_get_logs(
275            &mock_provider,
276            EthAddress::zero(),
277            778,
278            888,
279            vec![log.clone()],
280        );
281        mock_last_finalized_block(&mock_provider, 888);
283        finalized_block_rx.changed().await.unwrap();
284        assert_eq!(*finalized_block_rx.borrow(), 888);
285        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
286        assert_eq!(contract_address, EthAddress::zero());
287        assert_eq!(end_block, 888);
288        assert_eq!(received_logs, vec![eth_log]);
289        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
290
291        Ok(())
292    }
293
294    #[tokio::test]
295    async fn test_multiple_addresses() -> anyhow::Result<()> {
296        telemetry_subscribers::init_for_testing();
297        let registry = Registry::new();
298        mysten_metrics::init_metrics(®istry);
299
300        let mock_provider = EthMockProvider::new();
301        mock_last_finalized_block(&mock_provider, 198);
302
303        let another_address =
304            EthAddress::from_str("0x00000000219ab540356cbb839cbe05303d7705fa").unwrap();
305        let client = EthClient::new_mocked(
306            mock_provider.clone(),
307            HashSet::from_iter(vec![another_address]),
308        );
309
310        let addresses = HashMap::from_iter(vec![(EthAddress::zero(), 100), (another_address, 200)]);
311
312        let log1 = Log {
313            address: EthAddress::zero(),
314            transaction_hash: Some(TxHash::random()),
315            block_number: Some(U64::from(101)),
316            log_index: Some(U256::from(5)),
317            ..Default::default()
318        };
319        let eth_log1 = EthLog {
320            block_number: log1.block_number.unwrap().as_u64(),
321            tx_hash: log1.transaction_hash.unwrap(),
322            log_index_in_tx: 0,
323            log: log1.clone(),
324        };
325        mock_get_logs(
326            &mock_provider,
327            EthAddress::zero(),
328            100,
329            198,
330            vec![log1.clone()],
331        );
332        let log2 = Log {
333            address: another_address,
334            transaction_hash: Some(TxHash::random()),
335            block_number: Some(U64::from(201)),
336            log_index: Some(U256::from(6)),
337            ..Default::default()
338        };
339        mock_get_logs(
342            &mock_provider,
343            another_address,
344            200,
345            198,
346            vec![log2.clone()],
347        );
348
349        let (_handles, mut logs_rx, mut finalized_block_rx) =
350            EthSyncer::new(Arc::new(client), addresses)
351                .run(Arc::new(BridgeMetrics::new_for_testing()))
352                .await
353                .unwrap();
354
355        finalized_block_rx.changed().await.unwrap();
357        assert_eq!(*finalized_block_rx.borrow(), 198);
358        let (_contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
359        assert_eq!(end_block, 198);
360        assert_eq!(received_logs, vec![eth_log1.clone()]);
361        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
363
364        let log1 = Log {
365            address: EthAddress::zero(),
366            block_number: Some(U64::from(200)),
367            transaction_hash: Some(TxHash::random()),
368            log_index: Some(U256::from(7)),
369            ..Default::default()
370        };
371        let eth_log1 = EthLog {
372            block_number: log1.block_number.unwrap().as_u64(),
373            tx_hash: log1.transaction_hash.unwrap(),
374            log_index_in_tx: 0,
375            log: log1.clone(),
376        };
377        mock_get_logs(
378            &mock_provider,
379            EthAddress::zero(),
380            199,
381            400,
382            vec![log1.clone()],
383        );
384        let log2 = Log {
385            address: another_address,
386            transaction_hash: Some(TxHash::random()),
387            block_number: Some(U64::from(201)),
388            log_index: Some(U256::from(9)),
389            ..Default::default()
390        };
391        let eth_log2 = EthLog {
392            block_number: log2.block_number.unwrap().as_u64(),
393            tx_hash: log2.transaction_hash.unwrap(),
394            log_index_in_tx: 0,
395            log: log2.clone(),
396        };
397        mock_get_logs(
398            &mock_provider,
399            another_address,
400            200,
401            400,
402            vec![log2.clone()],
403        );
404        mock_last_finalized_block(&mock_provider, 400);
405
406        finalized_block_rx.changed().await.unwrap();
407        assert_eq!(*finalized_block_rx.borrow(), 400);
408        let mut logs_set = HashSet::new();
409        logs_rx.recv().await.unwrap().2.into_iter().for_each(|log| {
410            logs_set.insert(format!("{:?}", log));
411        });
412        logs_rx.recv().await.unwrap().2.into_iter().for_each(|log| {
413            logs_set.insert(format!("{:?}", log));
414        });
415        assert_eq!(
416            logs_set,
417            HashSet::from_iter(vec![format!("{:?}", eth_log1), format!("{:?}", eth_log2)])
418        );
419        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
421        Ok(())
422    }
423
424    #[tokio::test]
426    async fn test_paginated_eth_log_query() -> anyhow::Result<()> {
427        telemetry_subscribers::init_for_testing();
428        let registry = Registry::new();
429        mysten_metrics::init_metrics(®istry);
430        let mock_provider = EthMockProvider::new();
431        let start_block = 100;
432        let last_finalized_block = start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE + 1;
434        mock_last_finalized_block(&mock_provider, last_finalized_block);
435        let client = EthClient::new_mocked(
436            mock_provider.clone(),
437            HashSet::from_iter(vec![EthAddress::zero()]),
438        );
439        let result = client.get_last_finalized_block_id().await.unwrap();
440        assert_eq!(result, last_finalized_block);
441
442        let addresses = HashMap::from_iter(vec![(EthAddress::zero(), start_block)]);
443        let log = Log {
444            address: EthAddress::zero(),
445            transaction_hash: Some(TxHash::random()),
446            block_number: Some(U64::from(start_block)),
447            log_index: Some(U256::from(3)),
448            ..Default::default()
449        };
450        let log2 = Log {
451            address: EthAddress::zero(),
452            transaction_hash: Some(TxHash::random()),
453            block_number: Some(U64::from(last_finalized_block)),
454            log_index: Some(U256::from(3)),
455            ..Default::default()
456        };
457        let eth_log = EthLog {
458            block_number: start_block,
459            tx_hash: log.transaction_hash.unwrap(),
460            log_index_in_tx: 0,
461            log: log.clone(),
462        };
463        let eth_log2 = EthLog {
464            block_number: last_finalized_block,
465            tx_hash: log2.transaction_hash.unwrap(),
466            log_index_in_tx: 0,
467            log: log2.clone(),
468        };
469        mock_get_logs(
471            &mock_provider,
472            EthAddress::zero(),
473            start_block,
474            start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1,
475            vec![log.clone()],
476        );
477        mock_get_logs(
479            &mock_provider,
480            EthAddress::zero(),
481            start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE,
482            last_finalized_block,
483            vec![log2.clone()],
484        );
485
486        let (_handles, mut logs_rx, mut finalized_block_rx) =
487            EthSyncer::new(Arc::new(client), addresses)
488                .run(Arc::new(BridgeMetrics::new_for_testing()))
489                .await
490                .unwrap();
491
492        finalized_block_rx.changed().await.unwrap();
493        assert_eq!(*finalized_block_rx.borrow(), last_finalized_block);
494        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
495        assert_eq!(contract_address, EthAddress::zero());
496        assert_eq!(end_block, start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1);
497        assert_eq!(received_logs, vec![eth_log.clone()]);
498        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
499        assert_eq!(contract_address, EthAddress::zero());
500        assert_eq!(end_block, last_finalized_block);
501        assert_eq!(received_logs, vec![eth_log2.clone()]);
502        Ok(())
503    }
504}