sui_bridge/
orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! `BridgeOrchestrator` is the component that:
5//! 1. monitors Sui and Ethereum events with the help of `SuiSyncer` and `EthSyncer`
6//! 2. updates WAL table and cursor tables
//! 3. hands actions to `BridgeActionExecutor` for execution
8
9use crate::abi::EthBridgeEvent;
10use crate::action_executor::{
11    BridgeActionExecutionWrapper, BridgeActionExecutorTrait, submit_to_executor,
12};
13use crate::error::BridgeError;
14use crate::events::SuiBridgeEvent;
15use crate::metrics::BridgeMetrics;
16use crate::storage::BridgeOrchestratorTables;
17use crate::sui_client::{SuiClient, SuiClientInner};
18use crate::types::EthLog;
19use ethers::types::Address as EthAddress;
20use mysten_metrics::spawn_logged_monitored_task;
21use std::sync::Arc;
22use sui_json_rpc_types::SuiEvent;
23use sui_types::Identifier;
24use tokio::task::JoinHandle;
25use tracing::{error, info};
26
/// Coordinates event intake for the bridge: it receives batches of Sui and
/// Ethereum events over channels, persists pending actions and event cursors
/// in `store`, forwards parsed events to the monitor channels and submits the
/// resulting actions to the action executor (see `run`).
pub struct BridgeOrchestrator<C> {
    /// Sui client handle. It is stored but not read anywhere in this file,
    /// hence the underscore prefix.
    _sui_client: Arc<SuiClient<C>>,
    /// Incoming batches of Sui events. The `Identifier` of each batch is the
    /// key under which the Sui event cursor is persisted.
    sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
    /// Incoming batches of Ethereum logs as `(contract, end_block, logs)`.
    /// `contract` and `end_block` are what gets persisted as the Eth cursor.
    eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
    /// Persistent tables holding the pending-action WAL and the event cursors.
    store: Arc<BridgeOrchestratorTables>,
    /// Every recognized Sui bridge event is forwarded on this channel.
    sui_monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
    /// Every recognized Eth bridge event is forwarded on this channel.
    eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
    /// Counters and gauges updated by the watcher tasks.
    metrics: Arc<BridgeMetrics>,
}
36
37impl<C> BridgeOrchestrator<C>
38where
39    C: SuiClientInner + 'static,
40{
41    pub fn new(
42        sui_client: Arc<SuiClient<C>>,
43        sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
44        eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
45        store: Arc<BridgeOrchestratorTables>,
46        sui_monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
47        eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
48        metrics: Arc<BridgeMetrics>,
49    ) -> Self {
50        Self {
51            _sui_client: sui_client,
52            sui_events_rx,
53            eth_events_rx,
54            store,
55            sui_monitor_tx,
56            eth_monitor_tx,
57            metrics,
58        }
59    }
60
61    pub async fn run(
62        self,
63        bridge_action_executor: impl BridgeActionExecutorTrait,
64    ) -> Vec<JoinHandle<()>> {
65        tracing::info!("Starting BridgeOrchestrator");
66        let mut task_handles = vec![];
67        let store_clone = self.store.clone();
68
69        // Spawn BridgeActionExecutor
70        let (handles, executor_sender) = bridge_action_executor.run();
71        task_handles.extend(handles);
72        let executor_sender_clone = executor_sender.clone();
73        let metrics_clone = self.metrics.clone();
74        task_handles.push(spawn_logged_monitored_task!(Self::run_sui_watcher(
75            store_clone,
76            executor_sender_clone,
77            self.sui_events_rx,
78            self.sui_monitor_tx,
79            metrics_clone,
80        )));
81        let store_clone = self.store.clone();
82
83        // Re-submit pending actions to executor
84        let actions = store_clone
85            .get_all_pending_actions()
86            .into_values()
87            .collect::<Vec<_>>();
88        for action in actions {
89            submit_to_executor(&executor_sender, action)
90                .await
91                .expect("Submit to executor should not fail");
92        }
93
94        let metrics_clone = self.metrics.clone();
95        task_handles.push(spawn_logged_monitored_task!(Self::run_eth_watcher(
96            store_clone,
97            executor_sender,
98            self.eth_events_rx,
99            self.eth_monitor_tx,
100            metrics_clone,
101        )));
102
103        task_handles
104    }
105
106    async fn run_sui_watcher(
107        store: Arc<BridgeOrchestratorTables>,
108        executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
109        mut sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
110        monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
111        metrics: Arc<BridgeMetrics>,
112    ) {
113        info!("Starting sui watcher task");
114        while let Some((identifier, events)) = sui_events_rx.recv().await {
115            if events.is_empty() {
116                continue;
117            }
118            info!("Received {} Sui events: {:?}", events.len(), events);
119            metrics
120                .sui_watcher_received_events
121                .inc_by(events.len() as u64);
122            let bridge_events = events
123                .iter()
124                .filter_map(|sui_event| {
125                    match SuiBridgeEvent::try_from_sui_event(sui_event) {
126                        Ok(bridge_event) => Some(bridge_event),
127                        // On testnet some early bridge transactions could have zero value (before we disallow it in Move)
128                        Err(BridgeError::ZeroValueBridgeTransfer(_)) => {
129                            error!("Zero value bridge transfer: {:?}", sui_event);
130                            None
131                        }
132                        Err(e) => {
133                            panic!(
134                                "Sui Event could not be deserialzed to SuiBridgeEvent: {:?}",
135                                e
136                            );
137                        }
138                    }
139                })
140                .collect::<Vec<_>>();
141
142            let mut actions = vec![];
143            for (sui_event, opt_bridge_event) in events.iter().zip(bridge_events) {
144                if opt_bridge_event.is_none() {
145                    // TODO: we probably should not miss any events, log for now.
146                    metrics.sui_watcher_unrecognized_events.inc();
147                    error!("Sui event not recognized: {:?}", sui_event);
148                    continue;
149                }
150                // Unwrap safe: checked above
151                let bridge_event: SuiBridgeEvent = opt_bridge_event.unwrap();
152                info!("Observed Sui bridge event: {:?}", bridge_event);
153
154                // Send event to monitor
155                monitor_tx
156                    .send(bridge_event.clone())
157                    .await
158                    .expect("Sending event to monitor channel should not fail");
159
160                if let Some(mut action) = bridge_event.try_into_bridge_action() {
161                    metrics.last_observed_actions_seq_num.with_label_values(&[
162                        action.chain_id().to_string().as_str(),
163                        action.action_type().to_string().as_str(),
164                    ]);
165
166                    action = action.update_to_token_transfer();
167
168                    actions.push(action);
169                }
170            }
171
172            if !actions.is_empty() {
173                info!("Received {} actions from Sui: {:?}", actions.len(), actions);
174                metrics
175                    .sui_watcher_received_actions
176                    .inc_by(actions.len() as u64);
177                // Write action to pending WAL
178                store
179                    .insert_pending_actions(&actions)
180                    .expect("Store operation should not fail");
181                for action in actions {
182                    submit_to_executor(&executor_tx, action)
183                        .await
184                        .expect("Submit to executor should not fail");
185                }
186            }
187
188            // Unwrap safe: in the beginning of the loop we checked that events is not empty
189            let cursor = events.last().unwrap().id;
190            store
191                .update_sui_event_cursor(identifier, cursor)
192                .expect("Store operation should not fail");
193        }
194        panic!("Sui event channel was closed unexpectedly");
195    }
196
197    async fn run_eth_watcher(
198        store: Arc<BridgeOrchestratorTables>,
199        executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
200        mut eth_events_rx: mysten_metrics::metered_channel::Receiver<(
201            ethers::types::Address,
202            u64,
203            Vec<EthLog>,
204        )>,
205        eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
206        metrics: Arc<BridgeMetrics>,
207    ) {
208        info!("Starting eth watcher task");
209        while let Some((contract, end_block, logs)) = eth_events_rx.recv().await {
210            if logs.is_empty() {
211                store
212                    .update_eth_event_cursor(contract, end_block)
213                    .expect("Store operation should not fail");
214                continue;
215            }
216
217            info!("Received {} Eth events", logs.len());
218            metrics
219                .eth_watcher_received_events
220                .inc_by(logs.len() as u64);
221
222            let bridge_events = logs
223                .iter()
224                .map(EthBridgeEvent::try_from_eth_log)
225                .collect::<Vec<_>>();
226
227            let mut actions = vec![];
228            for (log, opt_bridge_event) in logs.iter().zip(bridge_events) {
229                if opt_bridge_event.is_none() {
230                    // TODO: we probably should not miss any events, log for now.
231                    metrics.eth_watcher_unrecognized_events.inc();
232                    error!("Eth event not recognized: {:?}", log);
233                    continue;
234                }
235                // Unwrap safe: checked above
236                let bridge_event = opt_bridge_event.unwrap();
237                info!("Observed Eth bridge event: {:?}", bridge_event);
238
239                // Send event to monitor
240                eth_monitor_tx
241                    .send(bridge_event.clone())
242                    .await
243                    .expect("Sending event to monitor channel should not fail");
244
245                match bridge_event.try_into_bridge_action(log.tx_hash, log.log_index_in_tx) {
246                    Ok(Some(action)) => {
247                        metrics.last_observed_actions_seq_num.with_label_values(&[
248                            action.chain_id().to_string().as_str(),
249                            action.action_type().to_string().as_str(),
250                        ]);
251                        actions.push(action)
252                    }
253                    Ok(None) => {}
254                    Err(e) => {
255                        error!(eth_tx_hash=?log.tx_hash, eth_event_index=?log.log_index_in_tx, "Error converting EthBridgeEvent to BridgeAction: {:?}", e);
256                    }
257                }
258            }
259            if !actions.is_empty() {
260                info!("Received {} actions from Eth: {:?}", actions.len(), actions);
261                metrics
262                    .eth_watcher_received_actions
263                    .inc_by(actions.len() as u64);
264                // Write action to pending WAL
265                store
266                    .insert_pending_actions(&actions)
267                    .expect("Store operation should not fail");
268                // Execution will remove the pending actions from DB when the action is completed.
269                for action in actions {
270                    submit_to_executor(&executor_tx, action)
271                        .await
272                        .expect("Submit to executor should not fail");
273                }
274            }
275
276            store
277                .update_eth_event_cursor(contract, end_block)
278                .expect("Store operation should not fail");
279        }
280        panic!("Eth event channel was closed");
281    }
282}
283
284#[cfg(test)]
285mod tests {
286    use crate::{
287        test_utils::{get_test_eth_to_sui_bridge_action, get_test_log_and_action},
288        types::BridgeActionDigest,
289    };
290    use ethers::types::{Address as EthAddress, TxHash};
291    use prometheus::Registry;
292    use std::str::FromStr;
293
294    use super::*;
295    use crate::events::init_all_struct_tags;
296    use crate::test_utils::get_test_sui_to_eth_bridge_action;
297    use crate::{events::tests::get_test_sui_event_and_action, sui_mock_client::SuiMockClient};
298
299    #[tokio::test]
300    async fn test_sui_watcher_task() {
301        // Note: this test may fail because of the following reasons:
302        // the SuiEvent's struct tag does not match the ones in events.rs
303
304        let (
305            sui_events_tx,
306            sui_events_rx,
307            _eth_events_tx,
308            eth_events_rx,
309            sui_monitor_tx,
310            _sui_monitor_rx,
311            eth_monitor_tx,
312            _eth_monitor_rx,
313            sui_client,
314            store,
315        ) = setup();
316        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
317        // start orchestrator
318        let registry = Registry::new();
319        let metrics = Arc::new(BridgeMetrics::new(&registry));
320        let _handles = BridgeOrchestrator::new(
321            Arc::new(sui_client),
322            sui_events_rx,
323            eth_events_rx,
324            store.clone(),
325            sui_monitor_tx,
326            eth_monitor_tx,
327            metrics,
328        )
329        .run(executor)
330        .await;
331
332        let identifier = Identifier::from_str("test_sui_watcher_task").unwrap();
333        let (sui_event, mut bridge_action) = get_test_sui_event_and_action(identifier.clone());
334        bridge_action = bridge_action.update_to_token_transfer();
335        sui_events_tx
336            .send((identifier.clone(), vec![sui_event.clone()]))
337            .await
338            .unwrap();
339
340        let start = std::time::Instant::now();
341        // Executor should have received the action
342        assert_eq!(
343            executor_requested_action_rx.recv().await.unwrap(),
344            bridge_action.digest()
345        );
346        loop {
347            let actions = store.get_all_pending_actions();
348            if actions.is_empty() {
349                if start.elapsed().as_secs() > 5 {
350                    panic!("Timed out waiting for action to be written to WAL");
351                }
352                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
353                continue;
354            }
355            assert_eq!(actions.len(), 1);
356            let action = actions.get(&bridge_action.digest()).unwrap();
357            assert_eq!(action, &bridge_action);
358            assert_eq!(
359                store.get_sui_event_cursors(&[identifier]).unwrap()[0].unwrap(),
360                sui_event.id,
361            );
362            break;
363        }
364    }
365
366    #[tokio::test]
367    async fn test_eth_watcher_task() {
368        // Note: this test may fail beacuse of the following reasons:
369        // 1. Log and BridgeAction returned from `get_test_log_and_action` are not in sync
370        // 2. Log returned from `get_test_log_and_action` is not parseable log (not abigen!, check abi.rs)
371
372        let (
373            _sui_events_tx,
374            sui_events_rx,
375            eth_events_tx,
376            eth_events_rx,
377            sui_monitor_tx,
378            _sui_monitor_rx,
379            eth_monitor_tx,
380            _eth_monitor_rx,
381            sui_client,
382            store,
383        ) = setup();
384        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
385        // start orchestrator
386        let registry = Registry::new();
387        let metrics = Arc::new(BridgeMetrics::new(&registry));
388        let _handles = BridgeOrchestrator::new(
389            Arc::new(sui_client),
390            sui_events_rx,
391            eth_events_rx,
392            store.clone(),
393            sui_monitor_tx,
394            eth_monitor_tx,
395            metrics,
396        )
397        .run(executor)
398        .await;
399        let address = EthAddress::random();
400        let (log, bridge_action) = get_test_log_and_action(address, TxHash::random(), 10);
401        let log_index_in_tx = 10;
402        let log_block_num = log.block_number.unwrap().as_u64();
403        let eth_log = EthLog {
404            log: log.clone(),
405            tx_hash: log.transaction_hash.unwrap(),
406            block_number: log_block_num,
407            log_index_in_tx,
408        };
409        let end_block_num = log_block_num + 15;
410
411        eth_events_tx
412            .send((address, end_block_num, vec![eth_log.clone()]))
413            .await
414            .unwrap();
415
416        // Executor should have received the action
417        assert_eq!(
418            executor_requested_action_rx.recv().await.unwrap(),
419            bridge_action.digest()
420        );
421        let start = std::time::Instant::now();
422        loop {
423            let actions = store.get_all_pending_actions();
424            if actions.is_empty() {
425                if start.elapsed().as_secs() > 5 {
426                    panic!("Timed out waiting for action to be written to WAL");
427                }
428                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
429                continue;
430            }
431            assert_eq!(actions.len(), 1);
432            let action = actions.get(&bridge_action.digest()).unwrap();
433            assert_eq!(action, &bridge_action);
434            assert_eq!(
435                store.get_eth_event_cursors(&[address]).unwrap()[0].unwrap(),
436                end_block_num,
437            );
438            break;
439        }
440    }
441
442    #[tokio::test]
443    /// Test that when orchestrator starts, all pending actions are sent to executor
444    async fn test_resume_actions_in_pending_logs() {
445        let (
446            _sui_events_tx,
447            sui_events_rx,
448            _eth_events_tx,
449            eth_events_rx,
450            sui_monitor_tx,
451            _sui_monitor_rx,
452            eth_monitor_tx,
453            _eth_monitor_rx,
454            sui_client,
455            store,
456        ) = setup();
457        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
458
459        let action1 = get_test_sui_to_eth_bridge_action(
460            None,
461            Some(0),
462            Some(99),
463            Some(10000),
464            None,
465            None,
466            None,
467        );
468
469        let action2 = get_test_eth_to_sui_bridge_action(None, None, None, None);
470        store
471            .insert_pending_actions(&vec![action1.clone(), action2.clone()])
472            .unwrap();
473
474        // start orchestrator
475        let registry = Registry::new();
476        let metrics = Arc::new(BridgeMetrics::new(&registry));
477        let _handles = BridgeOrchestrator::new(
478            Arc::new(sui_client),
479            sui_events_rx,
480            eth_events_rx,
481            store.clone(),
482            sui_monitor_tx,
483            eth_monitor_tx,
484            metrics,
485        )
486        .run(executor)
487        .await;
488
489        // Executor should have received the action
490        let mut digests = std::collections::HashSet::new();
491        digests.insert(executor_requested_action_rx.recv().await.unwrap());
492        digests.insert(executor_requested_action_rx.recv().await.unwrap());
493        assert!(digests.contains(&action1.digest()));
494        assert!(digests.contains(&action2.digest()));
495        assert_eq!(digests.len(), 2);
496    }
497
498    #[allow(clippy::type_complexity)]
499    fn setup() -> (
500        mysten_metrics::metered_channel::Sender<(Identifier, Vec<SuiEvent>)>,
501        mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
502        mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
503        mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
504        mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
505        mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
506        mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
507        mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
508        SuiClient<SuiMockClient>,
509        Arc<BridgeOrchestratorTables>,
510    ) {
511        telemetry_subscribers::init_for_testing();
512        let registry = Registry::new();
513        mysten_metrics::init_metrics(&registry);
514
515        init_all_struct_tags();
516
517        let temp_dir = tempfile::tempdir().unwrap();
518        let store = BridgeOrchestratorTables::new(temp_dir.path());
519
520        let mock_client = SuiMockClient::default();
521        let sui_client = SuiClient::new_for_testing(mock_client.clone());
522
523        let (eth_events_tx, eth_events_rx) = mysten_metrics::metered_channel::channel(
524            100,
525            &mysten_metrics::get_metrics()
526                .unwrap()
527                .channel_inflight
528                .with_label_values(&["unit_test_eth_events_queue"]),
529        );
530
531        let (sui_events_tx, sui_events_rx) = mysten_metrics::metered_channel::channel(
532            100,
533            &mysten_metrics::get_metrics()
534                .unwrap()
535                .channel_inflight
536                .with_label_values(&["unit_test_sui_events_queue"]),
537        );
538        let (sui_monitor_tx, sui_monitor_rx) = mysten_metrics::metered_channel::channel(
539            10000,
540            &mysten_metrics::get_metrics()
541                .unwrap()
542                .channel_inflight
543                .with_label_values(&["sui_monitor_queue"]),
544        );
545        let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel(
546            10000,
547            &mysten_metrics::get_metrics()
548                .unwrap()
549                .channel_inflight
550                .with_label_values(&["eth_monitor_queue"]),
551        );
552        (
553            sui_events_tx,
554            sui_events_rx,
555            eth_events_tx,
556            eth_events_rx,
557            sui_monitor_tx,
558            sui_monitor_rx,
559            eth_monitor_tx,
560            eth_monitor_rx,
561            sui_client,
562            store,
563        )
564    }
565
    /// A `BridgeActionExecutorTrait` implementation that only tracks the submitted actions.
    ///
    /// Nothing is executed: the digest of every action submitted to it is
    /// re-published on a broadcast channel that the tests read from.
    struct MockExecutor {
        /// Broadcast sender on which the digest of each submitted action is published.
        requested_transactions_tx: tokio::sync::broadcast::Sender<BridgeActionDigest>,
    }
570
571    impl MockExecutor {
572        fn new() -> (Self, tokio::sync::broadcast::Receiver<BridgeActionDigest>) {
573            let (tx, rx) = tokio::sync::broadcast::channel(100);
574            (
575                Self {
576                    requested_transactions_tx: tx,
577                },
578                rx,
579            )
580        }
581    }
582
583    impl BridgeActionExecutorTrait for MockExecutor {
584        fn run(
585            self,
586        ) -> (
587            Vec<tokio::task::JoinHandle<()>>,
588            mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
589        ) {
590            let (tx, mut rx) =
591                mysten_metrics::metered_channel::channel::<BridgeActionExecutionWrapper>(
592                    100,
593                    &mysten_metrics::get_metrics()
594                        .unwrap()
595                        .channel_inflight
596                        .with_label_values(&["unit_test_mock_executor"]),
597                );
598
599            let handles = tokio::spawn(async move {
600                while let Some(action) = rx.recv().await {
601                    self.requested_transactions_tx
602                        .send(action.0.digest())
603                        .unwrap();
604                }
605            });
606            (vec![handles], tx)
607        }
608    }
609}