sui_bridge/
orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! `BridgeOrchestrator` is the component that:
5//! 1. monitors Sui and Ethereum events with the help of `SuiSyncer` and `EthSyncer`
6//! 2. updates WAL table and cursor tables
7//! 2. hands actions to `BridgeExecutor` for execution
8
9use crate::abi::EthBridgeEvent;
10use crate::action_executor::{
11    BridgeActionExecutionWrapper, BridgeActionExecutorTrait, submit_to_executor,
12};
13use crate::events::SuiBridgeEvent;
14use crate::metrics::BridgeMetrics;
15use crate::storage::BridgeOrchestratorTables;
16use crate::sui_client::{SuiClient, SuiClientInner};
17use crate::sui_syncer::GrpcSyncedEvents;
18use crate::types::EthLog;
19use alloy::primitives::Address as EthAddress;
20use mysten_common::ZipDebugEqIteratorExt;
21use mysten_metrics::spawn_logged_monitored_task;
22use std::sync::Arc;
23use tokio::task::JoinHandle;
24use tracing::{error, info};
25
26pub struct BridgeOrchestrator<C> {
27    _sui_client: Arc<SuiClient<C>>,
28    sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
29    eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
30    store: Arc<BridgeOrchestratorTables>,
31    eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
32    metrics: Arc<BridgeMetrics>,
33}
34
35impl<C> BridgeOrchestrator<C>
36where
37    C: SuiClientInner + 'static,
38{
39    pub fn new(
40        sui_client: Arc<SuiClient<C>>,
41        sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
42        eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
43        store: Arc<BridgeOrchestratorTables>,
44        eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
45        metrics: Arc<BridgeMetrics>,
46    ) -> Self {
47        Self {
48            _sui_client: sui_client,
49            sui_grpc_events_rx,
50            eth_events_rx,
51            store,
52            eth_monitor_tx,
53            metrics,
54        }
55    }
56
57    pub async fn run_with_grpc(
58        self,
59        bridge_action_executor: impl BridgeActionExecutorTrait,
60    ) -> Vec<JoinHandle<()>> {
61        tracing::info!("Starting BridgeOrchestrator with gRPC syncer");
62        let mut task_handles = vec![];
63        let store_clone = self.store.clone();
64
65        // Spawn BridgeActionExecutor
66        let (handles, executor_sender) = bridge_action_executor.run();
67        task_handles.extend(handles);
68        let executor_sender_clone = executor_sender.clone();
69        let metrics_clone = self.metrics.clone();
70
71        task_handles.push(spawn_logged_monitored_task!(Self::run_sui_grpc_watcher(
72            store_clone,
73            executor_sender_clone,
74            self.sui_grpc_events_rx,
75            metrics_clone,
76        )));
77        let store_clone = self.store.clone();
78
79        // Re-submit pending actions to executor
80        let actions = store_clone
81            .get_all_pending_actions()
82            .into_values()
83            .collect::<Vec<_>>();
84        for action in actions {
85            submit_to_executor(&executor_sender, action)
86                .await
87                .expect("Submit to executor should not fail");
88        }
89
90        let metrics_clone = self.metrics.clone();
91        task_handles.push(spawn_logged_monitored_task!(Self::run_eth_watcher(
92            store_clone,
93            executor_sender,
94            self.eth_events_rx,
95            self.eth_monitor_tx,
96            metrics_clone,
97        )));
98
99        task_handles
100    }
101
102    pub async fn run_sui_grpc_watcher(
103        store: Arc<BridgeOrchestratorTables>,
104        executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
105        mut sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<GrpcSyncedEvents>,
106        metrics: Arc<BridgeMetrics>,
107    ) {
108        info!("Starting sui gRPC watcher task");
109        while let Some((last_seq_num, events)) = sui_grpc_events_rx.recv().await {
110            if events.is_empty() {
111                continue;
112            }
113            info!(
114                "Received {} Sui events: last_seq_num={}",
115                events.len(),
116                last_seq_num
117            );
118            metrics
119                .sui_watcher_received_events
120                .inc_by(events.len() as u64);
121
122            let mut actions = vec![];
123            for bridge_event in events {
124                info!("Observed Sui bridge event (gRPC): {:?}", bridge_event);
125
126                // Convert to action using the same flow as JSON-RPC watcher
127                if let Some(mut action) = bridge_event.try_into_bridge_action() {
128                    metrics.last_observed_actions_seq_num.with_label_values(&[
129                        action.chain_id().to_string().as_str(),
130                        action.action_type().to_string().as_str(),
131                    ]);
132
133                    action = action.update_to_token_transfer();
134                    actions.push(action);
135                }
136            }
137
138            if !actions.is_empty() {
139                info!(
140                    "Received {} actions from Sui gRPC: {:?}",
141                    actions.len(),
142                    actions
143                );
144                metrics
145                    .sui_watcher_received_actions
146                    .inc_by(actions.len() as u64);
147                // Write action to pending WAL
148                store
149                    .insert_pending_actions(&actions)
150                    .expect("Store operation should not fail");
151                for action in actions {
152                    submit_to_executor(&executor_tx, action)
153                        .await
154                        .expect("Submit to executor should not fail");
155                }
156            }
157
158            // Store the sequence number cursor
159            store
160                .update_sui_sequence_number_cursor(last_seq_num)
161                .expect("Store operation should not fail");
162        }
163        panic!("Sui gRPC event channel was closed unexpectedly");
164    }
165
166    async fn run_eth_watcher(
167        store: Arc<BridgeOrchestratorTables>,
168        executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
169        mut eth_events_rx: mysten_metrics::metered_channel::Receiver<(
170            alloy::primitives::Address,
171            u64,
172            Vec<EthLog>,
173        )>,
174        eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
175        metrics: Arc<BridgeMetrics>,
176    ) {
177        info!("Starting eth watcher task");
178        while let Some((contract, end_block, logs)) = eth_events_rx.recv().await {
179            if logs.is_empty() {
180                store
181                    .update_eth_event_cursor(contract, end_block)
182                    .expect("Store operation should not fail");
183                continue;
184            }
185
186            info!("Received {} Eth events", logs.len());
187            metrics
188                .eth_watcher_received_events
189                .inc_by(logs.len() as u64);
190
191            let bridge_events = logs
192                .iter()
193                .map(EthBridgeEvent::try_from_eth_log)
194                .collect::<Vec<_>>();
195
196            let mut actions = vec![];
197            for (log, opt_bridge_event) in logs.iter().zip_debug_eq(bridge_events) {
198                if opt_bridge_event.is_none() {
199                    // TODO: we probably should not miss any events, log for now.
200                    metrics.eth_watcher_unrecognized_events.inc();
201                    error!("Eth event not recognized: {:?}", log);
202                    continue;
203                }
204                // Unwrap safe: checked above
205                let bridge_event = opt_bridge_event.unwrap();
206                info!("Observed Eth bridge event: {:?}", bridge_event);
207
208                // Send event to monitor
209                eth_monitor_tx
210                    .send(bridge_event.clone())
211                    .await
212                    .expect("Sending event to monitor channel should not fail");
213
214                match bridge_event.try_into_bridge_action(log.tx_hash, log.log_index_in_tx) {
215                    Ok(Some(action)) => {
216                        metrics.last_observed_actions_seq_num.with_label_values(&[
217                            action.chain_id().to_string().as_str(),
218                            action.action_type().to_string().as_str(),
219                        ]);
220                        actions.push(action)
221                    }
222                    Ok(None) => {}
223                    Err(e) => {
224                        error!(eth_tx_hash=?log.tx_hash, eth_event_index=?log.log_index_in_tx, "Error converting EthBridgeEvent to BridgeAction: {:?}", e);
225                    }
226                }
227            }
228            if !actions.is_empty() {
229                info!("Received {} actions from Eth: {:?}", actions.len(), actions);
230                metrics
231                    .eth_watcher_received_actions
232                    .inc_by(actions.len() as u64);
233                // Write action to pending WAL
234                store
235                    .insert_pending_actions(&actions)
236                    .expect("Store operation should not fail");
237                // Execution will remove the pending actions from DB when the action is completed.
238                for action in actions {
239                    submit_to_executor(&executor_tx, action)
240                        .await
241                        .expect("Submit to executor should not fail");
242                }
243            }
244
245            store
246                .update_eth_event_cursor(contract, end_block)
247                .expect("Store operation should not fail");
248        }
249        panic!("Eth event channel was closed");
250    }
251}
252
253#[cfg(test)]
254mod tests {
255    use crate::{
256        test_utils::{get_test_eth_to_sui_bridge_action, get_test_log_and_action},
257        types::BridgeActionDigest,
258    };
259    use alloy::primitives::TxHash;
260    use prometheus::Registry;
261    use std::str::FromStr;
262    use sui_types::Identifier;
263
264    use super::*;
265    use crate::events::SuiBridgeEvent;
266    use crate::events::init_all_struct_tags;
267    use crate::test_utils::get_test_sui_to_eth_bridge_action;
268    use crate::{events::tests::get_test_sui_event_and_action, sui_mock_client::SuiMockClient};
269
270    #[tokio::test]
271    async fn test_eth_watcher_task() {
272        // Note: this test may fail because of the following reasons:
273        // 1. Log and BridgeAction returned from `get_test_log_and_action` are not in sync
274        // 2. Log returned from `get_test_log_and_action` is not parseable log (not abigen!, check abi.rs)
275
276        let (
277            _sui_grpc_events_tx,
278            sui_grpc_events_rx,
279            eth_events_tx,
280            eth_events_rx,
281            eth_monitor_tx,
282            _eth_monitor_rx,
283            sui_client,
284            store,
285        ) = setup();
286        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
287        // start orchestrator
288        let registry = Registry::new();
289        let metrics = Arc::new(BridgeMetrics::new(&registry));
290        let _handles = BridgeOrchestrator::new(
291            Arc::new(sui_client),
292            sui_grpc_events_rx,
293            eth_events_rx,
294            store.clone(),
295            eth_monitor_tx,
296            metrics,
297        )
298        .run_with_grpc(executor)
299        .await;
300        let address = EthAddress::random();
301        let (log, bridge_action) = get_test_log_and_action(address, TxHash::random(), 10);
302        let log_index_in_tx = 10;
303        let log_block_num = log.block_number.unwrap();
304        let eth_log = EthLog {
305            log: log.clone(),
306            tx_hash: log.transaction_hash.unwrap(),
307            block_number: log_block_num,
308            log_index_in_tx,
309        };
310        let end_block_num = log_block_num + 15;
311
312        eth_events_tx
313            .send((address, end_block_num, vec![eth_log.clone()]))
314            .await
315            .unwrap();
316
317        // Executor should have received the action
318        assert_eq!(
319            executor_requested_action_rx.recv().await.unwrap(),
320            bridge_action.digest()
321        );
322        let start = std::time::Instant::now();
323        loop {
324            let actions = store.get_all_pending_actions();
325            if actions.is_empty() {
326                if start.elapsed().as_secs() > 5 {
327                    panic!("Timed out waiting for action to be written to WAL");
328                }
329                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
330                continue;
331            }
332            assert_eq!(actions.len(), 1);
333            let action = actions.get(&bridge_action.digest()).unwrap();
334            assert_eq!(action, &bridge_action);
335            assert_eq!(
336                store.get_eth_event_cursors(&[address]).unwrap()[0].unwrap(),
337                end_block_num,
338            );
339            break;
340        }
341    }
342
343    #[tokio::test]
344    async fn test_sui_grpc_watcher_task() {
345        let (
346            sui_grpc_events_tx,
347            sui_grpc_events_rx,
348            _eth_events_tx,
349            eth_events_rx,
350            eth_monitor_tx,
351            _eth_monitor_rx,
352            sui_client,
353            store,
354        ) = setup();
355        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
356        // start orchestrator with gRPC
357        let registry = Registry::new();
358        let metrics = Arc::new(BridgeMetrics::new(&registry));
359        let _handles = BridgeOrchestrator::new(
360            Arc::new(sui_client),
361            sui_grpc_events_rx,
362            eth_events_rx,
363            store.clone(),
364            eth_monitor_tx,
365            metrics,
366        )
367        .run_with_grpc(executor)
368        .await;
369
370        let identifier = Identifier::from_str("test_sui_grpc_watcher_task").unwrap();
371        let (sui_event, mut bridge_action) = get_test_sui_event_and_action(identifier);
372        bridge_action = bridge_action.update_to_token_transfer();
373
374        // Convert SuiEvent to SuiBridgeEvent
375        let bridge_event = SuiBridgeEvent::try_from_sui_event(&sui_event)
376            .unwrap()
377            .unwrap();
378
379        let last_seq_num = 42u64;
380        sui_grpc_events_tx
381            .send((last_seq_num, vec![bridge_event]))
382            .await
383            .unwrap();
384
385        let start = std::time::Instant::now();
386        // Executor should have received the action
387        assert_eq!(
388            executor_requested_action_rx.recv().await.unwrap(),
389            bridge_action.digest()
390        );
391        loop {
392            let actions = store.get_all_pending_actions();
393            if actions.is_empty() {
394                if start.elapsed().as_secs() > 5 {
395                    panic!("Timed out waiting for action to be written to WAL");
396                }
397                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
398                continue;
399            }
400            assert_eq!(actions.len(), 1);
401            let action = actions.get(&bridge_action.digest()).unwrap();
402            assert_eq!(action, &bridge_action);
403            // Verify sequence number cursor was updated
404            assert_eq!(
405                store.get_sui_sequence_number_cursor().unwrap().unwrap(),
406                last_seq_num,
407            );
408            break;
409        }
410    }
411
412    #[tokio::test]
413    /// Test that when orchestrator starts with gRPC, all pending actions are sent to executor
414    async fn test_resume_actions_in_pending_logs_with_grpc() {
415        let (
416            _sui_grpc_events_tx,
417            sui_grpc_events_rx,
418            _eth_events_tx,
419            eth_events_rx,
420            eth_monitor_tx,
421            _eth_monitor_rx,
422            sui_client,
423            store,
424        ) = setup();
425        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
426
427        let action1 = get_test_sui_to_eth_bridge_action(
428            None,
429            Some(0),
430            Some(99),
431            Some(10000),
432            None,
433            None,
434            None,
435        );
436
437        let action2 = get_test_eth_to_sui_bridge_action(None, None, None, None);
438        store
439            .insert_pending_actions(&[action1.clone(), action2.clone()])
440            .unwrap();
441
442        // start orchestrator with gRPC
443        let registry = Registry::new();
444        let metrics = Arc::new(BridgeMetrics::new(&registry));
445        let _handles = BridgeOrchestrator::new(
446            Arc::new(sui_client),
447            sui_grpc_events_rx,
448            eth_events_rx,
449            store.clone(),
450            eth_monitor_tx,
451            metrics,
452        )
453        .run_with_grpc(executor)
454        .await;
455
456        // Executor should have received the action
457        let mut digests = std::collections::HashSet::new();
458        digests.insert(executor_requested_action_rx.recv().await.unwrap());
459        digests.insert(executor_requested_action_rx.recv().await.unwrap());
460        assert!(digests.contains(&action1.digest()));
461        assert!(digests.contains(&action2.digest()));
462        assert_eq!(digests.len(), 2);
463    }
464
465    #[tokio::test]
466    /// Test that when orchestrator starts, all pending actions are sent to executor
467    async fn test_resume_actions_in_pending_logs() {
468        let (
469            _sui_grpc_events_tx,
470            sui_grpc_events_rx,
471            _eth_events_tx,
472            eth_events_rx,
473            eth_monitor_tx,
474            _eth_monitor_rx,
475            sui_client,
476            store,
477        ) = setup();
478        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
479
480        let action1 = get_test_sui_to_eth_bridge_action(
481            None,
482            Some(0),
483            Some(99),
484            Some(10000),
485            None,
486            None,
487            None,
488        );
489
490        let action2 = get_test_eth_to_sui_bridge_action(None, None, None, None);
491        store
492            .insert_pending_actions(&[action1.clone(), action2.clone()])
493            .unwrap();
494
495        // start orchestrator
496        let registry = Registry::new();
497        let metrics = Arc::new(BridgeMetrics::new(&registry));
498        let _handles = BridgeOrchestrator::new(
499            Arc::new(sui_client),
500            sui_grpc_events_rx,
501            eth_events_rx,
502            store.clone(),
503            eth_monitor_tx,
504            metrics,
505        )
506        .run_with_grpc(executor)
507        .await;
508
509        // Executor should have received the action
510        let mut digests = std::collections::HashSet::new();
511        digests.insert(executor_requested_action_rx.recv().await.unwrap());
512        digests.insert(executor_requested_action_rx.recv().await.unwrap());
513        assert!(digests.contains(&action1.digest()));
514        assert!(digests.contains(&action2.digest()));
515        assert_eq!(digests.len(), 2);
516    }
517
518    #[allow(clippy::type_complexity)]
519    fn setup() -> (
520        mysten_metrics::metered_channel::Sender<(u64, Vec<SuiBridgeEvent>)>,
521        mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
522        mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
523        mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
524        mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
525        mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
526        SuiClient<SuiMockClient>,
527        Arc<BridgeOrchestratorTables>,
528    ) {
529        telemetry_subscribers::init_for_testing();
530        let registry = Registry::new();
531        mysten_metrics::init_metrics(&registry);
532
533        init_all_struct_tags();
534
535        let temp_dir = tempfile::tempdir().unwrap();
536        let store = BridgeOrchestratorTables::new(temp_dir.path());
537
538        let mock_client = SuiMockClient::default();
539        let sui_client = SuiClient::new_for_testing(mock_client.clone());
540
541        let (eth_events_tx, eth_events_rx) = mysten_metrics::metered_channel::channel(
542            100,
543            &mysten_metrics::get_metrics()
544                .unwrap()
545                .channel_inflight
546                .with_label_values(&["unit_test_eth_events_queue"]),
547        );
548
549        let (sui_grpc_events_tx, sui_grpc_events_rx) = mysten_metrics::metered_channel::channel(
550            100,
551            &mysten_metrics::get_metrics()
552                .unwrap()
553                .channel_inflight
554                .with_label_values(&["unit_test_sui_events_queue"]),
555        );
556        let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel(
557            10000,
558            &mysten_metrics::get_metrics()
559                .unwrap()
560                .channel_inflight
561                .with_label_values(&["eth_monitor_queue"]),
562        );
563        (
564            sui_grpc_events_tx,
565            sui_grpc_events_rx,
566            eth_events_tx,
567            eth_events_rx,
568            eth_monitor_tx,
569            eth_monitor_rx,
570            sui_client,
571            store,
572        )
573    }
574
575    /// A `BridgeActionExecutorTrait` implementation that only tracks the submitted actions.
576    struct MockExecutor {
577        requested_transactions_tx: tokio::sync::broadcast::Sender<BridgeActionDigest>,
578    }
579
580    impl MockExecutor {
581        fn new() -> (Self, tokio::sync::broadcast::Receiver<BridgeActionDigest>) {
582            let (tx, rx) = tokio::sync::broadcast::channel(100);
583            (
584                Self {
585                    requested_transactions_tx: tx,
586                },
587                rx,
588            )
589        }
590    }
591
592    impl BridgeActionExecutorTrait for MockExecutor {
593        fn run(
594            self,
595        ) -> (
596            Vec<tokio::task::JoinHandle<()>>,
597            mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
598        ) {
599            let (tx, mut rx) =
600                mysten_metrics::metered_channel::channel::<BridgeActionExecutionWrapper>(
601                    100,
602                    &mysten_metrics::get_metrics()
603                        .unwrap()
604                        .channel_inflight
605                        .with_label_values(&["unit_test_mock_executor"]),
606                );
607
608            let handles = tokio::spawn(async move {
609                while let Some(action) = rx.recv().await {
610                    self.requested_transactions_tx
611                        .send(action.0.digest())
612                        .unwrap();
613                }
614            });
615            (vec![handles], tx)
616        }
617    }
618}