1use crate::error::BridgeResult;
10use crate::eth_client::EthClient;
11use crate::metrics::BridgeMetrics;
12use crate::retry_with_max_elapsed_time;
13use crate::types::EthLog;
14use ethers::types::Address as EthAddress;
15use mysten_metrics::spawn_logged_monitored_task;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio::sync::watch;
19use tokio::task::JoinHandle;
20use tokio::time::{self, Duration, Instant};
21use tracing::error;
22
23const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1000;
24const ETH_EVENTS_CHANNEL_SIZE: usize = 1000;
25const FINALIZED_BLOCK_QUERY_INTERVAL: Duration = Duration::from_secs(5);
26
27pub struct EthSyncer<P> {
28 eth_client: Arc<EthClient<P>>,
29 contract_addresses: EthTargetAddresses,
30}
31
32pub type EthTargetAddresses = HashMap<EthAddress, u64>;
34
35#[allow(clippy::new_without_default)]
36impl<P> EthSyncer<P>
37where
38 P: ethers::providers::JsonRpcClient + 'static,
39{
40 pub fn new(eth_client: Arc<EthClient<P>>, contract_addresses: EthTargetAddresses) -> Self {
41 Self {
42 eth_client,
43 contract_addresses,
44 }
45 }
46
47 pub async fn run(
48 self,
49 metrics: Arc<BridgeMetrics>,
50 ) -> BridgeResult<(
51 Vec<JoinHandle<()>>,
52 mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
53 watch::Receiver<u64>,
54 )> {
55 let (eth_evnets_tx, eth_events_rx) = mysten_metrics::metered_channel::channel(
56 ETH_EVENTS_CHANNEL_SIZE,
57 &mysten_metrics::get_metrics()
58 .unwrap()
59 .channel_inflight
60 .with_label_values(&["eth_events_queue"]),
61 );
62 let last_finalized_block = self.eth_client.get_last_finalized_block_id().await?;
63 let (last_finalized_block_tx, last_finalized_block_rx) =
64 watch::channel(last_finalized_block);
65 let mut task_handles = vec![];
66 let eth_client_clone = self.eth_client.clone();
67 let metrics_clone = metrics.clone();
68 task_handles.push(spawn_logged_monitored_task!(
69 Self::run_finalized_block_refresh_task(
70 last_finalized_block_tx,
71 eth_client_clone,
72 metrics_clone
73 )
74 ));
75 for (contract_address, start_block) in self.contract_addresses {
76 let eth_evnets_tx_clone = eth_evnets_tx.clone();
77 let last_finalized_block_rx_clone = last_finalized_block_rx.clone();
78 let eth_client_clone = self.eth_client.clone();
79 let metrics_clone = metrics.clone();
80 task_handles.push(spawn_logged_monitored_task!(
81 Self::run_event_listening_task(
82 contract_address,
83 start_block,
84 last_finalized_block_rx_clone,
85 eth_evnets_tx_clone,
86 eth_client_clone,
87 metrics_clone,
88 )
89 ));
90 }
91 Ok((task_handles, eth_events_rx, last_finalized_block_rx))
92 }
93
94 async fn run_finalized_block_refresh_task(
95 last_finalized_block_sender: watch::Sender<u64>,
96 eth_client: Arc<EthClient<P>>,
97 metrics: Arc<BridgeMetrics>,
98 ) {
99 tracing::info!("Starting finalized block refresh task.");
100 let mut last_block_number = 0;
101 let mut interval = time::interval(FINALIZED_BLOCK_QUERY_INTERVAL);
102 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
103 loop {
104 interval.tick().await;
105 let Ok(Ok(new_value)) = retry_with_max_elapsed_time!(
107 eth_client.get_last_finalized_block_id(),
108 time::Duration::from_secs(600)
109 ) else {
110 error!("Failed to get last finalized block from eth client after retry");
111 continue;
112 };
113 tracing::debug!("Last finalized block: {}", new_value);
114 metrics.last_finalized_eth_block.set(new_value as i64);
115
116 if new_value > last_block_number {
117 last_finalized_block_sender
118 .send(new_value)
119 .expect("last_finalized_block channel receiver is closed");
120 tracing::info!("Observed new finalized eth block: {}", new_value);
121 last_block_number = new_value;
122 }
123 }
124 }
125
126 async fn run_event_listening_task(
129 contract_address: EthAddress,
130 mut start_block: u64,
131 mut last_finalized_block_receiver: watch::Receiver<u64>,
132 events_sender: mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
133 eth_client: Arc<EthClient<P>>,
134 metrics: Arc<BridgeMetrics>,
135 ) {
136 tracing::info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}");
137 let contract_address_str = contract_address.to_string();
138 let mut more_blocks = false;
139 loop {
140 if !more_blocks {
142 last_finalized_block_receiver
143 .changed()
144 .await
145 .expect("last_finalized_block channel sender is closed");
146 }
147 let new_finalized_block = *last_finalized_block_receiver.borrow();
148 if new_finalized_block < start_block {
149 tracing::info!(
150 contract_address=?contract_address,
151 "New finalized block {} is smaller than start block {}, ignore",
152 new_finalized_block,
153 start_block,
154 );
155 continue;
156 }
157 let end_block = std::cmp::min(
159 start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1,
160 new_finalized_block,
161 );
162 more_blocks = end_block < new_finalized_block;
163 let timer = Instant::now();
164 let Ok(Ok(events)) = retry_with_max_elapsed_time!(
165 eth_client.get_events_in_range(contract_address, start_block, end_block),
166 Duration::from_secs(600)
167 ) else {
168 error!("Failed to get events from eth client after retry");
169 continue;
170 };
171 tracing::debug!(
172 ?contract_address,
173 start_block,
174 end_block,
175 "Querying eth events took {:?}",
176 timer.elapsed()
177 );
178 let len = events.len();
179 let last_block = events.last().map(|e| e.block_number);
180
181 events_sender
188 .send((contract_address, end_block, events))
189 .await
190 .expect("All Eth event channel receivers are closed");
191 if len != 0 {
192 tracing::info!(
193 ?contract_address,
194 start_block,
195 end_block,
196 "Observed {len} new Eth events",
197 );
198 }
199 metrics
200 .last_synced_eth_blocks
201 .with_label_values(&[&contract_address_str])
202 .set(last_block.unwrap_or(end_block) as i64);
203 start_block = end_block + 1;
204 }
205 }
206}
207
208#[cfg(test)]
209mod tests {
210 use std::{collections::HashSet, str::FromStr};
211
212 use ethers::types::{Log, U64, U256};
213 use prometheus::Registry;
214 use tokio::sync::mpsc::error::TryRecvError;
215
216 use crate::{
217 eth_mock_provider::EthMockProvider,
218 test_utils::{mock_get_logs, mock_last_finalized_block},
219 };
220
221 use super::*;
222 use ethers::types::TxHash;
223
224 #[tokio::test]
225 async fn test_last_finalized_block() -> anyhow::Result<()> {
226 telemetry_subscribers::init_for_testing();
227 let registry = Registry::new();
228 mysten_metrics::init_metrics(®istry);
229 let mock_provider = EthMockProvider::new();
230 mock_last_finalized_block(&mock_provider, 777);
231 let client = EthClient::new_mocked(
232 mock_provider.clone(),
233 HashSet::from_iter(vec![EthAddress::zero()]),
234 );
235 let result = client.get_last_finalized_block_id().await.unwrap();
236 assert_eq!(result, 777);
237
238 let addresses = HashMap::from_iter(vec![(EthAddress::zero(), 100)]);
239 let log = Log {
240 address: EthAddress::zero(),
241 transaction_hash: Some(TxHash::random()),
242 block_number: Some(U64::from(777)),
243 log_index: Some(U256::from(3)),
244 ..Default::default()
245 };
246 let eth_log = EthLog {
247 block_number: 777,
248 tx_hash: log.transaction_hash.unwrap(),
249 log_index_in_tx: 0,
250 log: log.clone(),
251 };
252 mock_get_logs(
253 &mock_provider,
254 EthAddress::zero(),
255 100,
256 777,
257 vec![log.clone()],
258 );
259 let (_handles, mut logs_rx, mut finalized_block_rx) =
260 EthSyncer::new(Arc::new(client), addresses)
261 .run(Arc::new(BridgeMetrics::new_for_testing()))
262 .await
263 .unwrap();
264
265 finalized_block_rx.changed().await.unwrap();
267 assert_eq!(*finalized_block_rx.borrow(), 777);
268 let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
269 assert_eq!(contract_address, EthAddress::zero());
270 assert_eq!(end_block, 777);
271 assert_eq!(received_logs, vec![eth_log.clone()]);
272 assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
273
274 mock_get_logs(
275 &mock_provider,
276 EthAddress::zero(),
277 778,
278 888,
279 vec![log.clone()],
280 );
281 mock_last_finalized_block(&mock_provider, 888);
283 finalized_block_rx.changed().await.unwrap();
284 assert_eq!(*finalized_block_rx.borrow(), 888);
285 let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
286 assert_eq!(contract_address, EthAddress::zero());
287 assert_eq!(end_block, 888);
288 assert_eq!(received_logs, vec![eth_log]);
289 assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
290
291 Ok(())
292 }
293
294 #[tokio::test]
295 async fn test_multiple_addresses() -> anyhow::Result<()> {
296 telemetry_subscribers::init_for_testing();
297 let registry = Registry::new();
298 mysten_metrics::init_metrics(®istry);
299
300 let mock_provider = EthMockProvider::new();
301 mock_last_finalized_block(&mock_provider, 198);
302
303 let another_address =
304 EthAddress::from_str("0x00000000219ab540356cbb839cbe05303d7705fa").unwrap();
305 let client = EthClient::new_mocked(
306 mock_provider.clone(),
307 HashSet::from_iter(vec![another_address]),
308 );
309
310 let addresses = HashMap::from_iter(vec![(EthAddress::zero(), 100), (another_address, 200)]);
311
312 let log1 = Log {
313 address: EthAddress::zero(),
314 transaction_hash: Some(TxHash::random()),
315 block_number: Some(U64::from(101)),
316 log_index: Some(U256::from(5)),
317 ..Default::default()
318 };
319 let eth_log1 = EthLog {
320 block_number: log1.block_number.unwrap().as_u64(),
321 tx_hash: log1.transaction_hash.unwrap(),
322 log_index_in_tx: 0,
323 log: log1.clone(),
324 };
325 mock_get_logs(
326 &mock_provider,
327 EthAddress::zero(),
328 100,
329 198,
330 vec![log1.clone()],
331 );
332 let log2 = Log {
333 address: another_address,
334 transaction_hash: Some(TxHash::random()),
335 block_number: Some(U64::from(201)),
336 log_index: Some(U256::from(6)),
337 ..Default::default()
338 };
339 mock_get_logs(
342 &mock_provider,
343 another_address,
344 200,
345 198,
346 vec![log2.clone()],
347 );
348
349 let (_handles, mut logs_rx, mut finalized_block_rx) =
350 EthSyncer::new(Arc::new(client), addresses)
351 .run(Arc::new(BridgeMetrics::new_for_testing()))
352 .await
353 .unwrap();
354
355 finalized_block_rx.changed().await.unwrap();
357 assert_eq!(*finalized_block_rx.borrow(), 198);
358 let (_contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
359 assert_eq!(end_block, 198);
360 assert_eq!(received_logs, vec![eth_log1.clone()]);
361 assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
363
364 let log1 = Log {
365 address: EthAddress::zero(),
366 block_number: Some(U64::from(200)),
367 transaction_hash: Some(TxHash::random()),
368 log_index: Some(U256::from(7)),
369 ..Default::default()
370 };
371 let eth_log1 = EthLog {
372 block_number: log1.block_number.unwrap().as_u64(),
373 tx_hash: log1.transaction_hash.unwrap(),
374 log_index_in_tx: 0,
375 log: log1.clone(),
376 };
377 mock_get_logs(
378 &mock_provider,
379 EthAddress::zero(),
380 199,
381 400,
382 vec![log1.clone()],
383 );
384 let log2 = Log {
385 address: another_address,
386 transaction_hash: Some(TxHash::random()),
387 block_number: Some(U64::from(201)),
388 log_index: Some(U256::from(9)),
389 ..Default::default()
390 };
391 let eth_log2 = EthLog {
392 block_number: log2.block_number.unwrap().as_u64(),
393 tx_hash: log2.transaction_hash.unwrap(),
394 log_index_in_tx: 0,
395 log: log2.clone(),
396 };
397 mock_get_logs(
398 &mock_provider,
399 another_address,
400 200,
401 400,
402 vec![log2.clone()],
403 );
404 mock_last_finalized_block(&mock_provider, 400);
405
406 finalized_block_rx.changed().await.unwrap();
407 assert_eq!(*finalized_block_rx.borrow(), 400);
408 let mut logs_set = HashSet::new();
409 logs_rx.recv().await.unwrap().2.into_iter().for_each(|log| {
410 logs_set.insert(format!("{:?}", log));
411 });
412 logs_rx.recv().await.unwrap().2.into_iter().for_each(|log| {
413 logs_set.insert(format!("{:?}", log));
414 });
415 assert_eq!(
416 logs_set,
417 HashSet::from_iter(vec![format!("{:?}", eth_log1), format!("{:?}", eth_log2)])
418 );
419 assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
421 Ok(())
422 }
423
424 #[tokio::test]
426 async fn test_paginated_eth_log_query() -> anyhow::Result<()> {
427 telemetry_subscribers::init_for_testing();
428 let registry = Registry::new();
429 mysten_metrics::init_metrics(®istry);
430 let mock_provider = EthMockProvider::new();
431 let start_block = 100;
432 let last_finalized_block = start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE + 1;
434 mock_last_finalized_block(&mock_provider, last_finalized_block);
435 let client = EthClient::new_mocked(
436 mock_provider.clone(),
437 HashSet::from_iter(vec![EthAddress::zero()]),
438 );
439 let result = client.get_last_finalized_block_id().await.unwrap();
440 assert_eq!(result, last_finalized_block);
441
442 let addresses = HashMap::from_iter(vec![(EthAddress::zero(), start_block)]);
443 let log = Log {
444 address: EthAddress::zero(),
445 transaction_hash: Some(TxHash::random()),
446 block_number: Some(U64::from(start_block)),
447 log_index: Some(U256::from(3)),
448 ..Default::default()
449 };
450 let log2 = Log {
451 address: EthAddress::zero(),
452 transaction_hash: Some(TxHash::random()),
453 block_number: Some(U64::from(last_finalized_block)),
454 log_index: Some(U256::from(3)),
455 ..Default::default()
456 };
457 let eth_log = EthLog {
458 block_number: start_block,
459 tx_hash: log.transaction_hash.unwrap(),
460 log_index_in_tx: 0,
461 log: log.clone(),
462 };
463 let eth_log2 = EthLog {
464 block_number: last_finalized_block,
465 tx_hash: log2.transaction_hash.unwrap(),
466 log_index_in_tx: 0,
467 log: log2.clone(),
468 };
469 mock_get_logs(
471 &mock_provider,
472 EthAddress::zero(),
473 start_block,
474 start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1,
475 vec![log.clone()],
476 );
477 mock_get_logs(
479 &mock_provider,
480 EthAddress::zero(),
481 start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE,
482 last_finalized_block,
483 vec![log2.clone()],
484 );
485
486 let (_handles, mut logs_rx, mut finalized_block_rx) =
487 EthSyncer::new(Arc::new(client), addresses)
488 .run(Arc::new(BridgeMetrics::new_for_testing()))
489 .await
490 .unwrap();
491
492 finalized_block_rx.changed().await.unwrap();
493 assert_eq!(*finalized_block_rx.borrow(), last_finalized_block);
494 let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
495 assert_eq!(contract_address, EthAddress::zero());
496 assert_eq!(end_block, start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1);
497 assert_eq!(received_logs, vec![eth_log.clone()]);
498 let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
499 assert_eq!(contract_address, EthAddress::zero());
500 assert_eq!(end_block, last_finalized_block);
501 assert_eq!(received_logs, vec![eth_log2.clone()]);
502 Ok(())
503 }
504}