1use crate::abi::EthBridgeEvent;
10use crate::action_executor::{
11 BridgeActionExecutionWrapper, BridgeActionExecutorTrait, submit_to_executor,
12};
13use crate::events::SuiBridgeEvent;
14use crate::metrics::BridgeMetrics;
15use crate::storage::BridgeOrchestratorTables;
16use crate::sui_client::{SuiClient, SuiClientInner};
17use crate::sui_syncer::GrpcSyncedEvents;
18use crate::types::EthLog;
19use alloy::primitives::Address as EthAddress;
20use mysten_metrics::spawn_logged_monitored_task;
21use std::sync::Arc;
22use tokio::task::JoinHandle;
23use tracing::{error, info};
24
25pub struct BridgeOrchestrator<C> {
26 _sui_client: Arc<SuiClient<C>>,
27 sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
28 eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
29 store: Arc<BridgeOrchestratorTables>,
30 eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
31 metrics: Arc<BridgeMetrics>,
32}
33
34impl<C> BridgeOrchestrator<C>
35where
36 C: SuiClientInner + 'static,
37{
38 pub fn new(
39 sui_client: Arc<SuiClient<C>>,
40 sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
41 eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
42 store: Arc<BridgeOrchestratorTables>,
43 eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
44 metrics: Arc<BridgeMetrics>,
45 ) -> Self {
46 Self {
47 _sui_client: sui_client,
48 sui_grpc_events_rx,
49 eth_events_rx,
50 store,
51 eth_monitor_tx,
52 metrics,
53 }
54 }
55
56 pub async fn run_with_grpc(
57 self,
58 bridge_action_executor: impl BridgeActionExecutorTrait,
59 ) -> Vec<JoinHandle<()>> {
60 tracing::info!("Starting BridgeOrchestrator with gRPC syncer");
61 let mut task_handles = vec![];
62 let store_clone = self.store.clone();
63
64 let (handles, executor_sender) = bridge_action_executor.run();
66 task_handles.extend(handles);
67 let executor_sender_clone = executor_sender.clone();
68 let metrics_clone = self.metrics.clone();
69
70 task_handles.push(spawn_logged_monitored_task!(Self::run_sui_grpc_watcher(
71 store_clone,
72 executor_sender_clone,
73 self.sui_grpc_events_rx,
74 metrics_clone,
75 )));
76 let store_clone = self.store.clone();
77
78 let actions = store_clone
80 .get_all_pending_actions()
81 .into_values()
82 .collect::<Vec<_>>();
83 for action in actions {
84 submit_to_executor(&executor_sender, action)
85 .await
86 .expect("Submit to executor should not fail");
87 }
88
89 let metrics_clone = self.metrics.clone();
90 task_handles.push(spawn_logged_monitored_task!(Self::run_eth_watcher(
91 store_clone,
92 executor_sender,
93 self.eth_events_rx,
94 self.eth_monitor_tx,
95 metrics_clone,
96 )));
97
98 task_handles
99 }
100
101 pub async fn run_sui_grpc_watcher(
102 store: Arc<BridgeOrchestratorTables>,
103 executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
104 mut sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<GrpcSyncedEvents>,
105 metrics: Arc<BridgeMetrics>,
106 ) {
107 info!("Starting sui gRPC watcher task");
108 while let Some((last_seq_num, events)) = sui_grpc_events_rx.recv().await {
109 if events.is_empty() {
110 continue;
111 }
112 info!(
113 "Received {} Sui events: last_seq_num={}",
114 events.len(),
115 last_seq_num
116 );
117 metrics
118 .sui_watcher_received_events
119 .inc_by(events.len() as u64);
120
121 let mut actions = vec![];
122 for bridge_event in events {
123 info!("Observed Sui bridge event (gRPC): {:?}", bridge_event);
124
125 if let Some(mut action) = bridge_event.try_into_bridge_action() {
127 metrics.last_observed_actions_seq_num.with_label_values(&[
128 action.chain_id().to_string().as_str(),
129 action.action_type().to_string().as_str(),
130 ]);
131
132 action = action.update_to_token_transfer();
133 actions.push(action);
134 }
135 }
136
137 if !actions.is_empty() {
138 info!(
139 "Received {} actions from Sui gRPC: {:?}",
140 actions.len(),
141 actions
142 );
143 metrics
144 .sui_watcher_received_actions
145 .inc_by(actions.len() as u64);
146 store
148 .insert_pending_actions(&actions)
149 .expect("Store operation should not fail");
150 for action in actions {
151 submit_to_executor(&executor_tx, action)
152 .await
153 .expect("Submit to executor should not fail");
154 }
155 }
156
157 store
159 .update_sui_sequence_number_cursor(last_seq_num)
160 .expect("Store operation should not fail");
161 }
162 panic!("Sui gRPC event channel was closed unexpectedly");
163 }
164
165 async fn run_eth_watcher(
166 store: Arc<BridgeOrchestratorTables>,
167 executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
168 mut eth_events_rx: mysten_metrics::metered_channel::Receiver<(
169 alloy::primitives::Address,
170 u64,
171 Vec<EthLog>,
172 )>,
173 eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
174 metrics: Arc<BridgeMetrics>,
175 ) {
176 info!("Starting eth watcher task");
177 while let Some((contract, end_block, logs)) = eth_events_rx.recv().await {
178 if logs.is_empty() {
179 store
180 .update_eth_event_cursor(contract, end_block)
181 .expect("Store operation should not fail");
182 continue;
183 }
184
185 info!("Received {} Eth events", logs.len());
186 metrics
187 .eth_watcher_received_events
188 .inc_by(logs.len() as u64);
189
190 let bridge_events = logs
191 .iter()
192 .map(EthBridgeEvent::try_from_eth_log)
193 .collect::<Vec<_>>();
194
195 let mut actions = vec![];
196 for (log, opt_bridge_event) in logs.iter().zip(bridge_events) {
197 if opt_bridge_event.is_none() {
198 metrics.eth_watcher_unrecognized_events.inc();
200 error!("Eth event not recognized: {:?}", log);
201 continue;
202 }
203 let bridge_event = opt_bridge_event.unwrap();
205 info!("Observed Eth bridge event: {:?}", bridge_event);
206
207 eth_monitor_tx
209 .send(bridge_event.clone())
210 .await
211 .expect("Sending event to monitor channel should not fail");
212
213 match bridge_event.try_into_bridge_action(log.tx_hash, log.log_index_in_tx) {
214 Ok(Some(action)) => {
215 metrics.last_observed_actions_seq_num.with_label_values(&[
216 action.chain_id().to_string().as_str(),
217 action.action_type().to_string().as_str(),
218 ]);
219 actions.push(action)
220 }
221 Ok(None) => {}
222 Err(e) => {
223 error!(eth_tx_hash=?log.tx_hash, eth_event_index=?log.log_index_in_tx, "Error converting EthBridgeEvent to BridgeAction: {:?}", e);
224 }
225 }
226 }
227 if !actions.is_empty() {
228 info!("Received {} actions from Eth: {:?}", actions.len(), actions);
229 metrics
230 .eth_watcher_received_actions
231 .inc_by(actions.len() as u64);
232 store
234 .insert_pending_actions(&actions)
235 .expect("Store operation should not fail");
236 for action in actions {
238 submit_to_executor(&executor_tx, action)
239 .await
240 .expect("Submit to executor should not fail");
241 }
242 }
243
244 store
245 .update_eth_event_cursor(contract, end_block)
246 .expect("Store operation should not fail");
247 }
248 panic!("Eth event channel was closed");
249 }
250}
251
#[cfg(test)]
mod tests {
    use crate::{
        test_utils::{get_test_eth_to_sui_bridge_action, get_test_log_and_action},
        types::BridgeActionDigest,
    };
    use alloy::primitives::TxHash;
    use prometheus::Registry;
    use std::str::FromStr;
    use sui_types::Identifier;

