1use crate::abi::EthBridgeEvent;
10use crate::action_executor::{
11 BridgeActionExecutionWrapper, BridgeActionExecutorTrait, submit_to_executor,
12};
13use crate::error::BridgeError;
14use crate::events::SuiBridgeEvent;
15use crate::metrics::BridgeMetrics;
16use crate::storage::BridgeOrchestratorTables;
17use crate::sui_client::{SuiClient, SuiClientInner};
18use crate::types::EthLog;
19use ethers::types::Address as EthAddress;
20use mysten_metrics::spawn_logged_monitored_task;
21use std::sync::Arc;
22use sui_json_rpc_types::SuiEvent;
23use sui_types::Identifier;
24use tokio::task::JoinHandle;
25use tracing::{error, info};
26
27pub struct BridgeOrchestrator<C> {
28 _sui_client: Arc<SuiClient<C>>,
29 sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
30 eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
31 store: Arc<BridgeOrchestratorTables>,
32 sui_monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
33 eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
34 metrics: Arc<BridgeMetrics>,
35}
36
37impl<C> BridgeOrchestrator<C>
38where
39 C: SuiClientInner + 'static,
40{
41 pub fn new(
42 sui_client: Arc<SuiClient<C>>,
43 sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
44 eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
45 store: Arc<BridgeOrchestratorTables>,
46 sui_monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
47 eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
48 metrics: Arc<BridgeMetrics>,
49 ) -> Self {
50 Self {
51 _sui_client: sui_client,
52 sui_events_rx,
53 eth_events_rx,
54 store,
55 sui_monitor_tx,
56 eth_monitor_tx,
57 metrics,
58 }
59 }
60
61 pub async fn run(
62 self,
63 bridge_action_executor: impl BridgeActionExecutorTrait,
64 ) -> Vec<JoinHandle<()>> {
65 tracing::info!("Starting BridgeOrchestrator");
66 let mut task_handles = vec![];
67 let store_clone = self.store.clone();
68
69 let (handles, executor_sender) = bridge_action_executor.run();
71 task_handles.extend(handles);
72 let executor_sender_clone = executor_sender.clone();
73 let metrics_clone = self.metrics.clone();
74 task_handles.push(spawn_logged_monitored_task!(Self::run_sui_watcher(
75 store_clone,
76 executor_sender_clone,
77 self.sui_events_rx,
78 self.sui_monitor_tx,
79 metrics_clone,
80 )));
81 let store_clone = self.store.clone();
82
83 let actions = store_clone
85 .get_all_pending_actions()
86 .into_values()
87 .collect::<Vec<_>>();
88 for action in actions {
89 submit_to_executor(&executor_sender, action)
90 .await
91 .expect("Submit to executor should not fail");
92 }
93
94 let metrics_clone = self.metrics.clone();
95 task_handles.push(spawn_logged_monitored_task!(Self::run_eth_watcher(
96 store_clone,
97 executor_sender,
98 self.eth_events_rx,
99 self.eth_monitor_tx,
100 metrics_clone,
101 )));
102
103 task_handles
104 }
105
106 async fn run_sui_watcher(
107 store: Arc<BridgeOrchestratorTables>,
108 executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
109 mut sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
110 monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
111 metrics: Arc<BridgeMetrics>,
112 ) {
113 info!("Starting sui watcher task");
114 while let Some((identifier, events)) = sui_events_rx.recv().await {
115 if events.is_empty() {
116 continue;
117 }
118 info!("Received {} Sui events: {:?}", events.len(), events);
119 metrics
120 .sui_watcher_received_events
121 .inc_by(events.len() as u64);
122 let bridge_events = events
123 .iter()
124 .filter_map(|sui_event| {
125 match SuiBridgeEvent::try_from_sui_event(sui_event) {
126 Ok(bridge_event) => Some(bridge_event),
127 Err(BridgeError::ZeroValueBridgeTransfer(_)) => {
129 error!("Zero value bridge transfer: {:?}", sui_event);
130 None
131 }
132 Err(e) => {
133 panic!(
134 "Sui Event could not be deserialzed to SuiBridgeEvent: {:?}",
135 e
136 );
137 }
138 }
139 })
140 .collect::<Vec<_>>();
141
142 let mut actions = vec![];
143 for (sui_event, opt_bridge_event) in events.iter().zip(bridge_events) {
144 if opt_bridge_event.is_none() {
145 metrics.sui_watcher_unrecognized_events.inc();
147 error!("Sui event not recognized: {:?}", sui_event);
148 continue;
149 }
150 let bridge_event: SuiBridgeEvent = opt_bridge_event.unwrap();
152 info!("Observed Sui bridge event: {:?}", bridge_event);
153
154 monitor_tx
156 .send(bridge_event.clone())
157 .await
158 .expect("Sending event to monitor channel should not fail");
159
160 if let Some(mut action) = bridge_event.try_into_bridge_action() {
161 metrics.last_observed_actions_seq_num.with_label_values(&[
162 action.chain_id().to_string().as_str(),
163 action.action_type().to_string().as_str(),
164 ]);
165
166 action = action.update_to_token_transfer();
167
168 actions.push(action);
169 }
170 }
171
172 if !actions.is_empty() {
173 info!("Received {} actions from Sui: {:?}", actions.len(), actions);
174 metrics
175 .sui_watcher_received_actions
176 .inc_by(actions.len() as u64);
177 store
179 .insert_pending_actions(&actions)
180 .expect("Store operation should not fail");
181 for action in actions {
182 submit_to_executor(&executor_tx, action)
183 .await
184 .expect("Submit to executor should not fail");
185 }
186 }
187
188 let cursor = events.last().unwrap().id;
190 store
191 .update_sui_event_cursor(identifier, cursor)
192 .expect("Store operation should not fail");
193 }
194 panic!("Sui event channel was closed unexpectedly");
195 }
196
197 async fn run_eth_watcher(
198 store: Arc<BridgeOrchestratorTables>,
199 executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
200 mut eth_events_rx: mysten_metrics::metered_channel::Receiver<(
201 ethers::types::Address,
202 u64,
203 Vec<EthLog>,
204 )>,
205 eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
206 metrics: Arc<BridgeMetrics>,
207 ) {
208 info!("Starting eth watcher task");
209 while let Some((contract, end_block, logs)) = eth_events_rx.recv().await {
210 if logs.is_empty() {
211 store
212 .update_eth_event_cursor(contract, end_block)
213 .expect("Store operation should not fail");
214 continue;
215 }
216
217 info!("Received {} Eth events", logs.len());
218 metrics
219 .eth_watcher_received_events
220 .inc_by(logs.len() as u64);
221
222 let bridge_events = logs
223 .iter()
224 .map(EthBridgeEvent::try_from_eth_log)
225 .collect::<Vec<_>>();
226
227 let mut actions = vec![];
228 for (log, opt_bridge_event) in logs.iter().zip(bridge_events) {
229 if opt_bridge_event.is_none() {
230 metrics.eth_watcher_unrecognized_events.inc();
232 error!("Eth event not recognized: {:?}", log);
233 continue;
234 }
235 let bridge_event = opt_bridge_event.unwrap();
237 info!("Observed Eth bridge event: {:?}", bridge_event);
238
239 eth_monitor_tx
241 .send(bridge_event.clone())
242 .await
243 .expect("Sending event to monitor channel should not fail");
244
245 match bridge_event.try_into_bridge_action(log.tx_hash, log.log_index_in_tx) {
246 Ok(Some(action)) => {
247 metrics.last_observed_actions_seq_num.with_label_values(&[
248 action.chain_id().to_string().as_str(),
249 action.action_type().to_string().as_str(),
250 ]);
251 actions.push(action)
252 }
253 Ok(None) => {}
254 Err(e) => {
255 error!(eth_tx_hash=?log.tx_hash, eth_event_index=?log.log_index_in_tx, "Error converting EthBridgeEvent to BridgeAction: {:?}", e);
256 }
257 }
258 }
259 if !actions.is_empty() {
260 info!("Received {} actions from Eth: {:?}", actions.len(), actions);
261 metrics
262 .eth_watcher_received_actions
263 .inc_by(actions.len() as u64);
264 store
266 .insert_pending_actions(&actions)
267 .expect("Store operation should not fail");
268 for action in actions {
270 submit_to_executor(&executor_tx, action)
271 .await
272 .expect("Submit to executor should not fail");
273 }
274 }
275
276 store
277 .update_eth_event_cursor(contract, end_block)
278 .expect("Store operation should not fail");
279 }
280 panic!("Eth event channel was closed");
281 }
282}
283
#[cfg(test)]
mod tests {
    use crate::{
        test_utils::{get_test_eth_to_sui_bridge_action, get_test_log_and_action},
        types::BridgeActionDigest,
    };
    use ethers::types::{Address as EthAddress, TxHash};
    use prometheus::Registry;
    use std::str::FromStr;

