1use crate::error::BridgeResult;
10use crate::eth_client::EthClient;
11use crate::metrics::BridgeMetrics;
12use crate::retry_with_max_elapsed_time;
13use crate::types::EthLog;
14use alloy::primitives::Address as EthAddress;
15use mysten_metrics::spawn_logged_monitored_task;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio::sync::watch;
19use tokio::task::JoinHandle;
20use tokio::time::{self, Duration, Instant};
21use tracing::{error, warn};
22
/// Initial (and largest) number of blocks requested by a single log query.
const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1_000;
/// Capacity of the metered channel that carries fetched event batches out of the syncer.
const ETH_EVENTS_CHANNEL_SIZE: usize = 1_000;
/// Period of the ticker that drives polling for the last finalized block.
const FINALIZED_BLOCK_QUERY_INTERVAL: Duration = Duration::from_secs(5);
26
/// Holds the inputs needed to start the Ethereum syncing tasks: one task that tracks the last
/// finalized block and one event listening task per watched contract (see `run`).
pub struct EthSyncer {
    /// Client used to query the last finalized block id and contract events.
    eth_client: Arc<EthClient>,
    /// Contracts to watch, each mapped to the block its event queries start from.
    contract_addresses: EthTargetAddresses,
}
31
/// Maps each watched contract address to the block number at which its event listening task
/// starts querying.
pub type EthTargetAddresses = HashMap<EthAddress, u64>;
34
35#[allow(clippy::new_without_default)]
36impl EthSyncer {
37 pub fn new(eth_client: Arc<EthClient>, contract_addresses: EthTargetAddresses) -> Self {
38 Self {
39 eth_client,
40 contract_addresses,
41 }
42 }
43
44 pub async fn run(
45 self,
46 metrics: Arc<BridgeMetrics>,
47 ) -> BridgeResult<(
48 Vec<JoinHandle<()>>,
49 mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
50 watch::Receiver<u64>,
51 )> {
52 let (eth_evnets_tx, eth_events_rx) = mysten_metrics::metered_channel::channel(
53 ETH_EVENTS_CHANNEL_SIZE,
54 &mysten_metrics::get_metrics()
55 .unwrap()
56 .channel_inflight
57 .with_label_values(&["eth_events_queue"]),
58 );
59 let last_finalized_block = self.eth_client.get_last_finalized_block_id().await?;
60 let (last_finalized_block_tx, last_finalized_block_rx) =
61 watch::channel(last_finalized_block);
62 let mut task_handles = vec![];
63 let eth_client_clone = self.eth_client.clone();
64 let metrics_clone = metrics.clone();
65 task_handles.push(spawn_logged_monitored_task!(
66 Self::run_finalized_block_refresh_task(
67 last_finalized_block_tx,
68 eth_client_clone,
69 metrics_clone
70 )
71 ));
72 for (contract_address, start_block) in self.contract_addresses {
73 let eth_evnets_tx_clone = eth_evnets_tx.clone();
74 let last_finalized_block_rx_clone = last_finalized_block_rx.clone();
75 let eth_client_clone = self.eth_client.clone();
76 let metrics_clone = metrics.clone();
77 task_handles.push(spawn_logged_monitored_task!(
78 Self::run_event_listening_task(
79 contract_address,
80 start_block,
81 last_finalized_block_rx_clone,
82 eth_evnets_tx_clone,
83 eth_client_clone,
84 metrics_clone,
85 )
86 ));
87 }
88 Ok((task_handles, eth_events_rx, last_finalized_block_rx))
89 }
90
91 async fn run_finalized_block_refresh_task(
92 last_finalized_block_sender: watch::Sender<u64>,
93 eth_client: Arc<EthClient>,
94 metrics: Arc<BridgeMetrics>,
95 ) {
96 tracing::info!("Starting finalized block refresh task.");
97 let mut last_block_number = 0;
98 let mut interval = time::interval(FINALIZED_BLOCK_QUERY_INTERVAL);
99 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
100 loop {
101 interval.tick().await;
102 let Ok(Ok(new_value)) = retry_with_max_elapsed_time!(
104 eth_client.get_last_finalized_block_id(),
105 time::Duration::from_secs(600)
106 ) else {
107 error!("Failed to get last finalized block from eth client after retry");
108 continue;
109 };
110 tracing::debug!("Last finalized block: {}", new_value);
111 metrics.last_finalized_eth_block.set(new_value as i64);
112
113 if new_value > last_block_number {
114 last_finalized_block_sender
115 .send(new_value)
116 .expect("last_finalized_block channel receiver is closed");
117 tracing::info!("Observed new finalized eth block: {}", new_value);
118 last_block_number = new_value;
119 }
120 }
121 }
122
123 async fn run_event_listening_task(
126 contract_address: EthAddress,
127 mut start_block: u64,
128 mut last_finalized_block_receiver: watch::Receiver<u64>,
129 events_sender: mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
130 eth_client: Arc<EthClient>,
131 metrics: Arc<BridgeMetrics>,
132 ) {
133 tracing::info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}");
134 let contract_address_str = contract_address.to_string();
135 let mut more_blocks = false;
136 let mut query_range = ETH_LOG_QUERY_MAX_BLOCK_RANGE;
138 loop {
139 if !more_blocks {
141 last_finalized_block_receiver
142 .changed()
143 .await
144 .expect("last_finalized_block channel sender is closed");
145 }
146 let new_finalized_block = *last_finalized_block_receiver.borrow();
147 if new_finalized_block < start_block {
148 tracing::info!(
149 contract_address=?contract_address,
150 "New finalized block {} is smaller than start block {}, ignore",
151 new_finalized_block,
152 start_block,
153 );
154 continue;
155 }
156 let end_block = std::cmp::min(start_block + query_range - 1, new_finalized_block);
159 more_blocks = end_block < new_finalized_block;
160 let timer = Instant::now();
161 let events = match eth_client
162 .get_events_in_range(contract_address, start_block, end_block)
163 .await
164 {
165 Ok(events) => events,
166 Err(e) => {
167 let err_str = format!("{:?}", e);
168 if err_str.contains("query returned more than")
169 || err_str.contains("-32005")
170 || err_str.contains("32005")
171 {
172 let new_range = (query_range / 2).max(1);
174 if new_range == query_range {
175 error!(
176 contract_address=?contract_address,
177 "Block query range is already 1 but RPC still returns -32005 \
178 for block {}. Retrying with standard backoff.",
179 start_block
180 );
181 } else {
182 warn!(
183 contract_address=?contract_address,
184 "RPC returned -32005 (too many results) for block range {}-{} \
185 (window={}). Shrinking window to {} blocks and retrying.",
186 start_block,
187 end_block,
188 query_range,
189 new_range,
190 );
191 query_range = new_range;
192 }
193 more_blocks = true;
196 continue;
197 }
198 let Ok(Ok(events)) = retry_with_max_elapsed_time!(
200 eth_client.get_events_in_range(contract_address, start_block, end_block),
201 Duration::from_secs(600)
202 ) else {
203 error!("Failed to get events from eth client after retry");
204 continue;
205 };
206 events
207 }
208 };
209 tracing::debug!(
210 ?contract_address,
211 start_block,
212 end_block,
213 "Querying eth events took {:?}",
214 timer.elapsed()
215 );
216 let len = events.len();
217 let last_block = events.last().map(|e| e.block_number);
218
219 events_sender
226 .send((contract_address, end_block, events))
227 .await
228 .expect("All Eth event channel receivers are closed");
229 if len != 0 {
230 tracing::info!(
231 ?contract_address,
232 start_block,
233 end_block,
234 "Observed {len} new Eth events",
235 );
236 }
237 metrics
238 .last_synced_eth_blocks
239 .with_label_values(&[&contract_address_str])
240 .set(last_block.unwrap_or(end_block) as i64);
241 start_block = end_block + 1;
242 }
243 }
244}
245
246#[cfg(test)]
247mod tests {
248 use super::*;
249 use crate::{
250 eth_mock_provider::EthMockService,
251 test_utils::{mock_get_logs, mock_last_finalized_block},
252 };
253 use alloy::{primitives::TxHash, rpc::types::Log};
254 use prometheus::Registry;
255 use std::{collections::HashSet, str::FromStr};
256 use tokio::sync::mpsc::error::TryRecvError;
257
258 #[tokio::test]
259 async fn test_last_finalized_block() -> anyhow::Result<()> {
260 telemetry_subscribers::init_for_testing();
261 let registry = Registry::new();
262 mysten_metrics::init_metrics(®istry);
263 let mock_service = EthMockService::new();
264 mock_last_finalized_block(&mock_service, 777);
265 let client = EthClient::new_mocked(
266 mock_service.clone(),
267 HashSet::from_iter(vec![EthAddress::default()]),
268 );
269 let result = client.get_last_finalized_block_id().await.unwrap();
270 assert_eq!(result, 777);
271
272 let addresses = HashMap::from_iter(vec![(EthAddress::default(), 100)]);
273 let log = Log {
274 transaction_hash: Some(TxHash::random()),
275 block_number: Some(777),
276 log_index: Some(3),
277 ..Default::default()
278 };
279 let eth_log = EthLog {
280 block_number: 777,
281 tx_hash: log.transaction_hash.unwrap(),
282 log_index_in_tx: 0,
283 log: log.clone(),
284 };
285 mock_get_logs(
286 &mock_service,
287 EthAddress::default(),
288 100,
289 777,
290 vec![log.clone()],
291 );
292 let (_handles, mut logs_rx, mut finalized_block_rx) =
293 EthSyncer::new(Arc::new(client), addresses)
294 .run(Arc::new(BridgeMetrics::new_for_testing()))
295 .await
296 .unwrap();
297
298 finalized_block_rx.changed().await.unwrap();
300 assert_eq!(*finalized_block_rx.borrow(), 777);
301 let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
302 assert_eq!(contract_address, EthAddress::default());
303 assert_eq!(end_block, 777);
304 assert_eq!(received_logs, vec![eth_log.clone()]);
305 assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
306
307 mock_get_logs(
308 &mock_service,
309 EthAddress::default(),
310 778,
311 888,
312 vec![log.clone()],
313 );
314 mock_last_finalized_block(&mock_service, 888);
316 finalized_block_rx.changed().await.unwrap();
317 assert_eq!(*finalized_block_rx.borrow(), 888);
318 let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
319 assert_eq!(contract_address, EthAddress::default());
320 assert_eq!(end_block, 888);
321 assert_eq!(received_logs, vec![eth_log]);
322 assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
323
324 Ok(())
325 }
326
327 #[tokio::test]
328 async fn test_multiple_addresses() -> anyhow::Result<()> {
329 telemetry_subscribers::init_for_testing();
330 let registry = Registry::new();
331 mysten_metrics::init_metrics(®istry);
332
333 let mock_service = EthMockService::new();
334 mock_last_finalized_block(&mock_service, 198);
335
336 let another_address =
337 EthAddress::from_str("0x00000000219ab540356cbb839cbe05303d7705fa").unwrap();
338 let client = EthClient::new_mocked(
339 mock_service.clone(),
340 HashSet::from_iter(vec![another_address]),
341 );
342
343 let addresses =
344 HashMap::from_iter(vec![(EthAddress::default(), 100), (another_address, 200)]);
345
346 let log1 = Log {
347 transaction_hash: Some(TxHash::random()),
348 block_number: Some(101),
349 log_index: Some(5),
350 ..Default::default()
351 };
352 let eth_log1 = EthLog {
353 block_number: log1.block_number.unwrap(),
354 tx_hash: log1.transaction_hash.unwrap(),
355 log_index_in_tx: 0,
356 log: log1.clone(),
357 };
358 mock_get_logs(
359 &mock_service,
360 EthAddress::default(),
361 100,
362 198,
363 vec![log1.clone()],
364 );
365 let log2 = Log {
366 inner: alloy::primitives::Log {
367 address: another_address,
368 ..Default::default()
369 },
370 transaction_hash: Some(TxHash::random()),
371 block_number: Some(201),
372 log_index: Some(6),
373 ..Default::default()
374 };
375 mock_get_logs(&mock_service, another_address, 200, 198, vec![log2.clone()]);
378
379 let (_handles, mut logs_rx, mut finalized_block_rx) =
380 EthSyncer::new(Arc::new(client), addresses)
381 .run(Arc::new(BridgeMetrics::new_for_testing()))
382 .await
383 .unwrap();
384
385 finalized_block_rx.changed().await.unwrap();
387 assert_eq!(*finalized_block_rx.borrow(), 198);
388 let (_contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
389 assert_eq!(end_block, 198);
390 assert_eq!(received_logs, vec![eth_log1.clone()]);
391 assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
393
394 let log1 = Log {
395 block_number: Some(200),
396 transaction_hash: Some(TxHash::random()),
397 log_index: Some(7),
398 ..Default::default()
399 };
400 let eth_log1 = EthLog {
401 block_number: log1.block_number.unwrap(),
402 tx_hash: log1.transaction_hash.unwrap(),
403 log_index_in_tx: 0,
404 log: log1.clone(),
405 };
406 mock_get_logs(
407 &mock_service,
408 EthAddress::default(),
409 199,
410 400,
411 vec![log1.clone()],
412 );
413 let log2 = Log {
414 inner: alloy::primitives::Log {
415 address: another_address,
416 ..Default::default()
417 },
418 transaction_hash: Some(TxHash::random()),
419 block_number: Some(201),
420 log_index: Some(9),
421 ..Default::default()
422 };
423 let eth_log2 = EthLog {
424 block_number: log2.block_number.unwrap(),
425 tx_hash: log2.transaction_hash.unwrap(),
426 log_index_in_tx: 0,
427 log: log2.clone(),
428 };
429 mock_get_logs(&mock_service, another_address, 200, 400, vec![log2.clone()]);
430 mock_last_finalized_block(&mock_service, 400);
431
432 finalized_block_rx.changed().await.unwrap();
433 assert_eq!(*finalized_block_rx.borrow(), 400);
434 let mut logs_set = HashSet::new();
435 logs_rx.recv().await.unwrap().2.into_iter().for_each(|log| {
436 logs_set.insert(format!("{:?}", log));
437 });
438 logs_rx.recv().await.unwrap().2.into_iter().for_each(|log| {
439 logs_set.insert(format!("{:?}", log));
440 });
441 assert_eq!(
442 logs_set,
443 HashSet::from_iter(vec![format!("{:?}", eth_log1), format!("{:?}", eth_log2)])
444 );
445 assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
447 Ok(())
448 }
449
450 #[tokio::test]
452 async fn test_paginated_eth_log_query() -> anyhow::Result<()> {
453 telemetry_subscribers::init_for_testing();
454 let registry = Registry::new();
455 mysten_metrics::init_metrics(®istry);
456 let mock_service = EthMockService::new();
457 let start_block = 100;
458 let last_finalized_block = start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE + 1;
460 mock_last_finalized_block(&mock_service, last_finalized_block);
461 let client = EthClient::new_mocked(
462 mock_service.clone(),
463 HashSet::from_iter(vec![EthAddress::default()]),
464 );
465 let result = client.get_last_finalized_block_id().await.unwrap();
466 assert_eq!(result, last_finalized_block);
467
468 let addresses = HashMap::from_iter(vec![(EthAddress::default(), start_block)]);
469 let log = Log {
470 transaction_hash: Some(TxHash::random()),
471 block_number: Some(start_block),
472 log_index: Some(3),
473 ..Default::default()
474 };
475 let log2 = Log {
476 transaction_hash: Some(TxHash::random()),
477 block_number: Some(last_finalized_block),
478 log_index: Some(3),
479 ..Default::default()
480 };
481 let eth_log = EthLog {
482 block_number: start_block,
483 tx_hash: log.transaction_hash.unwrap(),
484 log_index_in_tx: 0,
485 log: log.clone(),
486 };
487 let eth_log2 = EthLog {
488 block_number: last_finalized_block,
489 tx_hash: log2.transaction_hash.unwrap(),
490 log_index_in_tx: 0,
491 log: log2.clone(),
492 };
493 mock_get_logs(
495 &mock_service,
496 EthAddress::default(),
497 start_block,
498 start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1,
499 vec![log.clone()],
500 );
501 mock_get_logs(
503 &mock_service,
504 EthAddress::default(),
505 start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE,
506 last_finalized_block,
507 vec![log2.clone()],
508 );
509
510 let (_handles, mut logs_rx, mut finalized_block_rx) =
511 EthSyncer::new(Arc::new(client), addresses)
512 .run(Arc::new(BridgeMetrics::new_for_testing()))
513 .await
514 .unwrap();
515
516 finalized_block_rx.changed().await.unwrap();
517 assert_eq!(*finalized_block_rx.borrow(), last_finalized_block);
518 let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
519 assert_eq!(contract_address, EthAddress::default());
520 assert_eq!(end_block, start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1);
521 assert_eq!(received_logs, vec![eth_log.clone()]);
522 let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
523 assert_eq!(contract_address, EthAddress::default());
524 assert_eq!(end_block, last_finalized_block);
525 assert_eq!(received_logs, vec![eth_log2.clone()]);
526 Ok(())
527 }
528}