sui_bridge/
orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! `BridgeOrchestrator` is the component that:
5//! 1. monitors Sui and Ethereum events with the help of `SuiSyncer` and `EthSyncer`
6//! 2. updates WAL table and cursor tables
7//! 2. hands actions to `BridgeExecutor` for execution
8
9use crate::abi::EthBridgeEvent;
10use crate::action_executor::{
11    BridgeActionExecutionWrapper, BridgeActionExecutorTrait, submit_to_executor,
12};
13use crate::events::SuiBridgeEvent;
14use crate::metrics::BridgeMetrics;
15use crate::storage::BridgeOrchestratorTables;
16use crate::sui_client::{SuiClient, SuiClientInner};
17use crate::sui_syncer::GrpcSyncedEvents;
18use crate::types::EthLog;
19use alloy::primitives::Address as EthAddress;
20use mysten_metrics::spawn_logged_monitored_task;
21use std::sync::Arc;
22use tokio::task::JoinHandle;
23use tracing::{error, info};
24
25pub struct BridgeOrchestrator<C> {
26    _sui_client: Arc<SuiClient<C>>,
27    sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
28    eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
29    store: Arc<BridgeOrchestratorTables>,
30    eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
31    metrics: Arc<BridgeMetrics>,
32}
33
34impl<C> BridgeOrchestrator<C>
35where
36    C: SuiClientInner + 'static,
37{
38    pub fn new(
39        sui_client: Arc<SuiClient<C>>,
40        sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
41        eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
42        store: Arc<BridgeOrchestratorTables>,
43        eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
44        metrics: Arc<BridgeMetrics>,
45    ) -> Self {
46        Self {
47            _sui_client: sui_client,
48            sui_grpc_events_rx,
49            eth_events_rx,
50            store,
51            eth_monitor_tx,
52            metrics,
53        }
54    }
55
56    pub async fn run_with_grpc(
57        self,
58        bridge_action_executor: impl BridgeActionExecutorTrait,
59    ) -> Vec<JoinHandle<()>> {
60        tracing::info!("Starting BridgeOrchestrator with gRPC syncer");
61        let mut task_handles = vec![];
62        let store_clone = self.store.clone();
63
64        // Spawn BridgeActionExecutor
65        let (handles, executor_sender) = bridge_action_executor.run();
66        task_handles.extend(handles);
67        let executor_sender_clone = executor_sender.clone();
68        let metrics_clone = self.metrics.clone();
69
70        task_handles.push(spawn_logged_monitored_task!(Self::run_sui_grpc_watcher(
71            store_clone,
72            executor_sender_clone,
73            self.sui_grpc_events_rx,
74            metrics_clone,
75        )));
76        let store_clone = self.store.clone();
77
78        // Re-submit pending actions to executor
79        let actions = store_clone
80            .get_all_pending_actions()
81            .into_values()
82            .collect::<Vec<_>>();
83        for action in actions {
84            submit_to_executor(&executor_sender, action)
85                .await
86                .expect("Submit to executor should not fail");
87        }
88
89        let metrics_clone = self.metrics.clone();
90        task_handles.push(spawn_logged_monitored_task!(Self::run_eth_watcher(
91            store_clone,
92            executor_sender,
93            self.eth_events_rx,
94            self.eth_monitor_tx,
95            metrics_clone,
96        )));
97
98        task_handles
99    }
100
101    pub async fn run_sui_grpc_watcher(
102        store: Arc<BridgeOrchestratorTables>,
103        executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
104        mut sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<GrpcSyncedEvents>,
105        metrics: Arc<BridgeMetrics>,
106    ) {
107        info!("Starting sui gRPC watcher task");
108        while let Some((last_seq_num, events)) = sui_grpc_events_rx.recv().await {
109            if events.is_empty() {
110                continue;
111            }
112            info!(
113                "Received {} Sui events: last_seq_num={}",
114                events.len(),
115                last_seq_num
116            );
117            metrics
118                .sui_watcher_received_events
119                .inc_by(events.len() as u64);
120
121            let mut actions = vec![];
122            for bridge_event in events {
123                info!("Observed Sui bridge event (gRPC): {:?}", bridge_event);
124
125                // Convert to action using the same flow as JSON-RPC watcher
126                if let Some(mut action) = bridge_event.try_into_bridge_action() {
127                    metrics.last_observed_actions_seq_num.with_label_values(&[
128                        action.chain_id().to_string().as_str(),
129                        action.action_type().to_string().as_str(),
130                    ]);
131
132                    action = action.update_to_token_transfer();
133                    actions.push(action);
134                }
135            }
136
137            if !actions.is_empty() {
138                info!(
139                    "Received {} actions from Sui gRPC: {:?}",
140                    actions.len(),
141                    actions
142                );
143                metrics
144                    .sui_watcher_received_actions
145                    .inc_by(actions.len() as u64);
146                // Write action to pending WAL
147                store
148                    .insert_pending_actions(&actions)
149                    .expect("Store operation should not fail");
150                for action in actions {
151                    submit_to_executor(&executor_tx, action)
152                        .await
153                        .expect("Submit to executor should not fail");
154                }
155            }
156
157            // Store the sequence number cursor
158            store
159                .update_sui_sequence_number_cursor(last_seq_num)
160                .expect("Store operation should not fail");
161        }
162        panic!("Sui gRPC event channel was closed unexpectedly");
163    }
164
165    async fn run_eth_watcher(
166        store: Arc<BridgeOrchestratorTables>,
167        executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
168        mut eth_events_rx: mysten_metrics::metered_channel::Receiver<(
169            alloy::primitives::Address,
170            u64,
171            Vec<EthLog>,
172        )>,
173        eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
174        metrics: Arc<BridgeMetrics>,
175    ) {
176        info!("Starting eth watcher task");
177        while let Some((contract, end_block, logs)) = eth_events_rx.recv().await {
178            if logs.is_empty() {
179                store
180                    .update_eth_event_cursor(contract, end_block)
181                    .expect("Store operation should not fail");
182                continue;
183            }
184
185            info!("Received {} Eth events", logs.len());
186            metrics
187                .eth_watcher_received_events
188                .inc_by(logs.len() as u64);
189
190            let bridge_events = logs
191                .iter()
192                .map(EthBridgeEvent::try_from_eth_log)
193                .collect::<Vec<_>>();
194
195            let mut actions = vec![];
196            for (log, opt_bridge_event) in logs.iter().zip(bridge_events) {
197                if opt_bridge_event.is_none() {
198                    // TODO: we probably should not miss any events, log for now.
199                    metrics.eth_watcher_unrecognized_events.inc();
200                    error!("Eth event not recognized: {:?}", log);
201                    continue;
202                }
203                // Unwrap safe: checked above
204                let bridge_event = opt_bridge_event.unwrap();
205                info!("Observed Eth bridge event: {:?}", bridge_event);
206
207                // Send event to monitor
208                eth_monitor_tx
209                    .send(bridge_event.clone())
210                    .await
211                    .expect("Sending event to monitor channel should not fail");
212
213                match bridge_event.try_into_bridge_action(log.tx_hash, log.log_index_in_tx) {
214                    Ok(Some(action)) => {
215                        metrics.last_observed_actions_seq_num.with_label_values(&[
216                            action.chain_id().to_string().as_str(),
217                            action.action_type().to_string().as_str(),
218                        ]);
219                        actions.push(action)
220                    }
221                    Ok(None) => {}
222                    Err(e) => {
223                        error!(eth_tx_hash=?log.tx_hash, eth_event_index=?log.log_index_in_tx, "Error converting EthBridgeEvent to BridgeAction: {:?}", e);
224                    }
225                }
226            }
227            if !actions.is_empty() {
228                info!("Received {} actions from Eth: {:?}", actions.len(), actions);
229                metrics
230                    .eth_watcher_received_actions
231                    .inc_by(actions.len() as u64);
232                // Write action to pending WAL
233                store
234                    .insert_pending_actions(&actions)
235                    .expect("Store operation should not fail");
236                // Execution will remove the pending actions from DB when the action is completed.
237                for action in actions {
238                    submit_to_executor(&executor_tx, action)
239                        .await
240                        .expect("Submit to executor should not fail");
241                }
242            }
243
244            store
245                .update_eth_event_cursor(contract, end_block)
246                .expect("Store operation should not fail");
247        }
248        panic!("Eth event channel was closed");
249    }
250}
251
252#[cfg(test)]
253mod tests {
254    use crate::{
255        test_utils::{get_test_eth_to_sui_bridge_action, get_test_log_and_action},
256        types::BridgeActionDigest,
257    };
258    use alloy::primitives::TxHash;
259    use prometheus::Registry;
260    use std::str::FromStr;
261    use sui_types::Identifier;
262
263    use super::*;
264    use crate::events::SuiBridgeEvent;
265    use crate::events::init_all_struct_tags;
266    use crate::test_utils::get_test_sui_to_eth_bridge_action;
267    use crate::{events::tests::get_test_sui_event_and_action, sui_mock_client::SuiMockClient};
268
269    #[tokio::test]
270    async fn test_eth_watcher_task() {
271        // Note: this test may fail because of the following reasons:
272        // 1. Log and BridgeAction returned from `get_test_log_and_action` are not in sync
273        // 2. Log returned from `get_test_log_and_action` is not parseable log (not abigen!, check abi.rs)
274
275        let (
276            _sui_grpc_events_tx,
277            sui_grpc_events_rx,
278            eth_events_tx,
279            eth_events_rx,
280            eth_monitor_tx,
281            _eth_monitor_rx,
282            sui_client,
283            store,
284        ) = setup();
285        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
286        // start orchestrator
287        let registry = Registry::new();
288        let metrics = Arc::new(BridgeMetrics::new(&registry));
289        let _handles = BridgeOrchestrator::new(
290            Arc::new(sui_client),
291            sui_grpc_events_rx,
292            eth_events_rx,
293            store.clone(),
294            eth_monitor_tx,
295            metrics,
296        )
297        .run_with_grpc(executor)
298        .await;
299        let address = EthAddress::random();
300        let (log, bridge_action) = get_test_log_and_action(address, TxHash::random(), 10);
301        let log_index_in_tx = 10;
302        let log_block_num = log.block_number.unwrap();
303        let eth_log = EthLog {
304            log: log.clone(),
305            tx_hash: log.transaction_hash.unwrap(),
306            block_number: log_block_num,
307            log_index_in_tx,
308        };
309        let end_block_num = log_block_num + 15;
310
311        eth_events_tx
312            .send((address, end_block_num, vec![eth_log.clone()]))
313            .await
314            .unwrap();
315
316        // Executor should have received the action
317        assert_eq!(
318            executor_requested_action_rx.recv().await.unwrap(),
319            bridge_action.digest()
320        );
321        let start = std::time::Instant::now();
322        loop {
323            let actions = store.get_all_pending_actions();
324            if actions.is_empty() {
325                if start.elapsed().as_secs() > 5 {
326                    panic!("Timed out waiting for action to be written to WAL");
327                }
328                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
329                continue;
330            }
331            assert_eq!(actions.len(), 1);
332            let action = actions.get(&bridge_action.digest()).unwrap();
333            assert_eq!(action, &bridge_action);
334            assert_eq!(
335                store.get_eth_event_cursors(&[address]).unwrap()[0].unwrap(),
336                end_block_num,
337            );
338            break;
339        }
340    }
341
342    #[tokio::test]
343    async fn test_sui_grpc_watcher_task() {
344        let (
345            sui_grpc_events_tx,
346            sui_grpc_events_rx,
347            _eth_events_tx,
348            eth_events_rx,
349            eth_monitor_tx,
350            _eth_monitor_rx,
351            sui_client,
352            store,
353        ) = setup();
354        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
355        // start orchestrator with gRPC
356        let registry = Registry::new();
357        let metrics = Arc::new(BridgeMetrics::new(&registry));
358        let _handles = BridgeOrchestrator::new(
359            Arc::new(sui_client),
360            sui_grpc_events_rx,
361            eth_events_rx,
362            store.clone(),
363            eth_monitor_tx,
364            metrics,
365        )
366        .run_with_grpc(executor)
367        .await;
368
369        let identifier = Identifier::from_str("test_sui_grpc_watcher_task").unwrap();
370        let (sui_event, mut bridge_action) = get_test_sui_event_and_action(identifier);
371        bridge_action = bridge_action.update_to_token_transfer();
372
373        // Convert SuiEvent to SuiBridgeEvent
374        let bridge_event = SuiBridgeEvent::try_from_sui_event(&sui_event)
375            .unwrap()
376            .unwrap();
377
378        let last_seq_num = 42u64;
379        sui_grpc_events_tx
380            .send((last_seq_num, vec![bridge_event]))
381            .await
382            .unwrap();
383
384        let start = std::time::Instant::now();
385        // Executor should have received the action
386        assert_eq!(
387            executor_requested_action_rx.recv().await.unwrap(),
388            bridge_action.digest()
389        );
390        loop {
391            let actions = store.get_all_pending_actions();
392            if actions.is_empty() {
393                if start.elapsed().as_secs() > 5 {
394                    panic!("Timed out waiting for action to be written to WAL");
395                }
396                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
397                continue;
398            }
399            assert_eq!(actions.len(), 1);
400            let action = actions.get(&bridge_action.digest()).unwrap();
401            assert_eq!(action, &bridge_action);
402            // Verify sequence number cursor was updated
403            assert_eq!(
404                store.get_sui_sequence_number_cursor().unwrap().unwrap(),
405                last_seq_num,
406            );
407            break;
408        }
409    }
410
411    #[tokio::test]
412    /// Test that when orchestrator starts with gRPC, all pending actions are sent to executor
413    async fn test_resume_actions_in_pending_logs_with_grpc() {
414        let (
415            _sui_grpc_events_tx,
416            sui_grpc_events_rx,
417            _eth_events_tx,
418            eth_events_rx,
419            eth_monitor_tx,
420            _eth_monitor_rx,
421            sui_client,
422            store,
423        ) = setup();
424        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
425
426        let action1 = get_test_sui_to_eth_bridge_action(
427            None,
428            Some(0),
429            Some(99),
430            Some(10000),
431            None,
432            None,
433            None,
434        );
435
436        let action2 = get_test_eth_to_sui_bridge_action(None, None, None, None);
437        store
438            .insert_pending_actions(&[action1.clone(), action2.clone()])
439            .unwrap();
440
441        // start orchestrator with gRPC
442        let registry = Registry::new();
443        let metrics = Arc::new(BridgeMetrics::new(&registry));
444        let _handles = BridgeOrchestrator::new(
445            Arc::new(sui_client),
446            sui_grpc_events_rx,
447            eth_events_rx,
448            store.clone(),
449            eth_monitor_tx,
450            metrics,
451        )
452        .run_with_grpc(executor)
453        .await;
454
455        // Executor should have received the action
456        let mut digests = std::collections::HashSet::new();
457        digests.insert(executor_requested_action_rx.recv().await.unwrap());
458        digests.insert(executor_requested_action_rx.recv().await.unwrap());
459        assert!(digests.contains(&action1.digest()));
460        assert!(digests.contains(&action2.digest()));
461        assert_eq!(digests.len(), 2);
462    }
463
464    #[tokio::test]
465    /// Test that when orchestrator starts, all pending actions are sent to executor
466    async fn test_resume_actions_in_pending_logs() {
467        let (
468            _sui_grpc_events_tx,
469            sui_grpc_events_rx,
470            _eth_events_tx,
471            eth_events_rx,
472            eth_monitor_tx,
473            _eth_monitor_rx,
474            sui_client,
475            store,
476        ) = setup();
477        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
478
479        let action1 = get_test_sui_to_eth_bridge_action(
480            None,
481            Some(0),
482            Some(99),
483            Some(10000),
484            None,
485            None,
486            None,
487        );
488
489        let action2 = get_test_eth_to_sui_bridge_action(None, None, None, None);
490        store
491            .insert_pending_actions(&[action1.clone(), action2.clone()])
492            .unwrap();
493
494        // start orchestrator
495        let registry = Registry::new();
496        let metrics = Arc::new(BridgeMetrics::new(&registry));
497        let _handles = BridgeOrchestrator::new(
498            Arc::new(sui_client),
499            sui_grpc_events_rx,
500            eth_events_rx,
501            store.clone(),
502            eth_monitor_tx,
503            metrics,
504        )
505        .run_with_grpc(executor)
506        .await;
507
508        // Executor should have received the action
509        let mut digests = std::collections::HashSet::new();
510        digests.insert(executor_requested_action_rx.recv().await.unwrap());
511        digests.insert(executor_requested_action_rx.recv().await.unwrap());
512        assert!(digests.contains(&action1.digest()));
513        assert!(digests.contains(&action2.digest()));
514        assert_eq!(digests.len(), 2);
515    }
516
517    #[allow(clippy::type_complexity)]
518    fn setup() -> (
519        mysten_metrics::metered_channel::Sender<(u64, Vec<SuiBridgeEvent>)>,
520        mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
521        mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
522        mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
523        mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
524        mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
525        SuiClient<SuiMockClient>,
526        Arc<BridgeOrchestratorTables>,
527    ) {
528        telemetry_subscribers::init_for_testing();
529        let registry = Registry::new();
530        mysten_metrics::init_metrics(&registry);
531
532        init_all_struct_tags();
533
534        let temp_dir = tempfile::tempdir().unwrap();
535        let store = BridgeOrchestratorTables::new(temp_dir.path());
536
537        let mock_client = SuiMockClient::default();
538        let sui_client = SuiClient::new_for_testing(mock_client.clone());
539
540        let (eth_events_tx, eth_events_rx) = mysten_metrics::metered_channel::channel(
541            100,
542            &mysten_metrics::get_metrics()
543                .unwrap()
544                .channel_inflight
545                .with_label_values(&["unit_test_eth_events_queue"]),
546        );
547
548        let (sui_grpc_events_tx, sui_grpc_events_rx) = mysten_metrics::metered_channel::channel(
549            100,
550            &mysten_metrics::get_metrics()
551                .unwrap()
552                .channel_inflight
553                .with_label_values(&["unit_test_sui_events_queue"]),
554        );
555        let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel(
556            10000,
557            &mysten_metrics::get_metrics()
558                .unwrap()
559                .channel_inflight
560                .with_label_values(&["eth_monitor_queue"]),
561        );
562        (
563            sui_grpc_events_tx,
564            sui_grpc_events_rx,
565            eth_events_tx,
566            eth_events_rx,
567            eth_monitor_tx,
568            eth_monitor_rx,
569            sui_client,
570            store,
571        )
572    }
573
574    /// A `BridgeActionExecutorTrait` implementation that only tracks the submitted actions.
575    struct MockExecutor {
576        requested_transactions_tx: tokio::sync::broadcast::Sender<BridgeActionDigest>,
577    }
578
579    impl MockExecutor {
580        fn new() -> (Self, tokio::sync::broadcast::Receiver<BridgeActionDigest>) {
581            let (tx, rx) = tokio::sync::broadcast::channel(100);
582            (
583                Self {
584                    requested_transactions_tx: tx,
585                },
586                rx,
587            )
588        }
589    }
590
591    impl BridgeActionExecutorTrait for MockExecutor {
592        fn run(
593            self,
594        ) -> (
595            Vec<tokio::task::JoinHandle<()>>,
596            mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
597        ) {
598            let (tx, mut rx) =
599                mysten_metrics::metered_channel::channel::<BridgeActionExecutionWrapper>(
600                    100,
601                    &mysten_metrics::get_metrics()
602                        .unwrap()
603                        .channel_inflight
604                        .with_label_values(&["unit_test_mock_executor"]),
605                );
606
607            let handles = tokio::spawn(async move {
608                while let Some(action) = rx.recv().await {
609                    self.requested_transactions_tx
610                        .send(action.0.digest())
611                        .unwrap();
612                }
613            });
614            (vec![handles], tx)
615        }
616    }
617}