1use crate::abi::EthBridgeEvent;
10use crate::action_executor::{
11 BridgeActionExecutionWrapper, BridgeActionExecutorTrait, submit_to_executor,
12};
13use crate::events::SuiBridgeEvent;
14use crate::metrics::BridgeMetrics;
15use crate::storage::BridgeOrchestratorTables;
16use crate::sui_client::{SuiClient, SuiClientInner};
17use crate::sui_syncer::GrpcSyncedEvents;
18use crate::types::EthLog;
19use alloy::primitives::Address as EthAddress;
20use mysten_common::ZipDebugEqIteratorExt;
21use mysten_metrics::spawn_logged_monitored_task;
22use std::sync::Arc;
23use tokio::task::JoinHandle;
24use tracing::{error, info};
25
26pub struct BridgeOrchestrator<C> {
27 _sui_client: Arc<SuiClient<C>>,
28 sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
29 eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
30 store: Arc<BridgeOrchestratorTables>,
31 eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
32 metrics: Arc<BridgeMetrics>,
33}
34
35impl<C> BridgeOrchestrator<C>
36where
37 C: SuiClientInner + 'static,
38{
39 pub fn new(
40 sui_client: Arc<SuiClient<C>>,
41 sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
42 eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
43 store: Arc<BridgeOrchestratorTables>,
44 eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
45 metrics: Arc<BridgeMetrics>,
46 ) -> Self {
47 Self {
48 _sui_client: sui_client,
49 sui_grpc_events_rx,
50 eth_events_rx,
51 store,
52 eth_monitor_tx,
53 metrics,
54 }
55 }
56
57 pub async fn run_with_grpc(
58 self,
59 bridge_action_executor: impl BridgeActionExecutorTrait,
60 ) -> Vec<JoinHandle<()>> {
61 tracing::info!("Starting BridgeOrchestrator with gRPC syncer");
62 let mut task_handles = vec![];
63 let store_clone = self.store.clone();
64
65 let (handles, executor_sender) = bridge_action_executor.run();
67 task_handles.extend(handles);
68 let executor_sender_clone = executor_sender.clone();
69 let metrics_clone = self.metrics.clone();
70
71 task_handles.push(spawn_logged_monitored_task!(Self::run_sui_grpc_watcher(
72 store_clone,
73 executor_sender_clone,
74 self.sui_grpc_events_rx,
75 metrics_clone,
76 )));
77 let store_clone = self.store.clone();
78
79 let actions = store_clone
81 .get_all_pending_actions()
82 .into_values()
83 .collect::<Vec<_>>();
84 for action in actions {
85 submit_to_executor(&executor_sender, action)
86 .await
87 .expect("Submit to executor should not fail");
88 }
89
90 let metrics_clone = self.metrics.clone();
91 task_handles.push(spawn_logged_monitored_task!(Self::run_eth_watcher(
92 store_clone,
93 executor_sender,
94 self.eth_events_rx,
95 self.eth_monitor_tx,
96 metrics_clone,
97 )));
98
99 task_handles
100 }
101
102 pub async fn run_sui_grpc_watcher(
103 store: Arc<BridgeOrchestratorTables>,
104 executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
105 mut sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<GrpcSyncedEvents>,
106 metrics: Arc<BridgeMetrics>,
107 ) {
108 info!("Starting sui gRPC watcher task");
109 while let Some((last_seq_num, events)) = sui_grpc_events_rx.recv().await {
110 if events.is_empty() {
111 continue;
112 }
113 info!(
114 "Received {} Sui events: last_seq_num={}",
115 events.len(),
116 last_seq_num
117 );
118 metrics
119 .sui_watcher_received_events
120 .inc_by(events.len() as u64);
121
122 let mut actions = vec![];
123 for bridge_event in events {
124 info!("Observed Sui bridge event (gRPC): {:?}", bridge_event);
125
126 if let Some(mut action) = bridge_event.try_into_bridge_action() {
128 metrics.last_observed_actions_seq_num.with_label_values(&[
129 action.chain_id().to_string().as_str(),
130 action.action_type().to_string().as_str(),
131 ]);
132
133 action = action.update_to_token_transfer();
134 actions.push(action);
135 }
136 }
137
138 if !actions.is_empty() {
139 info!(
140 "Received {} actions from Sui gRPC: {:?}",
141 actions.len(),
142 actions
143 );
144 metrics
145 .sui_watcher_received_actions
146 .inc_by(actions.len() as u64);
147 store
149 .insert_pending_actions(&actions)
150 .expect("Store operation should not fail");
151 for action in actions {
152 submit_to_executor(&executor_tx, action)
153 .await
154 .expect("Submit to executor should not fail");
155 }
156 }
157
158 store
160 .update_sui_sequence_number_cursor(last_seq_num)
161 .expect("Store operation should not fail");
162 }
163 panic!("Sui gRPC event channel was closed unexpectedly");
164 }
165
166 async fn run_eth_watcher(
167 store: Arc<BridgeOrchestratorTables>,
168 executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
169 mut eth_events_rx: mysten_metrics::metered_channel::Receiver<(
170 alloy::primitives::Address,
171 u64,
172 Vec<EthLog>,
173 )>,
174 eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
175 metrics: Arc<BridgeMetrics>,
176 ) {
177 info!("Starting eth watcher task");
178 while let Some((contract, end_block, logs)) = eth_events_rx.recv().await {
179 if logs.is_empty() {
180 store
181 .update_eth_event_cursor(contract, end_block)
182 .expect("Store operation should not fail");
183 continue;
184 }
185
186 info!("Received {} Eth events", logs.len());
187 metrics
188 .eth_watcher_received_events
189 .inc_by(logs.len() as u64);
190
191 let bridge_events = logs
192 .iter()
193 .map(EthBridgeEvent::try_from_eth_log)
194 .collect::<Vec<_>>();
195
196 let mut actions = vec![];
197 for (log, opt_bridge_event) in logs.iter().zip_debug_eq(bridge_events) {
198 if opt_bridge_event.is_none() {
199 metrics.eth_watcher_unrecognized_events.inc();
201 error!("Eth event not recognized: {:?}", log);
202 continue;
203 }
204 let bridge_event = opt_bridge_event.unwrap();
206 info!("Observed Eth bridge event: {:?}", bridge_event);
207
208 eth_monitor_tx
210 .send(bridge_event.clone())
211 .await
212 .expect("Sending event to monitor channel should not fail");
213
214 match bridge_event.try_into_bridge_action(log.tx_hash, log.log_index_in_tx) {
215 Ok(Some(action)) => {
216 metrics.last_observed_actions_seq_num.with_label_values(&[
217 action.chain_id().to_string().as_str(),
218 action.action_type().to_string().as_str(),
219 ]);
220 actions.push(action)
221 }
222 Ok(None) => {}
223 Err(e) => {
224 error!(eth_tx_hash=?log.tx_hash, eth_event_index=?log.log_index_in_tx, "Error converting EthBridgeEvent to BridgeAction: {:?}", e);
225 }
226 }
227 }
228 if !actions.is_empty() {
229 info!("Received {} actions from Eth: {:?}", actions.len(), actions);
230 metrics
231 .eth_watcher_received_actions
232 .inc_by(actions.len() as u64);
233 store
235 .insert_pending_actions(&actions)
236 .expect("Store operation should not fail");
237 for action in actions {
239 submit_to_executor(&executor_tx, action)
240 .await
241 .expect("Submit to executor should not fail");
242 }
243 }
244
245 store
246 .update_eth_event_cursor(contract, end_block)
247 .expect("Store operation should not fail");
248 }
249 panic!("Eth event channel was closed");
250 }
251}
252
#[cfg(test)]
mod tests {
    use crate::{
        test_utils::{get_test_eth_to_sui_bridge_action, get_test_log_and_action},
        types::BridgeActionDigest,
    };
    use alloy::primitives::TxHash;
    use prometheus::Registry;
    use std::str::FromStr;
    use sui_types::Identifier;