    use super::*;
    use crate::events::SuiBridgeEvent;
    use crate::events::init_all_struct_tags;
    use crate::test_utils::get_test_sui_to_eth_bridge_action;
    use crate::{events::tests::get_test_sui_event_and_action, sui_mock_client::SuiMockClient};

    /// An Ethereum log sent on the event channel must reach the executor, be
    /// written to the pending-action table, and advance the contract's cursor.
    #[tokio::test]
    async fn test_eth_watcher_task() {
        let (
            _sui_grpc_events_tx,
            sui_grpc_events_rx,
            eth_events_tx,
            eth_events_rx,
            eth_monitor_tx,
            _eth_monitor_rx,
            sui_client,
            store,
            _temp_dir,
        ) = setup();
        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let _handles = BridgeOrchestrator::new(
            Arc::new(sui_client),
            sui_grpc_events_rx,
            eth_events_rx,
            store.clone(),
            eth_monitor_tx,
            metrics,
        )
        .run_with_grpc(executor)
        .await;
        let address = EthAddress::random();
        let (log, bridge_action) = get_test_log_and_action(address, TxHash::random(), 10);
        let log_index_in_tx = 10;
        let log_block_num = log.block_number.unwrap();
        let eth_log = EthLog {
            log: log.clone(),
            tx_hash: log.transaction_hash.unwrap(),
            block_number: log_block_num,
            log_index_in_tx,
        };
        let end_block_num = log_block_num + 15;

        eth_events_tx
            .send((address, end_block_num, vec![eth_log.clone()]))
            .await
            .unwrap();

        // The executor must be asked to run the action derived from the log.
        assert_eq!(
            executor_requested_action_rx.recv().await.unwrap(),
            bridge_action.digest()
        );
        // Poll the store until the action shows up, for roughly five seconds.
        let start = std::time::Instant::now();
        loop {
            let actions = store.get_all_pending_actions();
            if actions.is_empty() {
                if start.elapsed().as_secs() > 5 {
                    panic!("Timed out waiting for action to be written to WAL");
                }
                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
                continue;
            }
            assert_eq!(actions.len(), 1);
            let action = actions.get(&bridge_action.digest()).unwrap();
            assert_eq!(action, &bridge_action);
            assert_eq!(
                store.get_eth_event_cursors(&[address]).unwrap()[0].unwrap(),
                end_block_num,
            );
            break;
        }
    }