    use super::*;
    use crate::events::init_all_struct_tags;
    use crate::test_utils::get_test_sui_to_eth_bridge_action;
    use crate::{events::tests::get_test_sui_event_and_action, sui_mock_client::SuiMockClient};

    /// A Sui event sent on the events channel must reach the executor, be
    /// stored as a pending action, and move the Sui cursor to the event's id.
    #[tokio::test]
    async fn test_sui_watcher_task() {
        let (
            sui_events_tx,
            sui_events_rx,
            _eth_events_tx,
            eth_events_rx,
            sui_monitor_tx,
            _sui_monitor_rx,
            eth_monitor_tx,
            _eth_monitor_rx,
            sui_client,
            store,
        ) = setup();
        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let _handles = BridgeOrchestrator::new(
            Arc::new(sui_client),
            sui_events_rx,
            eth_events_rx,
            store.clone(),
            sui_monitor_tx,
            eth_monitor_tx,
            metrics,
        )
        .run(executor)
        .await;

        let identifier = Identifier::from_str("test_sui_watcher_task").unwrap();
        let (sui_event, mut bridge_action) = get_test_sui_event_and_action(identifier.clone());
        // The watcher applies the same conversion before storing the action.
        bridge_action = bridge_action.update_to_token_transfer();
        sui_events_tx
            .send((identifier.clone(), vec![sui_event.clone()]))
            .await
            .unwrap();

        let start = std::time::Instant::now();
        // The executor must be asked to run exactly this action.
        assert_eq!(
            executor_requested_action_rx.recv().await.unwrap(),
            bridge_action.digest()
        );
        // Poll the store until the pending action shows up, or time out.
        loop {
            let actions = store.get_all_pending_actions();
            if actions.is_empty() {
                if start.elapsed().as_secs() > 5 {
                    panic!("Timed out waiting for action to be written to WAL");
                }
                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
                continue;
            }
            assert_eq!(actions.len(), 1);
            let action = actions.get(&bridge_action.digest()).unwrap();
            assert_eq!(action, &bridge_action);
            assert_eq!(
                store.get_sui_event_cursors(&[identifier]).unwrap()[0].unwrap(),
                sui_event.id,
            );
            break;
        }
    }