    use super::*;
    use crate::events::SuiBridgeEvent;
    use crate::events::init_all_struct_tags;
    use crate::test_utils::get_test_sui_to_eth_bridge_action;
    use crate::{events::tests::get_test_sui_event_and_action, sui_mock_client::SuiMockClient};

    /// An Eth log sent on the events channel must reach the executor, be
    /// recorded as a pending action, and advance the Eth cursor to the batch's
    /// end block.
    #[tokio::test]
    async fn test_eth_watcher_task() {
        let (
            _sui_grpc_events_tx,
            sui_grpc_events_rx,
            eth_events_tx,
            eth_events_rx,
            eth_monitor_tx,
            _eth_monitor_rx,
            sui_client,
            store,
            _store_dir,
        ) = setup();
        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let _handles = BridgeOrchestrator::new(
            Arc::new(sui_client),
            sui_grpc_events_rx,
            eth_events_rx,
            store.clone(),
            eth_monitor_tx,
            metrics,
        )
        .run_with_grpc(executor)
        .await;

        let address = EthAddress::random();
        let (log, bridge_action) = get_test_log_and_action(address, TxHash::random(), 10);
        let log_index_in_tx = 10;
        let log_block_num = log.block_number.unwrap();
        let eth_log = EthLog {
            log: log.clone(),
            tx_hash: log.transaction_hash.unwrap(),
            block_number: log_block_num,
            log_index_in_tx,
        };
        let end_block_num = log_block_num + 15;

        eth_events_tx
            .send((address, end_block_num, vec![eth_log.clone()]))
            .await
            .unwrap();

        // The executor must be asked to handle exactly this action.
        assert_eq!(
            executor_requested_action_rx.recv().await.unwrap(),
            bridge_action.digest()
        );

        // Poll the store until the action shows up, giving up after ~5 seconds.
        let start = std::time::Instant::now();
        loop {
            let actions = store.get_all_pending_actions();
            if actions.is_empty() {
                if start.elapsed().as_secs() > 5 {
                    panic!("Timed out waiting for action to be written to WAL");
                }
                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
                continue;
            }
            assert_eq!(actions.len(), 1);
            let action = actions.get(&bridge_action.digest()).unwrap();
            assert_eq!(action, &bridge_action);
            assert_eq!(
                store.get_eth_event_cursors(&[address]).unwrap()[0].unwrap(),
                end_block_num,
            );
            break;
        }
    }