    /// A Sui bridge event sent on the gRPC channel must reach the executor, be
    /// written to the pending-action table, and advance the Sui cursor.
    #[tokio::test]
    async fn test_sui_grpc_watcher_task() {
        let (
            sui_grpc_events_tx,
            sui_grpc_events_rx,
            _eth_events_tx,
            eth_events_rx,
            eth_monitor_tx,
            _eth_monitor_rx,
            sui_client,
            store,
            _temp_dir,
        ) = setup();
        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let _handles = BridgeOrchestrator::new(
            Arc::new(sui_client),
            sui_grpc_events_rx,
            eth_events_rx,
            store.clone(),
            eth_monitor_tx,
            metrics,
        )
        .run_with_grpc(executor)
        .await;

        let identifier = Identifier::from_str("test_sui_grpc_watcher_task").unwrap();
        let (sui_event, mut bridge_action) = get_test_sui_event_and_action(identifier);
        // The watcher applies the same conversion before storing the action.
        bridge_action = bridge_action.update_to_token_transfer();

        let bridge_event = SuiBridgeEvent::try_from_sui_event(&sui_event)
            .unwrap()
            .unwrap();

        let last_seq_num = 42u64;
        sui_grpc_events_tx
            .send((last_seq_num, vec![bridge_event]))
            .await
            .unwrap();

        let start = std::time::Instant::now();
        // The executor must be asked to run the action derived from the event.
        assert_eq!(
            executor_requested_action_rx.recv().await.unwrap(),
            bridge_action.digest()
        );
        // Poll the store until the action shows up, for roughly five seconds.
        loop {
            let actions = store.get_all_pending_actions();
            if actions.is_empty() {
                if start.elapsed().as_secs() > 5 {
                    panic!("Timed out waiting for action to be written to WAL");
                }
                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
                continue;
            }
            assert_eq!(actions.len(), 1);
            let action = actions.get(&bridge_action.digest()).unwrap();
            assert_eq!(action, &bridge_action);
            assert_eq!(
                store.get_sui_sequence_number_cursor().unwrap().unwrap(),
                last_seq_num,
            );
            break;
        }
    }