    /// An Eth log sent on the events channel must reach the executor, be
    /// stored as a pending action, and move the Eth cursor to the batch's
    /// end block.
    #[tokio::test]
    async fn test_eth_watcher_task() {
        let (
            _sui_events_tx,
            sui_events_rx,
            eth_events_tx,
            eth_events_rx,
            sui_monitor_tx,
            _sui_monitor_rx,
            eth_monitor_tx,
            _eth_monitor_rx,
            sui_client,
            store,
        ) = setup();
        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let _handles = BridgeOrchestrator::new(
            Arc::new(sui_client),
            sui_events_rx,
            eth_events_rx,
            store.clone(),
            sui_monitor_tx,
            eth_monitor_tx,
            metrics,
        )
        .run(executor)
        .await;
        let address = EthAddress::random();
        let (log, bridge_action) = get_test_log_and_action(address, TxHash::random(), 10);
        // Same index as the one passed to `get_test_log_and_action` above.
        let log_index_in_tx = 10;
        let log_block_num = log.block_number.unwrap().as_u64();
        let eth_log = EthLog {
            log: log.clone(),
            tx_hash: log.transaction_hash.unwrap(),
            block_number: log_block_num,
            log_index_in_tx,
        };
        // The cursor is expected to follow the batch's end block, not the
        // block of the log itself, so make the two differ.
        let end_block_num = log_block_num + 15;

        eth_events_tx
            .send((address, end_block_num, vec![eth_log.clone()]))
            .await
            .unwrap();

        // The executor must be asked to run exactly this action.
        assert_eq!(
            executor_requested_action_rx.recv().await.unwrap(),
            bridge_action.digest()
        );
        let start = std::time::Instant::now();
        // Poll the store until the pending action shows up, or time out.
        loop {
            let actions = store.get_all_pending_actions();
            if actions.is_empty() {
                if start.elapsed().as_secs() > 5 {
                    panic!("Timed out waiting for action to be written to WAL");
                }
                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
                continue;
            }
            assert_eq!(actions.len(), 1);
            let action = actions.get(&bridge_action.digest()).unwrap();
            assert_eq!(action, &bridge_action);
            assert_eq!(
                store.get_eth_event_cursors(&[address]).unwrap()[0].unwrap(),
                end_block_num,
            );
            break;
        }
    }

    /// Actions already pending in the store when the orchestrator starts
    /// must be submitted to the executor by `run`.
    #[tokio::test]
    async fn test_resume_actions_in_pending_logs() {
        let (
            _sui_events_tx,
            sui_events_rx,
            _eth_events_tx,
            eth_events_rx,
            sui_monitor_tx,
            _sui_monitor_rx,
            eth_monitor_tx,
            _eth_monitor_rx,
            sui_client,
            store,
        ) = setup();
        let (executor, mut executor_requested_action_rx) = MockExecutor::new();

        let action1 = get_test_sui_to_eth_bridge_action(
            None,
            Some(0),
            Some(99),
            Some(10000),
            None,
            None,
            None,
        );

        let action2 = get_test_eth_to_sui_bridge_action(None, None, None, None);
        // Seed the store before the orchestrator is started.
        store
            .insert_pending_actions(&vec![action1.clone(), action2.clone()])
            .unwrap();

        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let _handles = BridgeOrchestrator::new(
            Arc::new(sui_client),
            sui_events_rx,
            eth_events_rx,
            store.clone(),
            sui_monitor_tx,
            eth_monitor_tx,
            metrics,
        )
        .run(executor)
        .await;

        // Submission order is not asserted, only that both digests arrive.
        let mut digests = std::collections::HashSet::new();
        digests.insert(executor_requested_action_rx.recv().await.unwrap());
        digests.insert(executor_requested_action_rx.recv().await.unwrap());
        assert!(digests.contains(&action1.digest()));
        assert!(digests.contains(&action2.digest()));
        assert_eq!(digests.len(), 2);
    }