    /// A Sui bridge event sent on the gRPC events channel must reach the
    /// executor, be recorded as a pending action, and advance the Sui
    /// sequence-number cursor.
    #[tokio::test]
    async fn test_sui_grpc_watcher_task() {
        let (
            sui_grpc_events_tx,
            sui_grpc_events_rx,
            _eth_events_tx,
            eth_events_rx,
            eth_monitor_tx,
            _eth_monitor_rx,
            sui_client,
            store,
            _store_dir,
        ) = setup();
        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let _handles = BridgeOrchestrator::new(
            Arc::new(sui_client),
            sui_grpc_events_rx,
            eth_events_rx,
            store.clone(),
            eth_monitor_tx,
            metrics,
        )
        .run_with_grpc(executor)
        .await;

        let identifier = Identifier::from_str("test_sui_grpc_watcher_task").unwrap();
        let (sui_event, mut bridge_action) = get_test_sui_event_and_action(identifier);
        // The watcher applies the same transformation before storing the
        // action, so the expectation has to go through it as well.
        bridge_action = bridge_action.update_to_token_transfer();

        let bridge_event = SuiBridgeEvent::try_from_sui_event(&sui_event)
            .unwrap()
            .unwrap();

        let last_seq_num = 42u64;
        sui_grpc_events_tx
            .send((last_seq_num, vec![bridge_event]))
            .await
            .unwrap();

        let start = std::time::Instant::now();
        // The executor must be asked to handle exactly this action.
        assert_eq!(
            executor_requested_action_rx.recv().await.unwrap(),
            bridge_action.digest()
        );

        // Poll the store until the action shows up, giving up after ~5 seconds.
        loop {
            let actions = store.get_all_pending_actions();
            if actions.is_empty() {
                if start.elapsed().as_secs() > 5 {
                    panic!("Timed out waiting for action to be written to WAL");
                }
                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
                continue;
            }
            assert_eq!(actions.len(), 1);
            let action = actions.get(&bridge_action.digest()).unwrap();
            assert_eq!(action, &bridge_action);
            assert_eq!(
                store.get_sui_sequence_number_cursor().unwrap().unwrap(),
                last_seq_num,
            );
            break;
        }
    }

    /// Pending actions already in the store are re-submitted on start-up.
    #[tokio::test]
    async fn test_resume_actions_in_pending_logs_with_grpc() {
        assert_pending_actions_are_resubmitted().await;
    }

    /// Pending actions already in the store are re-submitted on start-up.
    ///
    /// This runs the same scenario as
    /// `test_resume_actions_in_pending_logs_with_grpc`; both names are kept so
    /// that existing test filters keep matching.
    #[tokio::test]
    async fn test_resume_actions_in_pending_logs() {
        assert_pending_actions_are_resubmitted().await;
    }

