1use crate::abi::EthBridgeEvent;
10use crate::action_executor::{
11    BridgeActionExecutionWrapper, BridgeActionExecutorTrait, submit_to_executor,
12};
13use crate::error::BridgeError;
14use crate::events::SuiBridgeEvent;
15use crate::metrics::BridgeMetrics;
16use crate::storage::BridgeOrchestratorTables;
17use crate::sui_client::{SuiClient, SuiClientInner};
18use crate::types::EthLog;
19use ethers::types::Address as EthAddress;
20use mysten_metrics::spawn_logged_monitored_task;
21use std::sync::Arc;
22use sui_json_rpc_types::SuiEvent;
23use sui_types::Identifier;
24use tokio::task::JoinHandle;
25use tracing::{error, info};
26
/// Owns the event channels, store and metrics needed to turn observed Sui and
/// Eth events into bridge actions that are persisted and handed to an executor.
pub struct BridgeOrchestrator<C> {
    /// Sui client handle; stored but not read by any code visible in this file.
    _sui_client: Arc<SuiClient<C>>,
    /// Incoming batches of Sui events, each tagged with an `Identifier` that is
    /// used as the key when the Sui event cursor is persisted.
    sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
    /// Incoming batches of Eth logs as `(contract address, end block, logs)`;
    /// the end block is what gets persisted as the Eth event cursor.
    eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
    /// Persistent store for pending actions and per-source event cursors.
    store: Arc<BridgeOrchestratorTables>,
    /// Every recognized Sui bridge event is forwarded on this channel.
    sui_monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
    /// Every recognized Eth bridge event is forwarded on this channel.
    eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
    /// Counters and gauges updated by both watcher tasks.
    metrics: Arc<BridgeMetrics>,
}
36
37impl<C> BridgeOrchestrator<C>
38where
39    C: SuiClientInner + 'static,
40{
41    pub fn new(
42        sui_client: Arc<SuiClient<C>>,
43        sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
44        eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
45        store: Arc<BridgeOrchestratorTables>,
46        sui_monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
47        eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
48        metrics: Arc<BridgeMetrics>,
49    ) -> Self {
50        Self {
51            _sui_client: sui_client,
52            sui_events_rx,
53            eth_events_rx,
54            store,
55            sui_monitor_tx,
56            eth_monitor_tx,
57            metrics,
58        }
59    }
60
61    pub async fn run(
62        self,
63        bridge_action_executor: impl BridgeActionExecutorTrait,
64    ) -> Vec<JoinHandle<()>> {
65        tracing::info!("Starting BridgeOrchestrator");
66        let mut task_handles = vec![];
67        let store_clone = self.store.clone();
68
69        let (handles, executor_sender) = bridge_action_executor.run();
71        task_handles.extend(handles);
72        let executor_sender_clone = executor_sender.clone();
73        let metrics_clone = self.metrics.clone();
74        task_handles.push(spawn_logged_monitored_task!(Self::run_sui_watcher(
75            store_clone,
76            executor_sender_clone,
77            self.sui_events_rx,
78            self.sui_monitor_tx,
79            metrics_clone,
80        )));
81        let store_clone = self.store.clone();
82
83        let actions = store_clone
85            .get_all_pending_actions()
86            .into_values()
87            .collect::<Vec<_>>();
88        for action in actions {
89            submit_to_executor(&executor_sender, action)
90                .await
91                .expect("Submit to executor should not fail");
92        }
93
94        let metrics_clone = self.metrics.clone();
95        task_handles.push(spawn_logged_monitored_task!(Self::run_eth_watcher(
96            store_clone,
97            executor_sender,
98            self.eth_events_rx,
99            self.eth_monitor_tx,
100            metrics_clone,
101        )));
102
103        task_handles
104    }
105
106    async fn run_sui_watcher(
107        store: Arc<BridgeOrchestratorTables>,
108        executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
109        mut sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
110        monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
111        metrics: Arc<BridgeMetrics>,
112    ) {
113        info!("Starting sui watcher task");
114        while let Some((identifier, events)) = sui_events_rx.recv().await {
115            if events.is_empty() {
116                continue;
117            }
118            info!("Received {} Sui events: {:?}", events.len(), events);
119            metrics
120                .sui_watcher_received_events
121                .inc_by(events.len() as u64);
122            let bridge_events = events
123                .iter()
124                .filter_map(|sui_event| {
125                    match SuiBridgeEvent::try_from_sui_event(sui_event) {
126                        Ok(bridge_event) => Some(bridge_event),
127                        Err(BridgeError::ZeroValueBridgeTransfer(_)) => {
129                            error!("Zero value bridge transfer: {:?}", sui_event);
130                            None
131                        }
132                        Err(e) => {
133                            panic!(
134                                "Sui Event could not be deserialzed to SuiBridgeEvent: {:?}",
135                                e
136                            );
137                        }
138                    }
139                })
140                .collect::<Vec<_>>();
141
142            let mut actions = vec![];
143            for (sui_event, opt_bridge_event) in events.iter().zip(bridge_events) {
144                if opt_bridge_event.is_none() {
145                    metrics.sui_watcher_unrecognized_events.inc();
147                    error!("Sui event not recognized: {:?}", sui_event);
148                    continue;
149                }
150                let bridge_event: SuiBridgeEvent = opt_bridge_event.unwrap();
152                info!("Observed Sui bridge event: {:?}", bridge_event);
153
154                monitor_tx
156                    .send(bridge_event.clone())
157                    .await
158                    .expect("Sending event to monitor channel should not fail");
159
160                if let Some(action) = bridge_event
161                    .try_into_bridge_action(sui_event.id.tx_digest, sui_event.id.event_seq as u16)
162                {
163                    metrics.last_observed_actions_seq_num.with_label_values(&[
164                        action.chain_id().to_string().as_str(),
165                        action.action_type().to_string().as_str(),
166                    ]);
167                    actions.push(action);
168                }
169            }
170
171            if !actions.is_empty() {
172                info!("Received {} actions from Sui: {:?}", actions.len(), actions);
173                metrics
174                    .sui_watcher_received_actions
175                    .inc_by(actions.len() as u64);
176                store
178                    .insert_pending_actions(&actions)
179                    .expect("Store operation should not fail");
180                for action in actions {
181                    submit_to_executor(&executor_tx, action)
182                        .await
183                        .expect("Submit to executor should not fail");
184                }
185            }
186
187            let cursor = events.last().unwrap().id;
189            store
190                .update_sui_event_cursor(identifier, cursor)
191                .expect("Store operation should not fail");
192        }
193        panic!("Sui event channel was closed unexpectedly");
194    }
195
196    async fn run_eth_watcher(
197        store: Arc<BridgeOrchestratorTables>,
198        executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
199        mut eth_events_rx: mysten_metrics::metered_channel::Receiver<(
200            ethers::types::Address,
201            u64,
202            Vec<EthLog>,
203        )>,
204        eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
205        metrics: Arc<BridgeMetrics>,
206    ) {
207        info!("Starting eth watcher task");
208        while let Some((contract, end_block, logs)) = eth_events_rx.recv().await {
209            if logs.is_empty() {
210                store
211                    .update_eth_event_cursor(contract, end_block)
212                    .expect("Store operation should not fail");
213                continue;
214            }
215
216            info!("Received {} Eth events", logs.len());
217            metrics
218                .eth_watcher_received_events
219                .inc_by(logs.len() as u64);
220
221            let bridge_events = logs
222                .iter()
223                .map(EthBridgeEvent::try_from_eth_log)
224                .collect::<Vec<_>>();
225
226            let mut actions = vec![];
227            for (log, opt_bridge_event) in logs.iter().zip(bridge_events) {
228                if opt_bridge_event.is_none() {
229                    metrics.eth_watcher_unrecognized_events.inc();
231                    error!("Eth event not recognized: {:?}", log);
232                    continue;
233                }
234                let bridge_event = opt_bridge_event.unwrap();
236                info!("Observed Eth bridge event: {:?}", bridge_event);
237
238                eth_monitor_tx
240                    .send(bridge_event.clone())
241                    .await
242                    .expect("Sending event to monitor channel should not fail");
243
244                match bridge_event.try_into_bridge_action(log.tx_hash, log.log_index_in_tx) {
245                    Ok(Some(action)) => {
246                        metrics.last_observed_actions_seq_num.with_label_values(&[
247                            action.chain_id().to_string().as_str(),
248                            action.action_type().to_string().as_str(),
249                        ]);
250                        actions.push(action)
251                    }
252                    Ok(None) => {}
253                    Err(e) => {
254                        error!(eth_tx_hash=?log.tx_hash, eth_event_index=?log.log_index_in_tx, "Error converting EthBridgeEvent to BridgeAction: {:?}", e);
255                    }
256                }
257            }
258            if !actions.is_empty() {
259                info!("Received {} actions from Eth: {:?}", actions.len(), actions);
260                metrics
261                    .eth_watcher_received_actions
262                    .inc_by(actions.len() as u64);
263                store
265                    .insert_pending_actions(&actions)
266                    .expect("Store operation should not fail");
267                for action in actions {
269                    submit_to_executor(&executor_tx, action)
270                        .await
271                        .expect("Submit to executor should not fail");
272                }
273            }
274
275            store
276                .update_eth_event_cursor(contract, end_block)
277                .expect("Store operation should not fail");
278        }
279        panic!("Eth event channel was closed");
280    }
281}
282
283#[cfg(test)]
284mod tests {
285    use crate::{
286        test_utils::{get_test_eth_to_sui_bridge_action, get_test_log_and_action},
287        types::BridgeActionDigest,
288    };
289    use ethers::types::{Address as EthAddress, TxHash};
290    use prometheus::Registry;
291    use std::str::FromStr;
292
293    use super::*;
294    use crate::events::init_all_struct_tags;
295    use crate::test_utils::get_test_sui_to_eth_bridge_action;
296    use crate::{events::tests::get_test_sui_event_and_action, sui_mock_client::SuiMockClient};
297
298    #[tokio::test]
299    async fn test_sui_watcher_task() {
300        let (
304            sui_events_tx,
305            sui_events_rx,
306            _eth_events_tx,
307            eth_events_rx,
308            sui_monitor_tx,
309            _sui_monitor_rx,
310            eth_monitor_tx,
311            _eth_monitor_rx,
312            sui_client,
313            store,
314        ) = setup();
315        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
316        let registry = Registry::new();
318        let metrics = Arc::new(BridgeMetrics::new(®istry));
319        let _handles = BridgeOrchestrator::new(
320            Arc::new(sui_client),
321            sui_events_rx,
322            eth_events_rx,
323            store.clone(),
324            sui_monitor_tx,
325            eth_monitor_tx,
326            metrics,
327        )
328        .run(executor)
329        .await;
330
331        let identifier = Identifier::from_str("test_sui_watcher_task").unwrap();
332        let (sui_event, bridge_action) = get_test_sui_event_and_action(identifier.clone());
333        sui_events_tx
334            .send((identifier.clone(), vec![sui_event.clone()]))
335            .await
336            .unwrap();
337
338        let start = std::time::Instant::now();
339        assert_eq!(
341            executor_requested_action_rx.recv().await.unwrap(),
342            bridge_action.digest()
343        );
344        loop {
345            let actions = store.get_all_pending_actions();
346            if actions.is_empty() {
347                if start.elapsed().as_secs() > 5 {
348                    panic!("Timed out waiting for action to be written to WAL");
349                }
350                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
351                continue;
352            }
353            assert_eq!(actions.len(), 1);
354            let action = actions.get(&bridge_action.digest()).unwrap();
355            assert_eq!(action, &bridge_action);
356            assert_eq!(
357                store.get_sui_event_cursors(&[identifier]).unwrap()[0].unwrap(),
358                sui_event.id,
359            );
360            break;
361        }
362    }
363
364    #[tokio::test]
365    async fn test_eth_watcher_task() {
366        let (
371            _sui_events_tx,
372            sui_events_rx,
373            eth_events_tx,
374            eth_events_rx,
375            sui_monitor_tx,
376            _sui_monitor_rx,
377            eth_monitor_tx,
378            _eth_monitor_rx,
379            sui_client,
380            store,
381        ) = setup();
382        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
383        let registry = Registry::new();
385        let metrics = Arc::new(BridgeMetrics::new(®istry));
386        let _handles = BridgeOrchestrator::new(
387            Arc::new(sui_client),
388            sui_events_rx,
389            eth_events_rx,
390            store.clone(),
391            sui_monitor_tx,
392            eth_monitor_tx,
393            metrics,
394        )
395        .run(executor)
396        .await;
397        let address = EthAddress::random();
398        let (log, bridge_action) = get_test_log_and_action(address, TxHash::random(), 10);
399        let log_index_in_tx = 10;
400        let log_block_num = log.block_number.unwrap().as_u64();
401        let eth_log = EthLog {
402            log: log.clone(),
403            tx_hash: log.transaction_hash.unwrap(),
404            block_number: log_block_num,
405            log_index_in_tx,
406        };
407        let end_block_num = log_block_num + 15;
408
409        eth_events_tx
410            .send((address, end_block_num, vec![eth_log.clone()]))
411            .await
412            .unwrap();
413
414        assert_eq!(
416            executor_requested_action_rx.recv().await.unwrap(),
417            bridge_action.digest()
418        );
419        let start = std::time::Instant::now();
420        loop {
421            let actions = store.get_all_pending_actions();
422            if actions.is_empty() {
423                if start.elapsed().as_secs() > 5 {
424                    panic!("Timed out waiting for action to be written to WAL");
425                }
426                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
427                continue;
428            }
429            assert_eq!(actions.len(), 1);
430            let action = actions.get(&bridge_action.digest()).unwrap();
431            assert_eq!(action, &bridge_action);
432            assert_eq!(
433                store.get_eth_event_cursors(&[address]).unwrap()[0].unwrap(),
434                end_block_num,
435            );
436            break;
437        }
438    }
439
440    #[tokio::test]
441    async fn test_resume_actions_in_pending_logs() {
443        let (
444            _sui_events_tx,
445            sui_events_rx,
446            _eth_events_tx,
447            eth_events_rx,
448            sui_monitor_tx,
449            _sui_monitor_rx,
450            eth_monitor_tx,
451            _eth_monitor_rx,
452            sui_client,
453            store,
454        ) = setup();
455        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
456
457        let action1 = get_test_sui_to_eth_bridge_action(
458            None,
459            Some(0),
460            Some(99),
461            Some(10000),
462            None,
463            None,
464            None,
465        );
466
467        let action2 = get_test_eth_to_sui_bridge_action(None, None, None, None);
468        store
469            .insert_pending_actions(&vec![action1.clone(), action2.clone()])
470            .unwrap();
471
472        let registry = Registry::new();
474        let metrics = Arc::new(BridgeMetrics::new(®istry));
475        let _handles = BridgeOrchestrator::new(
476            Arc::new(sui_client),
477            sui_events_rx,
478            eth_events_rx,
479            store.clone(),
480            sui_monitor_tx,
481            eth_monitor_tx,
482            metrics,
483        )
484        .run(executor)
485        .await;
486
487        let mut digests = std::collections::HashSet::new();
489        digests.insert(executor_requested_action_rx.recv().await.unwrap());
490        digests.insert(executor_requested_action_rx.recv().await.unwrap());
491        assert!(digests.contains(&action1.digest()));
492        assert!(digests.contains(&action2.digest()));
493        assert_eq!(digests.len(), 2);
494    }
495
496    #[allow(clippy::type_complexity)]
497    fn setup() -> (
498        mysten_metrics::metered_channel::Sender<(Identifier, Vec<SuiEvent>)>,
499        mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
500        mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
501        mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
502        mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
503        mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
504        mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
505        mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
506        SuiClient<SuiMockClient>,
507        Arc<BridgeOrchestratorTables>,
508    ) {
509        telemetry_subscribers::init_for_testing();
510        let registry = Registry::new();
511        mysten_metrics::init_metrics(®istry);
512
513        init_all_struct_tags();
514
515        let temp_dir = tempfile::tempdir().unwrap();
516        let store = BridgeOrchestratorTables::new(temp_dir.path());
517
518        let mock_client = SuiMockClient::default();
519        let sui_client = SuiClient::new_for_testing(mock_client.clone());
520
521        let (eth_events_tx, eth_events_rx) = mysten_metrics::metered_channel::channel(
522            100,
523            &mysten_metrics::get_metrics()
524                .unwrap()
525                .channel_inflight
526                .with_label_values(&["unit_test_eth_events_queue"]),
527        );
528
529        let (sui_events_tx, sui_events_rx) = mysten_metrics::metered_channel::channel(
530            100,
531            &mysten_metrics::get_metrics()
532                .unwrap()
533                .channel_inflight
534                .with_label_values(&["unit_test_sui_events_queue"]),
535        );
536        let (sui_monitor_tx, sui_monitor_rx) = mysten_metrics::metered_channel::channel(
537            10000,
538            &mysten_metrics::get_metrics()
539                .unwrap()
540                .channel_inflight
541                .with_label_values(&["sui_monitor_queue"]),
542        );
543        let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel(
544            10000,
545            &mysten_metrics::get_metrics()
546                .unwrap()
547                .channel_inflight
548                .with_label_values(&["eth_monitor_queue"]),
549        );
550        (
551            sui_events_tx,
552            sui_events_rx,
553            eth_events_tx,
554            eth_events_rx,
555            sui_monitor_tx,
556            sui_monitor_rx,
557            eth_monitor_tx,
558            eth_monitor_rx,
559            sui_client,
560            store,
561        )
562    }
563
    /// Test double for the action executor: instead of executing anything it
    /// reports the digest of every action it receives.
    struct MockExecutor {
        /// Broadcasts the digest of each action submitted to this executor so
        /// tests can observe what was requested.
        requested_transactions_tx: tokio::sync::broadcast::Sender<BridgeActionDigest>,
    }
568
569    impl MockExecutor {
570        fn new() -> (Self, tokio::sync::broadcast::Receiver<BridgeActionDigest>) {
571            let (tx, rx) = tokio::sync::broadcast::channel(100);
572            (
573                Self {
574                    requested_transactions_tx: tx,
575                },
576                rx,
577            )
578        }
579    }
580
581    impl BridgeActionExecutorTrait for MockExecutor {
582        fn run(
583            self,
584        ) -> (
585            Vec<tokio::task::JoinHandle<()>>,
586            mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
587        ) {
588            let (tx, mut rx) =
589                mysten_metrics::metered_channel::channel::<BridgeActionExecutionWrapper>(
590                    100,
591                    &mysten_metrics::get_metrics()
592                        .unwrap()
593                        .channel_inflight
594                        .with_label_values(&["unit_test_mock_executor"]),
595                );
596
597            let handles = tokio::spawn(async move {
598                while let Some(action) = rx.recv().await {
599                    self.requested_transactions_tx
600                        .send(action.0.digest())
601                        .unwrap();
602                }
603            });
604            (vec![handles], tx)
605        }
606    }
607}