    /// Shared body of the two resume tests: actions already in the pending
    /// table when the orchestrator starts must be re-submitted to the executor.
    async fn assert_pending_actions_are_resumed() {
        let (
            _sui_grpc_events_tx,
            sui_grpc_events_rx,
            _eth_events_tx,
            eth_events_rx,
            eth_monitor_tx,
            _eth_monitor_rx,
            sui_client,
            store,
            _temp_dir,
        ) = setup();
        let (executor, mut executor_requested_action_rx) = MockExecutor::new();

        let action1 = get_test_sui_to_eth_bridge_action(
            None,
            Some(0),
            Some(99),
            Some(10000),
            None,
            None,
            None,
        );

        let action2 = get_test_eth_to_sui_bridge_action(None, None, None, None);
        store
            .insert_pending_actions(&[action1.clone(), action2.clone()])
            .unwrap();

        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let _handles = BridgeOrchestrator::new(
            Arc::new(sui_client),
            sui_grpc_events_rx,
            eth_events_rx,
            store.clone(),
            eth_monitor_tx,
            metrics,
        )
        .run_with_grpc(executor)
        .await;

        // The submission order is not fixed, so compare as a set.
        let mut digests = std::collections::HashSet::new();
        digests.insert(executor_requested_action_rx.recv().await.unwrap());
        digests.insert(executor_requested_action_rx.recv().await.unwrap());
        assert!(digests.contains(&action1.digest()));
        assert!(digests.contains(&action2.digest()));
        assert_eq!(digests.len(), 2);
    }

    #[tokio::test]
    async fn test_resume_actions_in_pending_logs_with_grpc() {
        assert_pending_actions_are_resumed().await;
    }

    // Kept under its original name; it runs the same scenario as the
    // `_with_grpc` variant above.
    #[tokio::test]
    async fn test_resume_actions_in_pending_logs() {
        assert_pending_actions_are_resumed().await;
    }

    /// Builds the channels, mock Sui client and table store the tests need.
    ///
    /// The returned `TempDir` guard owns the directory backing the store. The
    /// caller must keep it bound for the whole test, because dropping it
    /// deletes that directory.
    // The tuple mirrors the arguments of `BridgeOrchestrator::new`, so the
    // long return type is accepted here.
    #[allow(clippy::type_complexity)]
    fn setup() -> (
        mysten_metrics::metered_channel::Sender<(u64, Vec<SuiBridgeEvent>)>,
        mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
        mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
        mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
        mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
        mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
        SuiClient<SuiMockClient>,
        Arc<BridgeOrchestratorTables>,
        tempfile::TempDir,
    ) {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        mysten_metrics::init_metrics(&registry);

        init_all_struct_tags();

        let temp_dir = tempfile::tempdir().unwrap();
        let store = BridgeOrchestratorTables::new(temp_dir.path());

        let sui_client = SuiClient::new_for_testing(SuiMockClient::default());

        let (eth_events_tx, eth_events_rx) = mysten_metrics::metered_channel::channel(
            100,
            &mysten_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                .with_label_values(&["unit_test_eth_events_queue"]),
        );

        let (sui_grpc_events_tx, sui_grpc_events_rx) = mysten_metrics::metered_channel::channel(
            100,
            &mysten_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                .with_label_values(&["unit_test_sui_events_queue"]),
        );
        let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel(
            10000,
            &mysten_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                .with_label_values(&["eth_monitor_queue"]),
        );
        (
            sui_grpc_events_tx,
            sui_grpc_events_rx,
            eth_events_tx,
            eth_events_rx,
            eth_monitor_tx,
            eth_monitor_rx,
            sui_client,
            store,
            temp_dir,
        )
    }

    /// Executor stand-in that reports the digest of every action it is handed.
    struct MockExecutor {
        requested_transactions_tx: tokio::sync::broadcast::Sender<BridgeActionDigest>,
    }

    impl MockExecutor {
        /// Returns the mock together with the receiver on which the digests of
        /// submitted actions are published.
        fn new() -> (Self, tokio::sync::broadcast::Receiver<BridgeActionDigest>) {
            let (tx, rx) = tokio::sync::broadcast::channel(100);
            (
                Self {
                    requested_transactions_tx: tx,
                },
                rx,
            )
        }
    }

    impl BridgeActionExecutorTrait for MockExecutor {
        /// Spawns one task that drains the executor channel and broadcasts the
        /// digest of each received action.
        fn run(
            self,
        ) -> (
            Vec<tokio::task::JoinHandle<()>>,
            mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
        ) {
            let (tx, mut rx) =
                mysten_metrics::metered_channel::channel::<BridgeActionExecutionWrapper>(
                    100,
                    &mysten_metrics::get_metrics()
                        .unwrap()
                        .channel_inflight
                        .with_label_values(&["unit_test_mock_executor"]),
                );

            let handle = tokio::spawn(async move {
                while let Some(action) = rx.recv().await {
                    self.requested_transactions_tx
                        .send(action.0.digest())
                        .expect("the test should still hold the digest receiver");
                }
            });
            (vec![handle], tx)
        }
    }
}