1use crate::error::BridgeResult;
10use crate::eth_client::EthClient;
11use crate::metrics::BridgeMetrics;
12use crate::retry_with_max_elapsed_time;
13use crate::types::EthLog;
14use alloy::primitives::Address as EthAddress;
15use mysten_metrics::spawn_logged_monitored_task;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio::sync::watch;
19use tokio::task::JoinHandle;
20use tokio::time::{self, Duration, Instant};
21use tracing::error;
22
/// Upper bound on the number of blocks covered by one log query issued by the
/// event listening task.
const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1_000;
/// Capacity of the metered channel that carries fetched Eth events.
const ETH_EVENTS_CHANNEL_SIZE: usize = 1_000;
/// Period between two polls of the latest finalized block.
const FINALIZED_BLOCK_QUERY_INTERVAL: Duration = Duration::from_secs(5);
26
27pub struct EthSyncer {
28 eth_client: Arc<EthClient>,
29 contract_addresses: EthTargetAddresses,
30}
31
32pub type EthTargetAddresses = HashMap<EthAddress, u64>;
34
35#[allow(clippy::new_without_default)]
36impl EthSyncer {
37 pub fn new(eth_client: Arc<EthClient>, contract_addresses: EthTargetAddresses) -> Self {
38 Self {
39 eth_client,
40 contract_addresses,
41 }
42 }
43
44 pub async fn run(
45 self,
46 metrics: Arc<BridgeMetrics>,
47 ) -> BridgeResult<(
48 Vec<JoinHandle<()>>,
49 mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
50 watch::Receiver<u64>,
51 )> {
52 let (eth_evnets_tx, eth_events_rx) = mysten_metrics::metered_channel::channel(
53 ETH_EVENTS_CHANNEL_SIZE,
54 &mysten_metrics::get_metrics()
55 .unwrap()
56 .channel_inflight
57 .with_label_values(&["eth_events_queue"]),
58 );
59 let last_finalized_block = self.eth_client.get_last_finalized_block_id().await?;
60 let (last_finalized_block_tx, last_finalized_block_rx) =
61 watch::channel(last_finalized_block);
62 let mut task_handles = vec![];
63 let eth_client_clone = self.eth_client.clone();
64 let metrics_clone = metrics.clone();
65 task_handles.push(spawn_logged_monitored_task!(
66 Self::run_finalized_block_refresh_task(
67 last_finalized_block_tx,
68 eth_client_clone,
69 metrics_clone
70 )
71 ));
72 for (contract_address, start_block) in self.contract_addresses {
73 let eth_evnets_tx_clone = eth_evnets_tx.clone();
74 let last_finalized_block_rx_clone = last_finalized_block_rx.clone();
75 let eth_client_clone = self.eth_client.clone();
76 let metrics_clone = metrics.clone();
77 task_handles.push(spawn_logged_monitored_task!(
78 Self::run_event_listening_task(
79 contract_address,
80 start_block,
81 last_finalized_block_rx_clone,
82 eth_evnets_tx_clone,
83 eth_client_clone,
84 metrics_clone,
85 )
86 ));
87 }
88 Ok((task_handles, eth_events_rx, last_finalized_block_rx))
89 }
90
91 async fn run_finalized_block_refresh_task(
92 last_finalized_block_sender: watch::Sender<u64>,
93 eth_client: Arc<EthClient>,
94 metrics: Arc<BridgeMetrics>,
95 ) {
96 tracing::info!("Starting finalized block refresh task.");
97 let mut last_block_number = 0;
98 let mut interval = time::interval(FINALIZED_BLOCK_QUERY_INTERVAL);
99 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
100 loop {
101 interval.tick().await;
102 let Ok(Ok(new_value)) = retry_with_max_elapsed_time!(
104 eth_client.get_last_finalized_block_id(),
105 time::Duration::from_secs(600)
106 ) else {
107 error!("Failed to get last finalized block from eth client after retry");
108 continue;
109 };
110 tracing::debug!("Last finalized block: {}", new_value);
111 metrics.last_finalized_eth_block.set(new_value as i64);
112
113 if new_value > last_block_number {
114 last_finalized_block_sender
115 .send(new_value)
116 .expect("last_finalized_block channel receiver is closed");
117 tracing::info!("Observed new finalized eth block: {}", new_value);
118 last_block_number = new_value;
119 }
120 }
121 }
122
123 async fn run_event_listening_task(
126 contract_address: EthAddress,
127 mut start_block: u64,
128 mut last_finalized_block_receiver: watch::Receiver<u64>,
129 events_sender: mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
130 eth_client: Arc<EthClient>,
131 metrics: Arc<BridgeMetrics>,
132 ) {
133 tracing::info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}");
134 let contract_address_str = contract_address.to_string();
135 let mut more_blocks = false;
136 loop {
137 if !more_blocks {
139 last_finalized_block_receiver
140 .changed()
141 .await
142 .expect("last_finalized_block channel sender is closed");
143 }
144 let new_finalized_block = *last_finalized_block_receiver.borrow();
145 if new_finalized_block < start_block {
146 tracing::info!(
147 contract_address=?contract_address,
148 "New finalized block {} is smaller than start block {}, ignore",
149 new_finalized_block,
150 start_block,
151 );
152 continue;
153 }
154 let end_block = std::cmp::min(
156 start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1,
157 new_finalized_block,
158 );
159 more_blocks = end_block < new_finalized_block;
160 let timer = Instant::now();
161 let Ok(Ok(events)) = retry_with_max_elapsed_time!(
162 eth_client.get_events_in_range(contract_address, start_block, end_block),
163 Duration::from_secs(600)
164 ) else {
165 error!("Failed to get events from eth client after retry");
166 continue;
167 };
168 tracing::debug!(
169 ?contract_address,
170 start_block,
171 end_block,
172 "Querying eth events took {:?}",
173 timer.elapsed()
174 );
175 let len = events.len();
176 let last_block = events.last().map(|e| e.block_number);
177
178 events_sender
185 .send((contract_address, end_block, events))
186 .await
187 .expect("All Eth event channel receivers are closed");
188 if len != 0 {
189 tracing::info!(
190 ?contract_address,
191 start_block,
192 end_block,
193 "Observed {len} new Eth events",
194 );
195 }
196 metrics
197 .last_synced_eth_blocks
198 .with_label_values(&[&contract_address_str])
199 .set(last_block.unwrap_or(end_block) as i64);
200 start_block = end_block + 1;
201 }
202 }
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        eth_mock_provider::EthMockService,
        test_utils::{mock_get_logs, mock_last_finalized_block},
    };
    use alloy::{primitives::TxHash, rpc::types::Log};
    use prometheus::Registry;
    use std::{collections::HashSet, str::FromStr};
    use tokio::sync::mpsc::error::TryRecvError;

    /// A single contract is synced up to the finalized block, and again after
    /// the finalized block advances.
    #[tokio::test]
    async fn test_last_finalized_block() -> anyhow::Result<()> {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        mysten_metrics::init_metrics(&registry);
        let mock_service = EthMockService::new();
        mock_last_finalized_block(&mock_service, 777);
        let client = EthClient::new_mocked(
            mock_service.clone(),
            HashSet::from_iter(vec![EthAddress::default()]),
        );
        let result = client.get_last_finalized_block_id().await.unwrap();
        assert_eq!(result, 777);

        let addresses = HashMap::from_iter(vec![(EthAddress::default(), 100)]);
        let log = Log {
            transaction_hash: Some(TxHash::random()),
            block_number: Some(777),
            log_index: Some(3),
            ..Default::default()
        };
        let eth_log = EthLog {
            block_number: 777,
            tx_hash: log.transaction_hash.unwrap(),
            log_index_in_tx: 0,
            log: log.clone(),
        };
        // The syncer is expected to query blocks 100..=777 in one request.
        mock_get_logs(
            &mock_service,
            EthAddress::default(),
            100,
            777,
            vec![log.clone()],
        );
        let (_handles, mut logs_rx, mut finalized_block_rx) =
            EthSyncer::new(Arc::new(client), addresses)
                .run(Arc::new(BridgeMetrics::new_for_testing()))
                .await
                .unwrap();

        // The refresh task sends its first observed block even though it equals
        // the value the channel was created with.
        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), 777);
        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
        assert_eq!(contract_address, EthAddress::default());
        assert_eq!(end_block, 777);
        assert_eq!(received_logs, vec![eth_log.clone()]);
        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);

        // Advance the finalized block; the next query resumes at 778.
        mock_get_logs(
            &mock_service,
            EthAddress::default(),
            778,
            888,
            vec![log.clone()],
        );
        mock_last_finalized_block(&mock_service, 888);
        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), 888);
        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
        assert_eq!(contract_address, EthAddress::default());
        assert_eq!(end_block, 888);
        assert_eq!(received_logs, vec![eth_log]);
        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);

        Ok(())
    }

    /// Two contracts with different start blocks: a contract whose start block
    /// is ahead of the finalized block is not queried until the chain catches up.
    #[tokio::test]
    async fn test_multiple_addresses() -> anyhow::Result<()> {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        mysten_metrics::init_metrics(&registry);

        let mock_service = EthMockService::new();
        mock_last_finalized_block(&mock_service, 198);

        let another_address =
            EthAddress::from_str("0x00000000219ab540356cbb839cbe05303d7705fa").unwrap();
        let client = EthClient::new_mocked(
            mock_service.clone(),
            HashSet::from_iter(vec![another_address]),
        );

        let addresses =
            HashMap::from_iter(vec![(EthAddress::default(), 100), (another_address, 200)]);

        let log1 = Log {
            transaction_hash: Some(TxHash::random()),
            block_number: Some(101),
            log_index: Some(5),
            ..Default::default()
        };
        let eth_log1 = EthLog {
            block_number: log1.block_number.unwrap(),
            tx_hash: log1.transaction_hash.unwrap(),
            log_index_in_tx: 0,
            log: log1.clone(),
        };
        mock_get_logs(
            &mock_service,
            EthAddress::default(),
            100,
            198,
            vec![log1.clone()],
        );
        let log2 = Log {
            inner: alloy::primitives::Log {
                address: another_address,
                ..Default::default()
            },
            transaction_hash: Some(TxHash::random()),
            block_number: Some(201),
            log_index: Some(6),
            ..Default::default()
        };
        // Mocked even though it should never be requested: the finalized block
        // (198) is below this contract's start block (200), so the syncer skips
        // it and `log2` must not show up on the channel.
        mock_get_logs(&mock_service, another_address, 200, 198, vec![log2.clone()]);

        let (_handles, mut logs_rx, mut finalized_block_rx) =
            EthSyncer::new(Arc::new(client), addresses)
                .run(Arc::new(BridgeMetrics::new_for_testing()))
                .await
                .unwrap();

        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), 198);
        // Only the default address produces a batch at this point.
        let (_contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
        assert_eq!(end_block, 198);
        assert_eq!(received_logs, vec![eth_log1.clone()]);
        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);

        // Advance the finalized block to 400: both contracts are now in range.
        let log1 = Log {
            block_number: Some(200),
            transaction_hash: Some(TxHash::random()),
            log_index: Some(7),
            ..Default::default()
        };
        let eth_log1 = EthLog {
            block_number: log1.block_number.unwrap(),
            tx_hash: log1.transaction_hash.unwrap(),
            log_index_in_tx: 0,
            log: log1.clone(),
        };
        mock_get_logs(
            &mock_service,
            EthAddress::default(),
            199,
            400,
            vec![log1.clone()],
        );
        let log2 = Log {
            inner: alloy::primitives::Log {
                address: another_address,
                ..Default::default()
            },
            transaction_hash: Some(TxHash::random()),
            block_number: Some(201),
            log_index: Some(9),
            ..Default::default()
        };
        let eth_log2 = EthLog {
            block_number: log2.block_number.unwrap(),
            tx_hash: log2.transaction_hash.unwrap(),
            log_index_in_tx: 0,
            log: log2.clone(),
        };
        mock_get_logs(&mock_service, another_address, 200, 400, vec![log2.clone()]);
        mock_last_finalized_block(&mock_service, 400);

        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), 400);
        // The two listening tasks run independently, so the batches may arrive
        // in either order; compare them as a set.
        let mut logs_set = HashSet::new();
        logs_rx.recv().await.unwrap().2.into_iter().for_each(|log| {
            logs_set.insert(format!("{:?}", log));
        });
        logs_rx.recv().await.unwrap().2.into_iter().for_each(|log| {
            logs_set.insert(format!("{:?}", log));
        });
        assert_eq!(
            logs_set,
            HashSet::from_iter(vec![format!("{:?}", eth_log1), format!("{:?}", eth_log2)])
        );
        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
        Ok(())
    }

    /// A range wider than `ETH_LOG_QUERY_MAX_BLOCK_RANGE` is split into
    /// consecutive queries.
    #[tokio::test]
    async fn test_paginated_eth_log_query() -> anyhow::Result<()> {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        mysten_metrics::init_metrics(&registry);
        let mock_service = EthMockService::new();
        let start_block = 100;
        // Two blocks more than one query can cover, which forces a second page.
        let last_finalized_block = start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE + 1;
        mock_last_finalized_block(&mock_service, last_finalized_block);
        let client = EthClient::new_mocked(
            mock_service.clone(),
            HashSet::from_iter(vec![EthAddress::default()]),
        );
        let result = client.get_last_finalized_block_id().await.unwrap();
        assert_eq!(result, last_finalized_block);

        let addresses = HashMap::from_iter(vec![(EthAddress::default(), start_block)]);
        let log = Log {
            transaction_hash: Some(TxHash::random()),
            block_number: Some(start_block),
            log_index: Some(3),
            ..Default::default()
        };
        let log2 = Log {
            transaction_hash: Some(TxHash::random()),
            block_number: Some(last_finalized_block),
            log_index: Some(3),
            ..Default::default()
        };
        let eth_log = EthLog {
            block_number: start_block,
            tx_hash: log.transaction_hash.unwrap(),
            log_index_in_tx: 0,
            log: log.clone(),
        };
        let eth_log2 = EthLog {
            block_number: last_finalized_block,
            tx_hash: log2.transaction_hash.unwrap(),
            log_index_in_tx: 0,
            log: log2.clone(),
        };
        // First page: exactly ETH_LOG_QUERY_MAX_BLOCK_RANGE blocks.
        mock_get_logs(
            &mock_service,
            EthAddress::default(),
            start_block,
            start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1,
            vec![log.clone()],
        );
        // Second page: the remainder up to the finalized block.
        mock_get_logs(
            &mock_service,
            EthAddress::default(),
            start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE,
            last_finalized_block,
            vec![log2.clone()],
        );

        let (_handles, mut logs_rx, mut finalized_block_rx) =
            EthSyncer::new(Arc::new(client), addresses)
                .run(Arc::new(BridgeMetrics::new_for_testing()))
                .await
                .unwrap();

        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), last_finalized_block);
        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
        assert_eq!(contract_address, EthAddress::default());
        assert_eq!(end_block, start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1);
        assert_eq!(received_logs, vec![eth_log.clone()]);
        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
        assert_eq!(contract_address, EthAddress::default());
        assert_eq!(end_block, last_finalized_block);
        assert_eq!(received_logs, vec![eth_log2.clone()]);
        Ok(())
    }
}