    /// Builds everything a test needs to construct a `BridgeOrchestrator`.
    ///
    /// Returns, in order: the Sui events channel (sender, receiver), the Eth
    /// events channel (sender, receiver), the Sui monitor channel (sender,
    /// receiver), the Eth monitor channel (sender, receiver), a Sui client
    /// backed by `SuiMockClient`, and a store created under a fresh temporary
    /// directory.
    #[allow(clippy::type_complexity)]
    fn setup() -> (
        mysten_metrics::metered_channel::Sender<(Identifier, Vec<SuiEvent>)>,
        mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
        mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
        mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
        mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
        mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
        mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
        mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
        SuiClient<SuiMockClient>,
        Arc<BridgeOrchestratorTables>,
    ) {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        // The channel constructors below read the metrics installed here.
        mysten_metrics::init_metrics(&registry);

        init_all_struct_tags();

        // NOTE(review): `temp_dir` is dropped when `setup` returns, and a
        // `tempfile::TempDir` removes its directory on drop, while the store
        // created from its path is handed back to the caller. Confirm the
        // store tolerates this, or keep the directory alive for the test.
        let temp_dir = tempfile::tempdir().unwrap();
        let store = BridgeOrchestratorTables::new(temp_dir.path());

        let mock_client = SuiMockClient::default();
        let sui_client = SuiClient::new_for_testing(mock_client.clone());

        let (eth_events_tx, eth_events_rx) = mysten_metrics::metered_channel::channel(
            100,
            &mysten_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                .with_label_values(&["unit_test_eth_events_queue"]),
        );

        let (sui_events_tx, sui_events_rx) = mysten_metrics::metered_channel::channel(
            100,
            &mysten_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                .with_label_values(&["unit_test_sui_events_queue"]),
        );
        let (sui_monitor_tx, sui_monitor_rx) = mysten_metrics::metered_channel::channel(
            10000,
            &mysten_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                .with_label_values(&["sui_monitor_queue"]),
        );
        let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel(
            10000,
            &mysten_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                .with_label_values(&["eth_monitor_queue"]),
        );
        (
            sui_events_tx,
            sui_events_rx,
            eth_events_tx,
            eth_events_rx,
            sui_monitor_tx,
            sui_monitor_rx,
            eth_monitor_tx,
            eth_monitor_rx,
            sui_client,
            store,
        )
    }

    /// Executor stand-in that does not execute anything: it reports the
    /// digest of every action it receives on a broadcast channel.
    struct MockExecutor {
        /// Digests of the actions received so far are sent here.
        requested_transactions_tx: tokio::sync::broadcast::Sender<BridgeActionDigest>,
    }

    impl MockExecutor {
        /// Returns the mock together with the receiver on which tests
        /// observe the digests of submitted actions.
        fn new() -> (Self, tokio::sync::broadcast::Receiver<BridgeActionDigest>) {
            let (tx, rx) = tokio::sync::broadcast::channel(100);
            (
                Self {
                    requested_transactions_tx: tx,
                },
                rx,
            )
        }
    }

    impl BridgeActionExecutorTrait for MockExecutor {
        /// Spawns one task that drains the submission channel and broadcasts
        /// each action's digest; returns that task's handle and the sender
        /// side of the submission channel.
        fn run(
            self,
        ) -> (
            Vec<tokio::task::JoinHandle<()>>,
            mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
        ) {
            let (tx, mut rx) =
                mysten_metrics::metered_channel::channel::<BridgeActionExecutionWrapper>(
                    100,
                    &mysten_metrics::get_metrics()
                        .unwrap()
                        .channel_inflight
                        .with_label_values(&["unit_test_mock_executor"]),
                );

            let handles = tokio::spawn(async move {
                while let Some(action) = rx.recv().await {
                    // `unwrap` fails the task if the test already dropped
                    // its broadcast receiver.
                    self.requested_transactions_tx
                        .send(action.0.digest())
                        .unwrap();
                }
            });
            (vec![handles], tx)
        }
    }
}