sui_bridge/
orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! `BridgeOrchestrator` is the component that:
//! 1. monitors Sui and Ethereum events with the help of `SuiSyncer` and `EthSyncer`
//! 2. updates WAL table and cursor tables
//! 3. hands actions to `BridgeExecutor` for execution
8
9use crate::abi::EthBridgeEvent;
10use crate::action_executor::{
11    BridgeActionExecutionWrapper, BridgeActionExecutorTrait, submit_to_executor,
12};
13use crate::error::BridgeError;
14use crate::events::SuiBridgeEvent;
15use crate::metrics::BridgeMetrics;
16use crate::storage::BridgeOrchestratorTables;
17use crate::sui_client::{SuiClient, SuiClientInner};
18use crate::sui_syncer::GrpcSyncedEvents;
19use crate::types::EthLog;
20use alloy::primitives::Address as EthAddress;
21use mysten_metrics::spawn_logged_monitored_task;
22use std::sync::Arc;
23use sui_json_rpc_types::SuiEvent;
24use sui_types::Identifier;
25use tokio::task::JoinHandle;
26use tracing::{error, info};
27
28pub struct BridgeOrchestrator<C> {
29    _sui_client: Arc<SuiClient<C>>,
30    sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
31    sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
32    eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
33    store: Arc<BridgeOrchestratorTables>,
34    sui_monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
35    eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
36    metrics: Arc<BridgeMetrics>,
37}
38
39impl<C> BridgeOrchestrator<C>
40where
41    C: SuiClientInner + 'static,
42{
43    pub fn new(
44        sui_client: Arc<SuiClient<C>>,
45        sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
46        sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
47        eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
48        store: Arc<BridgeOrchestratorTables>,
49        sui_monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
50        eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
51        metrics: Arc<BridgeMetrics>,
52    ) -> Self {
53        Self {
54            _sui_client: sui_client,
55            sui_events_rx,
56            sui_grpc_events_rx,
57            eth_events_rx,
58            store,
59            sui_monitor_tx,
60            eth_monitor_tx,
61            metrics,
62        }
63    }
64
65    pub async fn run(
66        self,
67        bridge_action_executor: impl BridgeActionExecutorTrait,
68    ) -> Vec<JoinHandle<()>> {
69        tracing::info!("Starting BridgeOrchestrator");
70        let mut task_handles = vec![];
71        let store_clone = self.store.clone();
72
73        // Spawn BridgeActionExecutor
74        let (handles, executor_sender) = bridge_action_executor.run();
75        task_handles.extend(handles);
76        let executor_sender_clone = executor_sender.clone();
77        let metrics_clone = self.metrics.clone();
78        task_handles.push(spawn_logged_monitored_task!(Self::run_sui_watcher(
79            store_clone,
80            executor_sender_clone,
81            self.sui_events_rx,
82            self.sui_monitor_tx,
83            metrics_clone,
84        )));
85        let store_clone = self.store.clone();
86
87        // Re-submit pending actions to executor
88        let actions = store_clone
89            .get_all_pending_actions()
90            .into_values()
91            .collect::<Vec<_>>();
92        for action in actions {
93            submit_to_executor(&executor_sender, action)
94                .await
95                .expect("Submit to executor should not fail");
96        }
97
98        let metrics_clone = self.metrics.clone();
99        task_handles.push(spawn_logged_monitored_task!(Self::run_eth_watcher(
100            store_clone,
101            executor_sender,
102            self.eth_events_rx,
103            self.eth_monitor_tx,
104            metrics_clone,
105        )));
106
107        task_handles
108    }
109
110    pub async fn run_with_grpc(
111        self,
112        bridge_action_executor: impl BridgeActionExecutorTrait,
113    ) -> Vec<JoinHandle<()>> {
114        tracing::info!("Starting BridgeOrchestrator with gRPC syncer");
115        let mut task_handles = vec![];
116        let store_clone = self.store.clone();
117
118        // Spawn BridgeActionExecutor
119        let (handles, executor_sender) = bridge_action_executor.run();
120        task_handles.extend(handles);
121        let executor_sender_clone = executor_sender.clone();
122        let metrics_clone = self.metrics.clone();
123
124        task_handles.push(spawn_logged_monitored_task!(Self::run_sui_grpc_watcher(
125            store_clone,
126            executor_sender_clone,
127            self.sui_grpc_events_rx,
128            self.sui_monitor_tx,
129            metrics_clone,
130        )));
131        let store_clone = self.store.clone();
132
133        // Re-submit pending actions to executor
134        let actions = store_clone
135            .get_all_pending_actions()
136            .into_values()
137            .collect::<Vec<_>>();
138        for action in actions {
139            submit_to_executor(&executor_sender, action)
140                .await
141                .expect("Submit to executor should not fail");
142        }
143
144        let metrics_clone = self.metrics.clone();
145        task_handles.push(spawn_logged_monitored_task!(Self::run_eth_watcher(
146            store_clone,
147            executor_sender,
148            self.eth_events_rx,
149            self.eth_monitor_tx,
150            metrics_clone,
151        )));
152
153        task_handles
154    }
155
156    async fn run_sui_watcher(
157        store: Arc<BridgeOrchestratorTables>,
158        executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
159        mut sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
160        monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
161        metrics: Arc<BridgeMetrics>,
162    ) {
163        info!("Starting sui watcher task");
164        while let Some((identifier, events)) = sui_events_rx.recv().await {
165            if events.is_empty() {
166                continue;
167            }
168            info!("Received {} Sui events: {:?}", events.len(), events);
169            metrics
170                .sui_watcher_received_events
171                .inc_by(events.len() as u64);
172            let bridge_events = events
173                .iter()
174                .filter_map(|sui_event| {
175                    match SuiBridgeEvent::try_from_sui_event(sui_event) {
176                        Ok(bridge_event) => Some(bridge_event),
177                        // On testnet some early bridge transactions could have zero value (before we disallow it in Move)
178                        Err(BridgeError::ZeroValueBridgeTransfer(_)) => {
179                            error!("Zero value bridge transfer: {:?}", sui_event);
180                            None
181                        }
182                        Err(e) => {
183                            panic!(
184                                "Sui Event could not be deserialzed to SuiBridgeEvent: {:?}",
185                                e
186                            );
187                        }
188                    }
189                })
190                .collect::<Vec<_>>();
191
192            let mut actions = vec![];
193            for (sui_event, opt_bridge_event) in events.iter().zip(bridge_events) {
194                if opt_bridge_event.is_none() {
195                    // TODO: we probably should not miss any events, log for now.
196                    metrics.sui_watcher_unrecognized_events.inc();
197                    error!("Sui event not recognized: {:?}", sui_event);
198                    continue;
199                }
200                // Unwrap safe: checked above
201                let bridge_event: SuiBridgeEvent = opt_bridge_event.unwrap();
202                info!("Observed Sui bridge event: {:?}", bridge_event);
203
204                // Send event to monitor
205                monitor_tx
206                    .send(bridge_event.clone())
207                    .await
208                    .expect("Sending event to monitor channel should not fail");
209
210                if let Some(mut action) = bridge_event.try_into_bridge_action() {
211                    metrics.last_observed_actions_seq_num.with_label_values(&[
212                        action.chain_id().to_string().as_str(),
213                        action.action_type().to_string().as_str(),
214                    ]);
215
216                    action = action.update_to_token_transfer();
217
218                    actions.push(action);
219                }
220            }
221
222            if !actions.is_empty() {
223                info!("Received {} actions from Sui: {:?}", actions.len(), actions);
224                metrics
225                    .sui_watcher_received_actions
226                    .inc_by(actions.len() as u64);
227                // Write action to pending WAL
228                store
229                    .insert_pending_actions(&actions)
230                    .expect("Store operation should not fail");
231                for action in actions {
232                    submit_to_executor(&executor_tx, action)
233                        .await
234                        .expect("Submit to executor should not fail");
235                }
236            }
237
238            // Unwrap safe: in the beginning of the loop we checked that events is not empty
239            let cursor = events.last().unwrap().id;
240            store
241                .update_sui_event_cursor(identifier, cursor)
242                .expect("Store operation should not fail");
243        }
244        panic!("Sui event channel was closed unexpectedly");
245    }
246
247    pub async fn run_sui_grpc_watcher(
248        store: Arc<BridgeOrchestratorTables>,
249        executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
250        mut sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<GrpcSyncedEvents>,
251        monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
252        metrics: Arc<BridgeMetrics>,
253    ) {
254        info!("Starting sui gRPC watcher task");
255        while let Some((last_seq_num, events)) = sui_grpc_events_rx.recv().await {
256            if events.is_empty() {
257                continue;
258            }
259            info!(
260                "Received {} Sui events: last_seq_num={}",
261                events.len(),
262                last_seq_num
263            );
264            metrics
265                .sui_watcher_received_events
266                .inc_by(events.len() as u64);
267
268            let mut actions = vec![];
269            for bridge_event in events {
270                info!("Observed Sui bridge event (gRPC): {:?}", bridge_event);
271
272                // Send event to monitor
273                monitor_tx
274                    .send(bridge_event.clone())
275                    .await
276                    .expect("Sending event to monitor channel should not fail");
277
278                // Convert to action using the same flow as JSON-RPC watcher
279                if let Some(mut action) = bridge_event.try_into_bridge_action() {
280                    metrics.last_observed_actions_seq_num.with_label_values(&[
281                        action.chain_id().to_string().as_str(),
282                        action.action_type().to_string().as_str(),
283                    ]);
284
285                    action = action.update_to_token_transfer();
286                    actions.push(action);
287                }
288            }
289
290            if !actions.is_empty() {
291                info!(
292                    "Received {} actions from Sui gRPC: {:?}",
293                    actions.len(),
294                    actions
295                );
296                metrics
297                    .sui_watcher_received_actions
298                    .inc_by(actions.len() as u64);
299                // Write action to pending WAL
300                store
301                    .insert_pending_actions(&actions)
302                    .expect("Store operation should not fail");
303                for action in actions {
304                    submit_to_executor(&executor_tx, action)
305                        .await
306                        .expect("Submit to executor should not fail");
307                }
308            }
309
310            // Store the sequence number cursor
311            store
312                .update_sui_sequence_number_cursor(last_seq_num)
313                .expect("Store operation should not fail");
314        }
315        panic!("Sui gRPC event channel was closed unexpectedly");
316    }
317
318    async fn run_eth_watcher(
319        store: Arc<BridgeOrchestratorTables>,
320        executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
321        mut eth_events_rx: mysten_metrics::metered_channel::Receiver<(
322            alloy::primitives::Address,
323            u64,
324            Vec<EthLog>,
325        )>,
326        eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
327        metrics: Arc<BridgeMetrics>,
328    ) {
329        info!("Starting eth watcher task");
330        while let Some((contract, end_block, logs)) = eth_events_rx.recv().await {
331            if logs.is_empty() {
332                store
333                    .update_eth_event_cursor(contract, end_block)
334                    .expect("Store operation should not fail");
335                continue;
336            }
337
338            info!("Received {} Eth events", logs.len());
339            metrics
340                .eth_watcher_received_events
341                .inc_by(logs.len() as u64);
342
343            let bridge_events = logs
344                .iter()
345                .map(EthBridgeEvent::try_from_eth_log)
346                .collect::<Vec<_>>();
347
348            let mut actions = vec![];
349            for (log, opt_bridge_event) in logs.iter().zip(bridge_events) {
350                if opt_bridge_event.is_none() {
351                    // TODO: we probably should not miss any events, log for now.
352                    metrics.eth_watcher_unrecognized_events.inc();
353                    error!("Eth event not recognized: {:?}", log);
354                    continue;
355                }
356                // Unwrap safe: checked above
357                let bridge_event = opt_bridge_event.unwrap();
358                info!("Observed Eth bridge event: {:?}", bridge_event);
359
360                // Send event to monitor
361                eth_monitor_tx
362                    .send(bridge_event.clone())
363                    .await
364                    .expect("Sending event to monitor channel should not fail");
365
366                match bridge_event.try_into_bridge_action(log.tx_hash, log.log_index_in_tx) {
367                    Ok(Some(action)) => {
368                        metrics.last_observed_actions_seq_num.with_label_values(&[
369                            action.chain_id().to_string().as_str(),
370                            action.action_type().to_string().as_str(),
371                        ]);
372                        actions.push(action)
373                    }
374                    Ok(None) => {}
375                    Err(e) => {
376                        error!(eth_tx_hash=?log.tx_hash, eth_event_index=?log.log_index_in_tx, "Error converting EthBridgeEvent to BridgeAction: {:?}", e);
377                    }
378                }
379            }
380            if !actions.is_empty() {
381                info!("Received {} actions from Eth: {:?}", actions.len(), actions);
382                metrics
383                    .eth_watcher_received_actions
384                    .inc_by(actions.len() as u64);
385                // Write action to pending WAL
386                store
387                    .insert_pending_actions(&actions)
388                    .expect("Store operation should not fail");
389                // Execution will remove the pending actions from DB when the action is completed.
390                for action in actions {
391                    submit_to_executor(&executor_tx, action)
392                        .await
393                        .expect("Submit to executor should not fail");
394                }
395            }
396
397            store
398                .update_eth_event_cursor(contract, end_block)
399                .expect("Store operation should not fail");
400        }
401        panic!("Eth event channel was closed");
402    }
403}
404
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use alloy::primitives::TxHash;
    use prometheus::Registry;

    use super::*;
    use crate::events::tests::get_test_sui_event_and_action;
    use crate::events::{SuiBridgeEvent, init_all_struct_tags};
    use crate::sui_mock_client::SuiMockClient;
    use crate::test_utils::{
        get_test_eth_to_sui_bridge_action, get_test_log_and_action,
        get_test_sui_to_eth_bridge_action,
    };
    use crate::types::BridgeActionDigest;

    /// A raw Sui event sent through the JSON-RPC path must reach the executor,
    /// land in the pending WAL and advance the event cursor.
    #[tokio::test]
    async fn test_sui_watcher_task() {
        // Note: this test may fail because of the following reasons:
        // the SuiEvent's struct tag does not match the ones in events.rs

        // The `_`-prefixed ends are kept bound so their channels stay open.
        let (
            sui_events_tx,
            sui_events_rx,
            _sui_grpc_events_tx,
            sui_grpc_events_rx,
            _eth_events_tx,
            eth_events_rx,
            sui_monitor_tx,
            _sui_monitor_rx,
            eth_monitor_tx,
            _eth_monitor_rx,
            sui_client,
            store,
        ) = setup();
        let (executor, mut submitted_digests_rx) = MockExecutor::new();

        // start orchestrator
        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let orchestrator = BridgeOrchestrator::new(
            Arc::new(sui_client),
            sui_events_rx,
            sui_grpc_events_rx,
            eth_events_rx,
            store.clone(),
            sui_monitor_tx,
            eth_monitor_tx,
            metrics,
        );
        let _handles = orchestrator.run(executor).await;

        let identifier = Identifier::from_str("test_sui_watcher_task").unwrap();
        let (sui_event, raw_action) = get_test_sui_event_and_action(identifier.clone());
        let expected_action = raw_action.update_to_token_transfer();
        sui_events_tx
            .send((identifier.clone(), vec![sui_event.clone()]))
            .await
            .unwrap();

        let start = std::time::Instant::now();
        // Executor should have received the action
        let submitted = submitted_digests_rx.recv().await.unwrap();
        assert_eq!(submitted, expected_action.digest());

        // Poll the WAL until the action shows up (or give up after 5 seconds).
        let pending = loop {
            let pending = store.get_all_pending_actions();
            if !pending.is_empty() {
                break pending;
            }
            if start.elapsed().as_secs() > 5 {
                panic!("Timed out waiting for action to be written to WAL");
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
        };
        assert_eq!(pending.len(), 1);
        let stored_action = pending.get(&expected_action.digest()).unwrap();
        assert_eq!(stored_action, &expected_action);
        let cursor = store.get_sui_event_cursors(&[identifier]).unwrap()[0].unwrap();
        assert_eq!(cursor, sui_event.id);
    }

    /// An Ethereum log sent through the watcher must reach the executor, land
    /// in the pending WAL and advance the contract's block cursor.
    #[tokio::test]
    async fn test_eth_watcher_task() {
        // Note: this test may fail because of the following reasons:
        // 1. Log and BridgeAction returned from `get_test_log_and_action` are not in sync
        // 2. Log returned from `get_test_log_and_action` is not parseable log (not abigen!, check abi.rs)

        // The `_`-prefixed ends are kept bound so their channels stay open.
        let (
            _sui_events_tx,
            sui_events_rx,
            _sui_grpc_events_tx,
            sui_grpc_events_rx,
            eth_events_tx,
            eth_events_rx,
            sui_monitor_tx,
            _sui_monitor_rx,
            eth_monitor_tx,
            _eth_monitor_rx,
            sui_client,
            store,
        ) = setup();
        let (executor, mut submitted_digests_rx) = MockExecutor::new();

        // start orchestrator
        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let orchestrator = BridgeOrchestrator::new(
            Arc::new(sui_client),
            sui_events_rx,
            sui_grpc_events_rx,
            eth_events_rx,
            store.clone(),
            sui_monitor_tx,
            eth_monitor_tx,
            metrics,
        );
        let _handles = orchestrator.run(executor).await;

        let address = EthAddress::random();
        let (log, expected_action) = get_test_log_and_action(address, TxHash::random(), 10);
        let log_block_num = log.block_number.unwrap();
        let eth_log = EthLog {
            log: log.clone(),
            tx_hash: log.transaction_hash.unwrap(),
            block_number: log_block_num,
            log_index_in_tx: 10,
        };
        let end_block_num = log_block_num + 15;

        eth_events_tx
            .send((address, end_block_num, vec![eth_log.clone()]))
            .await
            .unwrap();

        // Executor should have received the action
        let submitted = submitted_digests_rx.recv().await.unwrap();
        assert_eq!(submitted, expected_action.digest());

        // Poll the WAL until the action shows up (or give up after 5 seconds).
        let start = std::time::Instant::now();
        let pending = loop {
            let pending = store.get_all_pending_actions();
            if !pending.is_empty() {
                break pending;
            }
            if start.elapsed().as_secs() > 5 {
                panic!("Timed out waiting for action to be written to WAL");
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
        };
        assert_eq!(pending.len(), 1);
        let stored_action = pending.get(&expected_action.digest()).unwrap();
        assert_eq!(stored_action, &expected_action);
        let cursor = store.get_eth_event_cursors(&[address]).unwrap()[0].unwrap();
        assert_eq!(cursor, end_block_num);
    }

    /// A parsed Sui bridge event sent through the gRPC path must reach the
    /// executor, land in the pending WAL and advance the sequence-number cursor.
    #[tokio::test]
    async fn test_sui_grpc_watcher_task() {
        // The `_`-prefixed ends are kept bound so their channels stay open.
        let (
            _sui_events_tx,
            sui_events_rx,
            sui_grpc_events_tx,
            sui_grpc_events_rx,
            _eth_events_tx,
            eth_events_rx,
            sui_monitor_tx,
            _sui_monitor_rx,
            eth_monitor_tx,
            _eth_monitor_rx,
            sui_client,
            store,
        ) = setup();
        let (executor, mut submitted_digests_rx) = MockExecutor::new();

        // start orchestrator with gRPC
        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let orchestrator = BridgeOrchestrator::new(
            Arc::new(sui_client),
            sui_events_rx,
            sui_grpc_events_rx,
            eth_events_rx,
            store.clone(),
            sui_monitor_tx,
            eth_monitor_tx,
            metrics,
        );
        let _handles = orchestrator.run_with_grpc(executor).await;

        let identifier = Identifier::from_str("test_sui_grpc_watcher_task").unwrap();
        let (sui_event, raw_action) = get_test_sui_event_and_action(identifier);
        let expected_action = raw_action.update_to_token_transfer();

        // Convert SuiEvent to SuiBridgeEvent
        let bridge_event = SuiBridgeEvent::try_from_sui_event(&sui_event)
            .unwrap()
            .unwrap();

        let last_seq_num = 42u64;
        sui_grpc_events_tx
            .send((last_seq_num, vec![bridge_event]))
            .await
            .unwrap();

        let start = std::time::Instant::now();
        // Executor should have received the action
        let submitted = submitted_digests_rx.recv().await.unwrap();
        assert_eq!(submitted, expected_action.digest());

        // Poll the WAL until the action shows up (or give up after 5 seconds).
        let pending = loop {
            let pending = store.get_all_pending_actions();
            if !pending.is_empty() {
                break pending;
            }
            if start.elapsed().as_secs() > 5 {
                panic!("Timed out waiting for action to be written to WAL");
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
        };
        assert_eq!(pending.len(), 1);
        let stored_action = pending.get(&expected_action.digest()).unwrap();
        assert_eq!(stored_action, &expected_action);
        // Verify sequence number cursor was updated
        let cursor = store.get_sui_sequence_number_cursor().unwrap().unwrap();
        assert_eq!(cursor, last_seq_num);
    }

    /// Test that when orchestrator starts with gRPC, all pending actions are sent to executor
    #[tokio::test]
    async fn test_resume_actions_in_pending_logs_with_grpc() {
        // The `_`-prefixed ends are kept bound so their channels stay open.
        let (
            _sui_events_tx,
            sui_events_rx,
            _sui_grpc_events_tx,
            sui_grpc_events_rx,
            _eth_events_tx,
            eth_events_rx,
            sui_monitor_tx,
            _sui_monitor_rx,
            eth_monitor_tx,
            _eth_monitor_rx,
            sui_client,
            store,
        ) = setup();
        let (executor, mut submitted_digests_rx) = MockExecutor::new();

        // Seed the WAL with one action per direction before the orchestrator starts.
        let action1 = get_test_sui_to_eth_bridge_action(
            None,
            Some(0),
            Some(99),
            Some(10000),
            None,
            None,
            None,
        );
        let action2 = get_test_eth_to_sui_bridge_action(None, None, None, None);
        store
            .insert_pending_actions(&vec![action1.clone(), action2.clone()])
            .unwrap();

        // start orchestrator with gRPC
        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let orchestrator = BridgeOrchestrator::new(
            Arc::new(sui_client),
            sui_events_rx,
            sui_grpc_events_rx,
            eth_events_rx,
            store.clone(),
            sui_monitor_tx,
            eth_monitor_tx,
            metrics,
        );
        let _handles = orchestrator.run_with_grpc(executor).await;

        // Executor should have received both actions, in any order
        let mut digests = std::collections::HashSet::new();
        for _ in 0..2 {
            digests.insert(submitted_digests_rx.recv().await.unwrap());
        }
        assert!(digests.contains(&action1.digest()));
        assert!(digests.contains(&action2.digest()));
        assert_eq!(digests.len(), 2);
    }

    /// Test that when orchestrator starts, all pending actions are sent to executor
    #[tokio::test]
    async fn test_resume_actions_in_pending_logs() {
        // The `_`-prefixed ends are kept bound so their channels stay open.
        let (
            _sui_events_tx,
            sui_events_rx,
            _sui_grpc_events_tx,
            sui_grpc_events_rx,
            _eth_events_tx,
            eth_events_rx,
            sui_monitor_tx,
            _sui_monitor_rx,
            eth_monitor_tx,
            _eth_monitor_rx,
            sui_client,
            store,
        ) = setup();
        let (executor, mut submitted_digests_rx) = MockExecutor::new();

        // Seed the WAL with one action per direction before the orchestrator starts.
        let action1 = get_test_sui_to_eth_bridge_action(
            None,
            Some(0),
            Some(99),
            Some(10000),
            None,
            None,
            None,
        );
        let action2 = get_test_eth_to_sui_bridge_action(None, None, None, None);
        store
            .insert_pending_actions(&vec![action1.clone(), action2.clone()])
            .unwrap();

        // start orchestrator
        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let orchestrator = BridgeOrchestrator::new(
            Arc::new(sui_client),
            sui_events_rx,
            sui_grpc_events_rx,
            eth_events_rx,
            store.clone(),
            sui_monitor_tx,
            eth_monitor_tx,
            metrics,
        );
        let _handles = orchestrator.run(executor).await;

        // Executor should have received both actions, in any order
        let mut digests = std::collections::HashSet::new();
        for _ in 0..2 {
            digests.insert(submitted_digests_rx.recv().await.unwrap());
        }
        assert!(digests.contains(&action1.digest()));
        assert!(digests.contains(&action2.digest()));
        assert_eq!(digests.len(), 2);
    }

    /// Builds everything a test needs to construct a `BridgeOrchestrator`.
    ///
    /// Returns, in order: both ends of the Sui event channel, of the Sui gRPC
    /// event channel, of the Eth event channel, of the Sui monitor channel and
    /// of the Eth monitor channel, followed by a mock-backed `SuiClient` and a
    /// store created in a fresh temporary directory.
    #[allow(clippy::type_complexity)]
    fn setup() -> (
        mysten_metrics::metered_channel::Sender<(Identifier, Vec<SuiEvent>)>,
        mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
        mysten_metrics::metered_channel::Sender<(u64, Vec<SuiBridgeEvent>)>,
        mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
        mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
        mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
        mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
        mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
        mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
        mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
        SuiClient<SuiMockClient>,
        Arc<BridgeOrchestratorTables>,
    ) {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        mysten_metrics::init_metrics(&registry);

        init_all_struct_tags();

        let temp_dir = tempfile::tempdir().unwrap();
        let store = BridgeOrchestratorTables::new(temp_dir.path());

        let mock_client = SuiMockClient::default();
        let sui_client = SuiClient::new_for_testing(mock_client.clone());

        // Every channel reports its in-flight count under its own label.
        let inflight = &mysten_metrics::get_metrics().unwrap().channel_inflight;

        let (eth_events_tx, eth_events_rx) = mysten_metrics::metered_channel::channel(
            100,
            &inflight.with_label_values(&["unit_test_eth_events_queue"]),
        );
        let (sui_events_tx, sui_events_rx) = mysten_metrics::metered_channel::channel(
            100,
            &inflight.with_label_values(&["unit_test_legacy_sui_events_queue"]),
        );
        let (sui_grpc_events_tx, sui_grpc_events_rx) = mysten_metrics::metered_channel::channel(
            100,
            &inflight.with_label_values(&["unit_test_sui_events_queue"]),
        );
        let (sui_monitor_tx, sui_monitor_rx) = mysten_metrics::metered_channel::channel(
            10000,
            &inflight.with_label_values(&["sui_monitor_queue"]),
        );
        let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel(
            10000,
            &inflight.with_label_values(&["eth_monitor_queue"]),
        );

        (
            sui_events_tx,
            sui_events_rx,
            sui_grpc_events_tx,
            sui_grpc_events_rx,
            eth_events_tx,
            eth_events_rx,
            sui_monitor_tx,
            sui_monitor_rx,
            eth_monitor_tx,
            eth_monitor_rx,
            sui_client,
            store,
        )
    }

    /// A `BridgeActionExecutorTrait` implementation that only tracks the submitted actions.
    struct MockExecutor {
        /// Receives the digest of every action handed to the executor.
        requested_transactions_tx: tokio::sync::broadcast::Sender<BridgeActionDigest>,
    }

    impl MockExecutor {
        /// Creates the executor together with the receiver on which the digests
        /// of submitted actions can be observed.
        fn new() -> (Self, tokio::sync::broadcast::Receiver<BridgeActionDigest>) {
            let (requested_transactions_tx, digests_rx) = tokio::sync::broadcast::channel(100);
            let executor = Self {
                requested_transactions_tx,
            };
            (executor, digests_rx)
        }
    }

    impl BridgeActionExecutorTrait for MockExecutor {
        /// Spawns a task that broadcasts the digest of every submitted action
        /// and returns its handle together with the submission channel.
        fn run(
            self,
        ) -> (
            Vec<tokio::task::JoinHandle<()>>,
            mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
        ) {
            let inflight_gauge = mysten_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                .with_label_values(&["unit_test_mock_executor"]);
            let (submit_tx, mut submit_rx) = mysten_metrics::metered_channel::channel::<
                BridgeActionExecutionWrapper,
            >(100, &inflight_gauge);

            let forwarder = tokio::spawn(async move {
                while let Some(wrapper) = submit_rx.recv().await {
                    let digest = wrapper.0.digest();
                    self.requested_transactions_tx.send(digest).unwrap();
                }
            });
            (vec![forwarder], submit_tx)
        }
    }
}