    /// Shared scenario for the resume tests: two actions are written to the
    /// store before the orchestrator starts, and both digests must then be
    /// requested from the executor (in any order).
    async fn assert_pending_actions_are_resubmitted() {
        let (
            _sui_grpc_events_tx,
            sui_grpc_events_rx,
            _eth_events_tx,
            eth_events_rx,
            eth_monitor_tx,
            _eth_monitor_rx,
            sui_client,
            store,
            _store_dir,
        ) = setup();
        let (executor, mut executor_requested_action_rx) = MockExecutor::new();

        let action1 = get_test_sui_to_eth_bridge_action(
            None,
            Some(0),
            Some(99),
            Some(10000),
            None,
            None,
            None,
        );

        let action2 = get_test_eth_to_sui_bridge_action(None, None, None, None);
        store
            .insert_pending_actions(&[action1.clone(), action2.clone()])
            .unwrap();

        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let _handles = BridgeOrchestrator::new(
            Arc::new(sui_client),
            sui_grpc_events_rx,
            eth_events_rx,
            store.clone(),
            eth_monitor_tx,
            metrics,
        )
        .run_with_grpc(executor)
        .await;

        // Order of re-submission is not asserted, only the set of digests.
        let mut digests = std::collections::HashSet::new();
        digests.insert(executor_requested_action_rx.recv().await.unwrap());
        digests.insert(executor_requested_action_rx.recv().await.unwrap());
        assert!(digests.contains(&action1.digest()));
        assert!(digests.contains(&action2.digest()));
        assert_eq!(digests.len(), 2);
    }

    /// Builds the channels, mock Sui client and on-disk store used by every
    /// test.
    ///
    /// The last tuple element is the guard of the temporary directory backing
    /// the store. Callers must keep it bound (e.g. as `_store_dir`, not `_`)
    /// for the duration of the test: dropping a `tempfile::TempDir` removes the
    /// directory.
    #[allow(clippy::type_complexity)]
    fn setup() -> (
        mysten_metrics::metered_channel::Sender<(u64, Vec<SuiBridgeEvent>)>,
        mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
        mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
        mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
        mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
        mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
        SuiClient<SuiMockClient>,
        Arc<BridgeOrchestratorTables>,
        tempfile::TempDir,
    ) {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        mysten_metrics::init_metrics(&registry);

        init_all_struct_tags();

        let temp_dir = tempfile::tempdir().unwrap();
        let store = BridgeOrchestratorTables::new(temp_dir.path());

        let mock_client = SuiMockClient::default();
        let sui_client = SuiClient::new_for_testing(mock_client.clone());

        let (eth_events_tx, eth_events_rx) = mysten_metrics::metered_channel::channel(
            100,
            &mysten_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                .with_label_values(&["unit_test_eth_events_queue"]),
        );

        let (sui_grpc_events_tx, sui_grpc_events_rx) = mysten_metrics::metered_channel::channel(
            100,
            &mysten_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                .with_label_values(&["unit_test_sui_events_queue"]),
        );
        let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel(
            10000,
            &mysten_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                .with_label_values(&["eth_monitor_queue"]),
        );
        (
            sui_grpc_events_tx,
            sui_grpc_events_rx,
            eth_events_tx,
            eth_events_rx,
            eth_monitor_tx,
            eth_monitor_rx,
            sui_client,
            store,
            temp_dir,
        )
    }

    /// Executor stand-in that reports the digest of every action it receives
    /// on a broadcast channel instead of executing it.
    struct MockExecutor {
        requested_transactions_tx: tokio::sync::broadcast::Sender<BridgeActionDigest>,
    }

    impl MockExecutor {
        /// Returns the mock together with the receiver on which the digests of
        /// submitted actions can be observed.
        fn new() -> (Self, tokio::sync::broadcast::Receiver<BridgeActionDigest>) {
            let (tx, rx) = tokio::sync::broadcast::channel(100);
            (
                Self {
                    requested_transactions_tx: tx,
                },
                rx,
            )
        }
    }

    impl BridgeActionExecutorTrait for MockExecutor {
        /// Spawns a task that drains the executor channel and broadcasts the
        /// digest of each received action; returns that task's handle and the
        /// sender side of the executor channel.
        fn run(
            self,
        ) -> (
            Vec<tokio::task::JoinHandle<()>>,
            mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
        ) {
            let (tx, mut rx) =
                mysten_metrics::metered_channel::channel::<BridgeActionExecutionWrapper>(
                    100,
                    &mysten_metrics::get_metrics()
                        .unwrap()
                        .channel_inflight
                        .with_label_values(&["unit_test_mock_executor"]),
                );

            let handle = tokio::spawn(async move {
                while let Some(action) = rx.recv().await {
                    self.requested_transactions_tx
                        .send(action.0.digest())
                        .unwrap();
                }
            });
            (vec![handle], tx)
        }
    }
}