1use crate::retry_with_max_elapsed_time;
8use crate::types::IsBridgePaused;
9use arc_swap::ArcSwap;
10use mysten_metrics::spawn_logged_monitored_task;
11use shared_crypto::intent::{Intent, IntentMessage};
12use sui_json_rpc_types::{
13    SuiExecutionStatus, SuiTransactionBlockEffectsAPI, SuiTransactionBlockResponse,
14};
15use sui_types::TypeTag;
16use sui_types::transaction::ObjectArg;
17use sui_types::{
18    base_types::{ObjectID, ObjectRef, SuiAddress},
19    crypto::{Signature, SuiKeyPair},
20    digests::TransactionDigest,
21    gas_coin::GasCoin,
22    object::Owner,
23    transaction::Transaction,
24};
25
26use crate::events::{
27    TokenTransferAlreadyApproved, TokenTransferAlreadyClaimed, TokenTransferApproved,
28    TokenTransferClaimed,
29};
30use crate::metrics::BridgeMetrics;
31use crate::{
32    client::bridge_authority_aggregator::BridgeAuthorityAggregator,
33    error::BridgeError,
34    storage::BridgeOrchestratorTables,
35    sui_client::{SuiClient, SuiClientInner},
36    sui_transaction_builder::build_sui_transaction,
37    types::{BridgeAction, BridgeActionStatus, VerifiedCertifiedBridgeAction},
38};
39use std::collections::HashMap;
40use std::sync::Arc;
41use tokio::sync::Semaphore;
42use tokio::time::Duration;
43use tracing::{Instrument, error, info, instrument, warn};
44
/// Capacity used for both the signing queue and the execution queue channels.
45pub const CHANNEL_SIZE: usize = 1000;
/// Number of permits in the semaphore that `request_signatures` tasks must acquire.
46pub const SIGNING_CONCURRENCY: usize = 10;
47
/// Once an action's attempt counter reaches this value, a failed signature
/// aggregation is logged for manual intervention instead of being re-enqueued.
48pub const MAX_SIGNING_ATTEMPTS: u64 = 16;
/// Once a certificate's attempt counter reaches this value, a failed transaction
/// submission is logged for manual intervention instead of being re-enqueued.
51pub const MAX_EXECUTION_ATTEMPTS: u64 = 16;
52
53async fn delay(attempt_times: u64) {
54    let delay_ms = 100 * (2 ^ attempt_times);
55    tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
56}
57
/// Item carried by the signing queue: a bridge action (field `0`) together with
/// the number of signing attempts already made for it (field `1`).
/// `submit_to_executor` enqueues new actions with an attempt count of 0.
58#[derive(Debug)]
59pub struct BridgeActionExecutionWrapper(pub BridgeAction, pub u64);
60
/// Item carried by the execution queue: a certified bridge action (field `0`)
/// together with the number of execution attempts already made for it (field `1`).
61#[derive(Debug)]
62pub struct CertifiedBridgeActionExecutionWrapper(pub VerifiedCertifiedBridgeAction, pub u64);
63
/// Interface for starting a bridge action executor.
64pub trait BridgeActionExecutorTrait {
    /// Consumes the executor and returns a list of task join handles together
    /// with the sender used to submit `BridgeActionExecutionWrapper` items to it.
65    fn run(
66        self,
67    ) -> (
68        Vec<tokio::task::JoinHandle<()>>,
69        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
70    );
71}
72
/// Executor that collects committee signatures for bridge actions and submits
/// the certified actions to Sui as transactions.
73pub struct BridgeActionExecutor<C> {
    /// Client used for on-chain status lookups, gas data and transaction submission.
74    sui_client: Arc<SuiClient<C>>,
    /// Swappable aggregator used to request committee signatures.
75    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
    /// Key pair used to sign the Sui transactions built for certified actions.
76    key: SuiKeyPair,
    /// Sender address of the built transactions; also asserted to own the gas object.
77    sui_address: SuiAddress,
    /// ID of the gas object whose data is fetched before every execution.
78    gas_object_id: ObjectID,
    /// Store from which pending actions are removed once they are processed.
79    store: Arc<BridgeOrchestratorTables>,
    /// Bridge object argument passed to `build_sui_transaction`.
80    bridge_object_arg: ObjectArg,
    /// Swappable map from `u8` keys to type tags, passed to `build_sui_transaction`.
    // NOTE(review): keys are presumably token ids — confirm against the builder.
81    sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
    /// Watch channel read before each execution; execution is skipped while it reports paused.
82    bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
    /// Metrics updated by the signing and execution paths.
83    metrics: Arc<BridgeMetrics>,
84}
85
86impl<C> BridgeActionExecutorTrait for BridgeActionExecutor<C>
87where
88    C: SuiClientInner + 'static,
89{
90    fn run(
91        self,
92    ) -> (
93        Vec<tokio::task::JoinHandle<()>>,
94        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
95    ) {
96        let (tasks, sender, _) = self.run_inner();
97        (tasks, sender)
98    }
99}
100
101impl<C> BridgeActionExecutor<C>
102where
103    C: SuiClientInner + 'static,
104{
105    pub async fn new(
106        sui_client: Arc<SuiClient<C>>,
107        bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
108        store: Arc<BridgeOrchestratorTables>,
109        key: SuiKeyPair,
110        sui_address: SuiAddress,
111        gas_object_id: ObjectID,
112        sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
113        bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
114        metrics: Arc<BridgeMetrics>,
115    ) -> Self {
116        let bridge_object_arg = sui_client
117            .get_mutable_bridge_object_arg_must_succeed()
118            .await;
119        Self {
120            sui_client,
121            bridge_auth_agg,
122            store,
123            key,
124            gas_object_id,
125            sui_address,
126            bridge_object_arg,
127            sui_token_type_tags,
128            bridge_pause_rx,
129            metrics,
130        }
131    }
132
133    fn run_inner(
134        self,
135    ) -> (
136        Vec<tokio::task::JoinHandle<()>>,
137        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
138        mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
139    ) {
140        let key = self.key;
141
142        let (sender, receiver) = mysten_metrics::metered_channel::channel(
143            CHANNEL_SIZE,
144            &mysten_metrics::get_metrics()
145                .unwrap()
146                .channel_inflight
147                .with_label_values(&["executor_signing_queue"]),
148        );
149
150        let (execution_tx, execution_rx) = mysten_metrics::metered_channel::channel(
151            CHANNEL_SIZE,
152            &mysten_metrics::get_metrics()
153                .unwrap()
154                .channel_inflight
155                .with_label_values(&["executor_execution_queue"]),
156        );
157        let execution_tx_clone = execution_tx.clone();
158        let sender_clone = sender.clone();
159        let store_clone = self.store.clone();
160        let client_clone = self.sui_client.clone();
161        let mut tasks = vec![];
162        let metrics = self.metrics.clone();
163        tasks.push(spawn_logged_monitored_task!(
164            Self::run_signature_aggregation_loop(
165                client_clone,
166                self.bridge_auth_agg,
167                store_clone,
168                sender_clone,
169                receiver,
170                execution_tx_clone,
171                metrics,
172            )
173        ));
174
175        let metrics = self.metrics.clone();
176        let execution_tx_clone = execution_tx.clone();
177        tasks.push(spawn_logged_monitored_task!(
178            Self::run_onchain_execution_loop(
179                self.sui_client.clone(),
180                key,
181                self.sui_address,
182                self.gas_object_id,
183                self.store.clone(),
184                execution_tx_clone,
185                execution_rx,
186                self.bridge_object_arg,
187                self.sui_token_type_tags,
188                self.bridge_pause_rx,
189                metrics,
190            )
191        ));
192        (tasks, sender, execution_tx)
193    }
194
195    async fn run_signature_aggregation_loop(
196        sui_client: Arc<SuiClient<C>>,
197        auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
198        store: Arc<BridgeOrchestratorTables>,
199        signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
200        mut signing_queue_receiver: mysten_metrics::metered_channel::Receiver<
201            BridgeActionExecutionWrapper,
202        >,
203        execution_queue_sender: mysten_metrics::metered_channel::Sender<
204            CertifiedBridgeActionExecutionWrapper,
205        >,
206        metrics: Arc<BridgeMetrics>,
207    ) {
208        info!("Starting run_signature_aggregation_loop");
209        let semaphore = Arc::new(Semaphore::new(SIGNING_CONCURRENCY));
210        while let Some(action) = signing_queue_receiver.recv().await {
211            Self::handle_signing_task(
212                &semaphore,
213                &auth_agg,
214                &signing_queue_sender,
215                &execution_queue_sender,
216                &sui_client,
217                &store,
218                action,
219                &metrics,
220            )
221            .await;
222        }
223    }
224
225    async fn should_proceed_signing(sui_client: &Arc<SuiClient<C>>) -> bool {
226        let Ok(Ok(is_paused)) =
227            retry_with_max_elapsed_time!(sui_client.is_bridge_paused(), Duration::from_secs(600))
228        else {
229            error!("Failed to get bridge status after retry");
230            return false;
231        };
232        !is_paused
233    }
234
235    #[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))]
236    async fn handle_signing_task(
237        semaphore: &Arc<Semaphore>,
238        auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
239        signing_queue_sender: &mysten_metrics::metered_channel::Sender<
240            BridgeActionExecutionWrapper,
241        >,
242        execution_queue_sender: &mysten_metrics::metered_channel::Sender<
243            CertifiedBridgeActionExecutionWrapper,
244        >,
245        sui_client: &Arc<SuiClient<C>>,
246        store: &Arc<BridgeOrchestratorTables>,
247        action: BridgeActionExecutionWrapper,
248        metrics: &Arc<BridgeMetrics>,
249    ) {
250        metrics.action_executor_signing_queue_received_actions.inc();
251        let action_key = action.0.key();
252        info!("Received action for signing: {:?}", action.0);
253
254        let should_proceed = Self::should_proceed_signing(sui_client).await;
259        if !should_proceed {
260            metrics.action_executor_signing_queue_skipped_actions.inc();
261            warn!("skipping signing task: {:?}", action_key);
262            return;
263        }
264
265        let auth_agg_clone = auth_agg.clone();
266        let signing_queue_sender_clone = signing_queue_sender.clone();
267        let execution_queue_sender_clone = execution_queue_sender.clone();
268        let sui_client_clone = sui_client.clone();
269        let store_clone = store.clone();
270        let metrics_clone = metrics.clone();
271        let semaphore_clone = semaphore.clone();
272        spawn_logged_monitored_task!(
273            Self::request_signatures(
274                semaphore_clone,
275                sui_client_clone,
276                auth_agg_clone,
277                action,
278                store_clone,
279                signing_queue_sender_clone,
280                execution_queue_sender_clone,
281                metrics_clone,
282            )
283            .instrument(tracing::debug_span!("request_signatures", action_key=?action_key)),
284            "request_signatures"
285        );
286    }
287
288    async fn handle_already_processed_token_transfer_action_maybe(
292        sui_client: &Arc<SuiClient<C>>,
293        action: &BridgeAction,
294        store: &Arc<BridgeOrchestratorTables>,
295        metrics: &Arc<BridgeMetrics>,
296    ) -> bool {
297        let status = sui_client
298            .get_token_transfer_action_onchain_status_until_success(
299                action.chain_id() as u8,
300                action.seq_number(),
301            )
302            .await;
303        match status {
304            BridgeActionStatus::Approved | BridgeActionStatus::Claimed => {
305                info!(
306                    "Action already approved or claimed, removing action from pending logs: {:?}",
307                    action
308                );
309                metrics.action_executor_already_processed_actions.inc();
310                store
311                    .remove_pending_actions(&[action.digest()])
312                    .unwrap_or_else(|e| {
313                        panic!("Write to DB should not fail: {:?}", e);
314                    });
315                true
316            }
317            BridgeActionStatus::Pending | BridgeActionStatus::NotFound => false,
320        }
321    }
322
323    async fn request_signatures(
326        semaphore: Arc<Semaphore>,
327        sui_client: Arc<SuiClient<C>>,
328        auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
329        action: BridgeActionExecutionWrapper,
330        store: Arc<BridgeOrchestratorTables>,
331        signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
332        execution_queue_sender: mysten_metrics::metered_channel::Sender<
333            CertifiedBridgeActionExecutionWrapper,
334        >,
335        metrics: Arc<BridgeMetrics>,
336    ) {
337        let _permit = semaphore
338            .acquire()
339            .await
340            .expect("semaphore should not be closed");
341        info!("requesting signatures");
342        let BridgeActionExecutionWrapper(action, attempt_times) = action;
343
344        match &action {
346            BridgeAction::SuiToEthBridgeAction(_) | BridgeAction::EthToSuiBridgeAction(_) => (),
347            _ => unreachable!("Non token transfer action should not reach here"),
348        };
349
350        if Self::handle_already_processed_token_transfer_action_maybe(
352            &sui_client,
353            &action,
354            &store,
355            &metrics,
356        )
357        .await
358        {
359            return;
360        }
361        match auth_agg
362            .load()
363            .request_committee_signatures(action.clone())
364            .await
365        {
366            Ok(certificate) => {
367                info!("Sending certificate to execution");
368                execution_queue_sender
369                    .send(CertifiedBridgeActionExecutionWrapper(certificate, 0))
370                    .await
371                    .unwrap_or_else(|e| {
372                        panic!("Sending to execution queue should not fail: {:?}", e);
373                    });
374            }
375            Err(e) => {
376                warn!("Failed to collect sigs for bridge action: {:?}", e);
377                metrics.err_signature_aggregation.inc();
378
379                if attempt_times >= MAX_SIGNING_ATTEMPTS {
381                    metrics.err_signature_aggregation_too_many_failures.inc();
382                    error!(
383                        "Manual intervention is required. Failed to collect sigs for bridge action after {MAX_SIGNING_ATTEMPTS} attempts: {:?}",
384                        e
385                    );
386                    return;
387                }
388                delay(attempt_times).await;
389                signing_queue_sender
390                    .send(BridgeActionExecutionWrapper(action, attempt_times + 1))
391                    .await
392                    .unwrap_or_else(|e| {
393                        panic!("Sending to signing queue should not fail: {:?}", e);
394                    });
395            }
396        }
397    }
398
399    async fn run_onchain_execution_loop(
402        sui_client: Arc<SuiClient<C>>,
403        sui_key: SuiKeyPair,
404        sui_address: SuiAddress,
405        gas_object_id: ObjectID,
406        store: Arc<BridgeOrchestratorTables>,
407        execution_queue_sender: mysten_metrics::metered_channel::Sender<
408            CertifiedBridgeActionExecutionWrapper,
409        >,
410        mut execution_queue_receiver: mysten_metrics::metered_channel::Receiver<
411            CertifiedBridgeActionExecutionWrapper,
412        >,
413        bridge_object_arg: ObjectArg,
414        sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
415        bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
416        metrics: Arc<BridgeMetrics>,
417    ) {
418        info!("Starting run_onchain_execution_loop");
419        while let Some(certificate_wrapper) = execution_queue_receiver.recv().await {
420            if *bridge_pause_rx.borrow() {
424                warn!("Bridge is paused, skipping execution");
425                metrics
426                    .action_executor_execution_queue_skipped_actions_due_to_pausing
427                    .inc();
428                continue;
429            }
430            Self::handle_execution_task(
431                certificate_wrapper,
432                &sui_client,
433                &sui_key,
434                &sui_address,
435                gas_object_id,
436                &store,
437                &execution_queue_sender,
438                &bridge_object_arg,
439                &sui_token_type_tags,
440                &metrics,
441            )
442            .await;
443        }
444        panic!("Execution queue closed unexpectedly");
445    }
446
447    #[instrument(level = "error", skip_all, fields(action_key=?certificate_wrapper.0.data().key(), attempt_times=?certificate_wrapper.1))]
448    async fn handle_execution_task(
449        certificate_wrapper: CertifiedBridgeActionExecutionWrapper,
450        sui_client: &Arc<SuiClient<C>>,
451        sui_key: &SuiKeyPair,
452        sui_address: &SuiAddress,
453        gas_object_id: ObjectID,
454        store: &Arc<BridgeOrchestratorTables>,
455        execution_queue_sender: &mysten_metrics::metered_channel::Sender<
456            CertifiedBridgeActionExecutionWrapper,
457        >,
458        bridge_object_arg: &ObjectArg,
459        sui_token_type_tags: &ArcSwap<HashMap<u8, TypeTag>>,
460        metrics: &Arc<BridgeMetrics>,
461    ) {
462        metrics
463            .action_executor_execution_queue_received_actions
464            .inc();
465        let CertifiedBridgeActionExecutionWrapper(certificate, attempt_times) = certificate_wrapper;
466        let action = certificate.data();
467        let action_key = action.key();
468
469        info!("Received certified action for execution: {:?}", action);
470
471        let (gas_coin, gas_object_ref) =
473            Self::get_gas_data_assert_ownership(*sui_address, gas_object_id, sui_client).await;
474        metrics.gas_coin_balance.set(gas_coin.value() as i64);
475
476        let ceriticate_clone = certificate.clone();
477
478        if Self::handle_already_processed_token_transfer_action_maybe(
480            sui_client, action, store, metrics,
481        )
482        .await
483        {
484            info!("Action already processed, skipping");
485            return;
486        }
487
488        info!("Building Sui transaction");
489        let rgp = sui_client.get_reference_gas_price_until_success().await;
490        let tx_data = match build_sui_transaction(
491            *sui_address,
492            &gas_object_ref,
493            ceriticate_clone,
494            *bridge_object_arg,
495            sui_token_type_tags.load().as_ref(),
496            rgp,
497        ) {
498            Ok(tx_data) => tx_data,
499            Err(err) => {
500                metrics.err_build_sui_transaction.inc();
501                error!(
502                    "Manual intervention is required. Failed to build transaction for action {:?}: {:?}",
503                    action, err
504                );
505                return;
508            }
509        };
510        let sig = Signature::new_secure(
511            &IntentMessage::new(Intent::sui_transaction(), &tx_data),
512            sui_key,
513        );
514        let signed_tx = Transaction::from_data(tx_data, vec![sig]);
515        let tx_digest = *signed_tx.digest();
516
517        if Self::handle_already_processed_token_transfer_action_maybe(
519            sui_client, action, store, metrics,
520        )
521        .await
522        {
523            info!("Action already processed, skipping");
524            return;
525        }
526
527        info!(?tx_digest, ?gas_object_ref, "Sending transaction to Sui");
528        match sui_client
529            .execute_transaction_block_with_effects(signed_tx)
530            .await
531        {
532            Ok(resp) => {
533                Self::handle_execution_effects(tx_digest, resp, store, action, metrics).await
534            }
535
536            Err(err) => {
538                error!(
539                    ?action_key,
540                    ?tx_digest,
541                    "Sui transaction failed at signing: {err:?}"
542                );
543                metrics.err_sui_transaction_submission.inc();
544                let metrics_clone = metrics.clone();
545                let sender_clone = execution_queue_sender.clone();
547                spawn_logged_monitored_task!(async move {
548                    if attempt_times >= MAX_EXECUTION_ATTEMPTS {
550                        metrics_clone
551                            .err_sui_transaction_submission_too_many_failures
552                            .inc();
553                        error!("Manual intervention is required. Failed to collect execute transaction for bridge action after {MAX_EXECUTION_ATTEMPTS} attempts: {:?}", err);
554                        return;
555                    }
556                    delay(attempt_times).await;
557                    sender_clone
558                        .send(CertifiedBridgeActionExecutionWrapper(
559                            certificate,
560                            attempt_times + 1,
561                        ))
562                        .await
563                        .unwrap_or_else(|e| {
564                            panic!("Sending to execution queue should not fail: {:?}", e);
565                        });
566                    info!("Re-enqueued certificate for execution");
567                }.instrument(tracing::debug_span!("reenqueue_execution_task", action_key=?action_key)));
568            }
569        }
570    }
571
572    async fn handle_execution_effects(
574        tx_digest: TransactionDigest,
575        response: SuiTransactionBlockResponse,
576        store: &Arc<BridgeOrchestratorTables>,
577        action: &BridgeAction,
578        metrics: &Arc<BridgeMetrics>,
579    ) {
580        let effects = response
581            .effects
582            .clone()
583            .expect("We requested effects but got None.");
584        let status = effects.status();
585        match status {
586            SuiExecutionStatus::Success => {
587                let events = response.events.expect("We requested events but got None.");
588                let relevant_events = events
589                    .data
590                    .iter()
591                    .filter(|e| {
592                        e.type_ == *TokenTransferAlreadyClaimed.get().unwrap()
593                            || e.type_ == *TokenTransferClaimed.get().unwrap()
594                            || e.type_ == *TokenTransferApproved.get().unwrap()
595                            || e.type_ == *TokenTransferAlreadyApproved.get().unwrap()
596                    })
597                    .collect::<Vec<_>>();
598                assert!(
599                    !relevant_events.is_empty(),
600                    "Expected TokenTransferAlreadyClaimed, TokenTransferClaimed, TokenTransferApproved \
601                    or TokenTransferAlreadyApproved event but got: {:?}",
602                    events
603                );
604                info!(?tx_digest, "Sui transaction executed successfully");
605                relevant_events.iter().for_each(|e| {
607                    if e.type_ == *TokenTransferClaimed.get().unwrap() {
608                        match action {
609                            BridgeAction::EthToSuiBridgeAction(_) => {
610                                metrics.eth_sui_token_transfer_claimed.inc();
611                            }
612                            BridgeAction::SuiToEthBridgeAction(_) => {
613                                metrics.sui_eth_token_transfer_claimed.inc();
614                            }
615                            _ => error!("Unexpected action type for claimed event: {:?}", action),
616                        }
617                    } else if e.type_ == *TokenTransferApproved.get().unwrap() {
618                        match action {
619                            BridgeAction::EthToSuiBridgeAction(_) => {
620                                metrics.eth_sui_token_transfer_approved.inc();
621                            }
622                            BridgeAction::SuiToEthBridgeAction(_) => {
623                                metrics.sui_eth_token_transfer_approved.inc();
624                            }
625                            _ => error!("Unexpected action type for approved event: {:?}", action),
626                        }
627                    }
628                });
629                store
630                    .remove_pending_actions(&[action.digest()])
631                    .unwrap_or_else(|e| {
632                        panic!("Write to DB should not fail: {:?}", e);
633                    })
634            }
635            SuiExecutionStatus::Failure { error } => {
636                metrics.err_sui_transaction_execution.inc();
643                error!(
644                    ?tx_digest,
645                    "Manual intervention is needed. Sui transaction executed and failed with error: {error:?}"
646                );
647            }
648        }
649    }
650
651    async fn get_gas_data_assert_ownership(
653        sui_address: SuiAddress,
654        gas_object_id: ObjectID,
655        sui_client: &SuiClient<C>,
656    ) -> (GasCoin, ObjectRef) {
657        let (gas_coin, gas_obj_ref, owner) = sui_client
658            .get_gas_data_panic_if_not_gas(gas_object_id)
659            .await;
660
661        assert_eq!(
664            owner,
665            Owner::AddressOwner(sui_address),
666            "Gas object {:?} is no longer owned by address {}",
667            gas_object_id,
668            sui_address
669        );
670        (gas_coin, gas_obj_ref)
671    }
672}
673
674pub async fn submit_to_executor(
675    tx: &mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
676    action: BridgeAction,
677) -> Result<(), BridgeError> {
678    tx.send(BridgeActionExecutionWrapper(action, 0))
679        .await
680        .map_err(|e| BridgeError::Generic(e.to_string()))
681}
682
683#[cfg(test)]
684mod tests {
685    use crate::events::init_all_struct_tags;
686    use crate::test_utils::DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
687    use crate::types::BRIDGE_PAUSED;
688    use fastcrypto::traits::KeyPair;
689    use prometheus::Registry;
690    use std::collections::{BTreeMap, HashMap};
691    use std::str::FromStr;
692    use sui_json_rpc_types::SuiTransactionBlockEffects;
693    use sui_json_rpc_types::SuiTransactionBlockEvents;
694    use sui_json_rpc_types::{SuiEvent, SuiTransactionBlockResponse};
695    use sui_types::TypeTag;
696    use sui_types::crypto::get_key_pair;
697    use sui_types::gas_coin::GasCoin;
698    use sui_types::{base_types::random_object_ref, transaction::TransactionData};
699
700    use crate::{
701        crypto::{
702            BridgeAuthorityKeyPair, BridgeAuthorityPublicKeyBytes,
703            BridgeAuthorityRecoverableSignature,
704        },
705        server::mock_handler::BridgeRequestMockHandler,
706        sui_mock_client::SuiMockClient,
707        test_utils::{
708            get_test_authorities_and_run_mock_bridge_server, get_test_eth_to_sui_bridge_action,
709            get_test_sui_to_eth_bridge_action, sign_action_with_key,
710        },
711        types::{BridgeCommittee, BridgeCommitteeValiditySignInfo, CertifiedBridgeAction},
712    };
713
714    use super::*;
715
716    #[tokio::test]
717    async fn test_onchain_execution_loop() {
        // Drives three actions through the executor, one per scenario:
        //   1. the transaction succeeds and emits a TokenTransferClaimed event,
        //   2. the transaction executes but its status is Failure,
        //   3. submitting the transaction returns an error, then later succeeds.
        // After each scenario the pending-action store is inspected.
718        let (
719            signing_tx,
720            _execution_tx,
721            sui_client_mock,
722            mut tx_subscription,
723            store,
724            secrets,
725            dummy_sui_key,
726            mock0,
727            mock1,
728            mock2,
729            mock3,
730            _handles,
731            gas_object_ref,
732            sui_address,
733            sui_token_type_tags,
734            _bridge_pause_tx,
735        ) = setup().await;

        // --- Scenario 1: successful execution ---
736        let (action_certificate, _, _) = get_bridge_authority_approved_action(
737            vec![&mock0, &mock1, &mock2, &mock3],
738            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
739            None,
740            true,
741        );
742        let action = action_certificate.data().clone();
743        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
        // Build the same transaction here to learn the digest that the mocked
        // responses below are registered under.
        // NOTE(review): assumes the executor builds an identical transaction
        // (the literal 1000 matching the mock's reference gas price) — confirm in `setup`.
744        let tx_data = build_sui_transaction(
745            sui_address,
746            &gas_object_ref,
747            action_certificate,
748            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
749            &id_token_map,
750            1000,
751        )
752        .unwrap();
753
754        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
755
        // Register gas object info on the mock client, owned by `sui_address`.
756        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
758            gas_coin.clone(),
759            gas_object_ref,
760            Owner::AddressOwner(sui_address),
761        );
762
        // Mock a successful response carrying a TokenTransferClaimed event.
763        let mut event = SuiEvent::random_for_testing();
765        event.type_ = TokenTransferClaimed.get().unwrap().clone();
766        let events = vec![event];
767        mock_transaction_response(
768            &sui_client_mock,
769            tx_digest,
770            SuiExecutionStatus::Success,
771            Some(events),
772            true,
773        );
774
        // The action starts out in the pending store.
775        store
776            .insert_pending_actions(std::slice::from_ref(&action))
777            .unwrap();
778        assert_eq!(
779            store.get_all_pending_actions()[&action.digest()],
780            action.clone()
781        );
782
        // Submit the action to the signing queue.
783        submit_to_executor(&signing_tx, action.clone())
785            .await
786            .unwrap();
787
        // Wait for a notification on `tx_subscription`, then expect the
        // pending store to be empty.
        // NOTE(review): presumably fires when the mock client receives the
        // transaction — confirm against `setup`.
788        tx_subscription.recv().await.unwrap();
790        assert!(store.get_all_pending_actions().is_empty());
791
        // --- Scenario 2: transaction executes with a Failure status ---
792        let (action_certificate, _, _) = get_bridge_authority_approved_action(
797            vec![&mock0, &mock1, &mock2, &mock3],
798            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
799            None,
800            true,
801        );
802
803        let action = action_certificate.data().clone();
804
805        let tx_data = build_sui_transaction(
806            sui_address,
807            &gas_object_ref,
808            action_certificate,
809            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
810            &id_token_map,
811            1000,
812        )
813        .unwrap();
814        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
815
        // Mock a response whose execution status is Failure, with no events.
816        mock_transaction_response(
818            &sui_client_mock,
819            tx_digest,
820            SuiExecutionStatus::Failure {
821                error: "failure is mother of success".to_string(),
822            },
823            None,
824            true,
825        );
826
827        store
828            .insert_pending_actions(std::slice::from_ref(&action))
829            .unwrap();
830        assert_eq!(
831            store.get_all_pending_actions()[&action.digest()],
832            action.clone()
833        );
834
835        submit_to_executor(&signing_tx, action.clone())
837            .await
838            .unwrap();
839
        // After the notification the failed action must still be pending.
840        tx_subscription.recv().await.unwrap();
842        assert_eq!(
844            store.get_all_pending_actions()[&action.digest()],
845            action.clone()
846        );
847
        // --- Scenario 3: submission error followed by a retry ---
848        let (action_certificate, _, _) = get_bridge_authority_approved_action(
853            vec![&mock0, &mock1, &mock2, &mock3],
854            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
855            None,
856            true,
857        );
858
859        let action = action_certificate.data().clone();
860
861        let tx_data = build_sui_transaction(
862            sui_address,
863            &gas_object_ref,
864            action_certificate,
865            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
866            &id_token_map,
867            1000,
868        )
869        .unwrap();
870        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
        // Mock an error (not a response) for this digest.
871        mock_transaction_error(
872            &sui_client_mock,
873            tx_digest,
874            BridgeError::Generic("some random error".to_string()),
875            true,
876        );
877
878        store
879            .insert_pending_actions(std::slice::from_ref(&action))
880            .unwrap();
881        assert_eq!(
882            store.get_all_pending_actions()[&action.digest()],
883            action.clone()
884        );
885
886        submit_to_executor(&signing_tx, action.clone())
888            .await
889            .unwrap();
890
        // Two notifications with the same digest are expected, i.e. the same
        // transaction shows up a second time.
891        let tx_digest = tx_subscription.recv().await.unwrap();
893        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
894
        // The action is still pending while submissions keep failing.
895        assert!(
897            store
898                .get_all_pending_actions()
899                .contains_key(&action.digest())
900        );
901
        // Now mock a successful response for the same digest.
902        let mut event = SuiEvent::random_for_testing();
904        event.type_ = TokenTransferClaimed.get().unwrap().clone();
905        let events = vec![event];
906        mock_transaction_response(
907            &sui_client_mock,
908            tx_digest,
909            SuiExecutionStatus::Success,
910            Some(events),
911            true,
912        );
913
        // Wait one second, then expect the action to be gone from the pending store.
914        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
916        assert!(
918            !store
919                .get_all_pending_actions()
920                .contains_key(&action.digest())
921        );
922    }
923
924    #[tokio::test]
925    async fn test_signature_aggregation_loop() {
926        let (
927            signing_tx,
928            _execution_tx,
929            sui_client_mock,
930            mut tx_subscription,
931            store,
932            secrets,
933            dummy_sui_key,
934            mock0,
935            mock1,
936            mock2,
937            mock3,
938            _handles,
939            gas_object_ref,
940            sui_address,
941            sui_token_type_tags,
942            _bridge_pause_tx,
943        ) = setup().await;
944        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
945        let (action_certificate, sui_tx_digest, sui_tx_event_index) =
946            get_bridge_authority_approved_action(
947                vec![&mock0, &mock1, &mock2, &mock3],
948                vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
949                None,
950                true,
951            );
952        let action = action_certificate.data().clone();
953        mock_bridge_authority_signing_errors(
954            vec![&mock0, &mock1, &mock2],
955            sui_tx_digest,
956            sui_tx_event_index,
957        );
958        let mut sigs = mock_bridge_authority_sigs(
959            vec![&mock3],
960            &action,
961            vec![&secrets[3]],
962            sui_tx_digest,
963            sui_tx_event_index,
964        );
965
966        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
968            gas_coin,
969            gas_object_ref,
970            Owner::AddressOwner(sui_address),
971        );
972        store
973            .insert_pending_actions(std::slice::from_ref(&action))
974            .unwrap();
975        assert_eq!(
976            store.get_all_pending_actions()[&action.digest()],
977            action.clone()
978        );
979
980        submit_to_executor(&signing_tx, action.clone())
982            .await
983            .unwrap();
984
985        loop {
987            let requested_times =
988                mock0.get_sui_token_events_requested(sui_tx_digest, sui_tx_event_index);
989            if requested_times >= 2 {
990                break;
991            }
992            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
993        }
994        assert_eq!(
996            tx_subscription.try_recv().unwrap_err(),
997            tokio::sync::broadcast::error::TryRecvError::Empty
998        );
999        assert_eq!(
1001            store.get_all_pending_actions()[&action.digest()],
1002            action.clone()
1003        );
1004
1005        let sig_from_2 = mock_bridge_authority_sigs(
1007            vec![&mock2],
1008            &action,
1009            vec![&secrets[2]],
1010            sui_tx_digest,
1011            sui_tx_event_index,
1012        );
1013        sigs.extend(sig_from_2);
1014        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1015            action.clone(),
1016            BridgeCommitteeValiditySignInfo { signatures: sigs },
1017        );
1018        let action_certificate = VerifiedCertifiedBridgeAction::new_from_verified(certified_action);
1019        let tx_data = build_sui_transaction(
1020            sui_address,
1021            &gas_object_ref,
1022            action_certificate,
1023            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
1024            &id_token_map,
1025            1000,
1026        )
1027        .unwrap();
1028        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1029
1030        let mut event = SuiEvent::random_for_testing();
1031        event.type_ = TokenTransferClaimed.get().unwrap().clone();
1032        let events = vec![event];
1033        mock_transaction_response(
1034            &sui_client_mock,
1035            tx_digest,
1036            SuiExecutionStatus::Success,
1037            Some(events),
1038            true,
1039        );
1040
1041        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1043        assert!(
1045            !store
1046                .get_all_pending_actions()
1047                .contains_key(&action.digest())
1048        );
1049    }
1050
1051    #[tokio::test]
1052    async fn test_skip_request_signature_if_already_processed_on_chain() {
1053        let (
1054            signing_tx,
1055            _execution_tx,
1056            sui_client_mock,
1057            mut tx_subscription,
1058            store,
1059            _secrets,
1060            _dummy_sui_key,
1061            mock0,
1062            mock1,
1063            mock2,
1064            mock3,
1065            _handles,
1066            _gas_object_ref,
1067            _sui_address,
1068            _sui_token_type_tags,
1069            _bridge_pause_tx,
1070        ) = setup().await;
1071
1072        let sui_tx_digest = TransactionDigest::random();
1073        let sui_tx_event_index = 0;
1074        let action = get_test_sui_to_eth_bridge_action(
1075            Some(sui_tx_digest),
1076            Some(sui_tx_event_index),
1077            None,
1078            None,
1079            None,
1080            None,
1081            None,
1082        );
1083        mock_bridge_authority_signing_errors(
1084            vec![&mock0, &mock1, &mock2, &mock3],
1085            sui_tx_digest,
1086            sui_tx_event_index,
1087        );
1088        store
1089            .insert_pending_actions(std::slice::from_ref(&action))
1090            .unwrap();
1091        assert_eq!(
1092            store.get_all_pending_actions()[&action.digest()],
1093            action.clone()
1094        );
1095
1096        submit_to_executor(&signing_tx, action.clone())
1098            .await
1099            .unwrap();
1100        let action_digest = action.digest();
1101
1102        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1104        tx_subscription.try_recv().unwrap_err();
1105        assert!(store.get_all_pending_actions().contains_key(&action_digest));
1107
1108        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1109
1110        let now = std::time::Instant::now();
1112        while store.get_all_pending_actions().contains_key(&action_digest) {
1113            if now.elapsed().as_secs() > 10 {
1114                panic!("Timeout waiting for action to be removed from WAL");
1115            }
1116            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1117        }
1118        tx_subscription.try_recv().unwrap_err();
1119    }
1120
1121    #[tokio::test]
1122    async fn test_skip_tx_submission_if_already_processed_on_chain() {
1123        let (
1124            _signing_tx,
1125            execution_tx,
1126            sui_client_mock,
1127            mut tx_subscription,
1128            store,
1129            secrets,
1130            dummy_sui_key,
1131            mock0,
1132            mock1,
1133            mock2,
1134            mock3,
1135            _handles,
1136            gas_object_ref,
1137            sui_address,
1138            sui_token_type_tags,
1139            _bridge_pause_tx,
1140        ) = setup().await;
1141        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
1142        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1143            vec![&mock0, &mock1, &mock2, &mock3],
1144            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1145            None,
1146            true,
1147        );
1148
1149        let action = action_certificate.data().clone();
1150        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1151        let tx_data = build_sui_transaction(
1152            sui_address,
1153            &gas_object_ref,
1154            action_certificate.clone(),
1155            arg,
1156            &id_token_map,
1157            1000,
1158        )
1159        .unwrap();
1160        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1161        mock_transaction_error(
1162            &sui_client_mock,
1163            tx_digest,
1164            BridgeError::Generic("some random error".to_string()),
1165            true,
1166        );
1167
1168        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1170            gas_coin.clone(),
1171            gas_object_ref,
1172            Owner::AddressOwner(sui_address),
1173        );
1174
1175        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1176
1177        store
1178            .insert_pending_actions(std::slice::from_ref(&action))
1179            .unwrap();
1180        assert_eq!(
1181            store.get_all_pending_actions()[&action.digest()],
1182            action.clone()
1183        );
1184
1185        execution_tx
1187            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1188            .await
1189            .unwrap();
1190
1191        tx_subscription.recv().await.unwrap();
1193
1194        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1196
1197        let now = std::time::Instant::now();
1199        let action_digest = action.digest();
1200        while store.get_all_pending_actions().contains_key(&action_digest) {
1201            if now.elapsed().as_secs() > 10 {
1202                panic!("Timeout waiting for action to be removed from WAL");
1203            }
1204            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1205        }
1206    }
1207
1208    #[tokio::test]
1209    async fn test_skip_tx_submission_if_bridge_is_paused() {
1210        let (
1211            _signing_tx,
1212            execution_tx,
1213            sui_client_mock,
1214            mut tx_subscription,
1215            store,
1216            secrets,
1217            dummy_sui_key,
1218            mock0,
1219            mock1,
1220            mock2,
1221            mock3,
1222            _handles,
1223            gas_object_ref,
1224            sui_address,
1225            sui_token_type_tags,
1226            bridge_pause_tx,
1227        ) = setup().await;
1228        let id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1229        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1230            vec![&mock0, &mock1, &mock2, &mock3],
1231            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1232            None,
1233            true,
1234        );
1235
1236        let action = action_certificate.data().clone();
1237        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1238        let tx_data = build_sui_transaction(
1239            sui_address,
1240            &gas_object_ref,
1241            action_certificate.clone(),
1242            arg,
1243            &id_token_map,
1244            1000,
1245        )
1246        .unwrap();
1247        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1248        mock_transaction_error(
1249            &sui_client_mock,
1250            tx_digest,
1251            BridgeError::Generic("some random error".to_string()),
1252            true,
1253        );
1254
1255        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1257            gas_coin.clone(),
1258            gas_object_ref,
1259            Owner::AddressOwner(sui_address),
1260        );
1261        let action_digest = action.digest();
1262        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1263
1264        assert!(!*bridge_pause_tx.borrow());
1266
1267        store
1268            .insert_pending_actions(std::slice::from_ref(&action))
1269            .unwrap();
1270        assert_eq!(
1271            store.get_all_pending_actions()[&action.digest()],
1272            action.clone()
1273        );
1274
1275        execution_tx
1277            .send(CertifiedBridgeActionExecutionWrapper(
1278                action_certificate.clone(),
1279                0,
1280            ))
1281            .await
1282            .unwrap();
1283
1284        tx_subscription.recv().await.unwrap();
1286
1287        bridge_pause_tx.send(BRIDGE_PAUSED).unwrap();
1289
1290        execution_tx
1292            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1293            .await
1294            .unwrap();
1295
1296        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1297        assert_eq!(
1299            tx_subscription.try_recv().unwrap_err(),
1300            tokio::sync::broadcast::error::TryRecvError::Empty
1301        );
1302        assert_eq!(
1304            store.get_all_pending_actions()[&action_digest],
1305            action.clone()
1306        );
1307    }
1308
1309    #[tokio::test]
1310    async fn test_action_executor_handle_new_token() {
1311        let new_token_id = 255u8; let new_type_tag = TypeTag::from_str("0xbeef::beef::BEEF").unwrap();
1313        let (
1314            _signing_tx,
1315            execution_tx,
1316            sui_client_mock,
1317            mut tx_subscription,
1318            _store,
1319            secrets,
1320            dummy_sui_key,
1321            mock0,
1322            mock1,
1323            mock2,
1324            mock3,
1325            _handles,
1326            gas_object_ref,
1327            sui_address,
1328            sui_token_type_tags,
1329            _bridge_pause_tx,
1330        ) = setup().await;
1331        let mut id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1332        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1333            vec![&mock0, &mock1, &mock2, &mock3],
1334            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1335            Some(new_token_id),
1336            false, );
1338
1339        let action = action_certificate.data().clone();
1340        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1341        let tx_data = build_sui_transaction(
1342            sui_address,
1343            &gas_object_ref,
1344            action_certificate.clone(),
1345            arg,
1346            &maplit::hashmap! {
1347                new_token_id => new_type_tag.clone()
1348            },
1349            1000,
1350        )
1351        .unwrap();
1352        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1353        mock_transaction_error(
1354            &sui_client_mock,
1355            tx_digest,
1356            BridgeError::Generic("some random error".to_string()),
1357            true,
1358        );
1359
1360        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1362            gas_coin.clone(),
1363            gas_object_ref,
1364            Owner::AddressOwner(sui_address),
1365        );
1366        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1367
1368        execution_tx
1370            .send(CertifiedBridgeActionExecutionWrapper(
1371                action_certificate.clone(),
1372                0,
1373            ))
1374            .await
1375            .unwrap();
1376
1377        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1378        assert_eq!(
1380            tx_subscription.try_recv().unwrap_err(),
1381            tokio::sync::broadcast::error::TryRecvError::Empty
1382        );
1383
1384        id_token_map.insert(new_token_id, new_type_tag);
1386        sui_token_type_tags.store(Arc::new(id_token_map));
1387
1388        execution_tx
1390            .send(CertifiedBridgeActionExecutionWrapper(
1391                action_certificate.clone(),
1392                0,
1393            ))
1394            .await
1395            .unwrap();
1396
1397        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1398        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1400    }
1401
1402    fn mock_bridge_authority_sigs(
1403        mocks: Vec<&BridgeRequestMockHandler>,
1404        action: &BridgeAction,
1405        secrets: Vec<&BridgeAuthorityKeyPair>,
1406        sui_tx_digest: TransactionDigest,
1407        sui_tx_event_index: u16,
1408    ) -> BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthorityRecoverableSignature> {
1409        assert_eq!(mocks.len(), secrets.len());
1410        let mut signed_actions = BTreeMap::new();
1411        for (mock, secret) in mocks.iter().zip(secrets.iter()) {
1412            let signed_action = sign_action_with_key(action, secret);
1413            mock.add_sui_event_response(
1414                sui_tx_digest,
1415                sui_tx_event_index,
1416                Ok(signed_action.clone()),
1417                None,
1418            );
1419            signed_actions.insert(secret.public().into(), signed_action.into_sig().signature);
1420        }
1421        signed_actions
1422    }
1423
1424    fn mock_bridge_authority_signing_errors(
1425        mocks: Vec<&BridgeRequestMockHandler>,
1426        sui_tx_digest: TransactionDigest,
1427        sui_tx_event_index: u16,
1428    ) {
1429        for mock in mocks {
1430            mock.add_sui_event_response(
1431                sui_tx_digest,
1432                sui_tx_event_index,
1433                Err(BridgeError::RestAPIError("small issue".into())),
1434                None,
1435            );
1436        }
1437    }
1438
1439    fn get_bridge_authority_approved_action(
1441        mocks: Vec<&BridgeRequestMockHandler>,
1442        secrets: Vec<&BridgeAuthorityKeyPair>,
1443        token_id: Option<u8>,
1444        sui_to_eth: bool,
1445    ) -> (VerifiedCertifiedBridgeAction, TransactionDigest, u16) {
1446        let sui_tx_digest = TransactionDigest::random();
1447        let sui_tx_event_index = 1;
1448        let action = if sui_to_eth {
1449            get_test_sui_to_eth_bridge_action(
1450                Some(sui_tx_digest),
1451                Some(sui_tx_event_index),
1452                None,
1453                None,
1454                None,
1455                None,
1456                token_id,
1457            )
1458        } else {
1459            get_test_eth_to_sui_bridge_action(None, None, None, token_id)
1460        };
1461
1462        let sigs =
1463            mock_bridge_authority_sigs(mocks, &action, secrets, sui_tx_digest, sui_tx_event_index);
1464        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1465            action,
1466            BridgeCommitteeValiditySignInfo { signatures: sigs },
1467        );
1468        (
1469            VerifiedCertifiedBridgeAction::new_from_verified(certified_action),
1470            sui_tx_digest,
1471            sui_tx_event_index,
1472        )
1473    }
1474
1475    fn get_tx_digest(tx_data: TransactionData, dummy_sui_key: &SuiKeyPair) -> TransactionDigest {
1476        let sig = Signature::new_secure(
1477            &IntentMessage::new(Intent::sui_transaction(), &tx_data),
1478            dummy_sui_key,
1479        );
1480        let signed_tx = Transaction::from_data(tx_data, vec![sig]);
1481        *signed_tx.digest()
1482    }
1483
1484    fn mock_transaction_response(
1488        sui_client_mock: &SuiMockClient,
1489        tx_digest: TransactionDigest,
1490        status: SuiExecutionStatus,
1491        events: Option<Vec<SuiEvent>>,
1492        wildcard: bool,
1493    ) {
1494        let mut response = SuiTransactionBlockResponse::new(tx_digest);
1495        let effects = SuiTransactionBlockEffects::new_for_testing(tx_digest, status);
1496        if let Some(events) = events {
1497            response.events = Some(SuiTransactionBlockEvents { data: events });
1498        }
1499        response.effects = Some(effects);
1500        if wildcard {
1501            sui_client_mock.set_wildcard_transaction_response(Ok(response));
1502        } else {
1503            sui_client_mock.add_transaction_response(tx_digest, Ok(response));
1504        }
1505    }
1506
1507    fn mock_transaction_error(
1508        sui_client_mock: &SuiMockClient,
1509        tx_digest: TransactionDigest,
1510        error: BridgeError,
1511        wildcard: bool,
1512    ) {
1513        if wildcard {
1514            sui_client_mock.set_wildcard_transaction_response(Err(error));
1515        } else {
1516            sui_client_mock.add_transaction_response(tx_digest, Err(error));
1517        }
1518    }
1519
1520    #[allow(clippy::type_complexity)]
1521    async fn setup() -> (
1522        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
1523        mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
1524        SuiMockClient,
1525        tokio::sync::broadcast::Receiver<TransactionDigest>,
1526        Arc<BridgeOrchestratorTables>,
1527        Vec<BridgeAuthorityKeyPair>,
1528        SuiKeyPair,
1529        BridgeRequestMockHandler,
1530        BridgeRequestMockHandler,
1531        BridgeRequestMockHandler,
1532        BridgeRequestMockHandler,
1533        Vec<tokio::task::JoinHandle<()>>,
1534        ObjectRef,
1535        SuiAddress,
1536        Arc<ArcSwap<HashMap<u8, TypeTag>>>,
1537        tokio::sync::watch::Sender<IsBridgePaused>,
1538    ) {
1539        telemetry_subscribers::init_for_testing();
1540        let registry = Registry::new();
1541        mysten_metrics::init_metrics(®istry);
1542        init_all_struct_tags();
1543
1544        let (sui_address, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1545        let sui_key = SuiKeyPair::from(kp);
1546        let gas_object_ref = random_object_ref();
1547        let temp_dir = tempfile::tempdir().unwrap();
1548        let store = BridgeOrchestratorTables::new(temp_dir.path());
1549        let sui_client_mock = SuiMockClient::default();
1550        let tx_subscription = sui_client_mock.subscribe_to_requested_transactions();
1551        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1552
1553        let (_, dummy_kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1556        let dummy_sui_key = SuiKeyPair::from(dummy_kp);
1557
1558        let mock0 = BridgeRequestMockHandler::new();
1559        let mock1 = BridgeRequestMockHandler::new();
1560        let mock2 = BridgeRequestMockHandler::new();
1561        let mock3 = BridgeRequestMockHandler::new();
1562
1563        let (mut handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
1564            vec![2500, 2500, 2500, 2500],
1565            vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
1566        );
1567
1568        let committee = BridgeCommittee::new(authorities).unwrap();
1569
1570        let agg = Arc::new(ArcSwap::new(Arc::new(
1571            BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1572        )));
1573        let metrics = Arc::new(BridgeMetrics::new(®istry));
1574        let sui_token_type_tags = sui_client.get_token_id_map().await.unwrap();
1575        let sui_token_type_tags = Arc::new(ArcSwap::new(Arc::new(sui_token_type_tags)));
1576        let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1577        let executor = BridgeActionExecutor::new(
1578            sui_client.clone(),
1579            agg.clone(),
1580            store.clone(),
1581            sui_key,
1582            sui_address,
1583            gas_object_ref.0,
1584            sui_token_type_tags.clone(),
1585            bridge_pause_rx,
1586            metrics,
1587        )
1588        .await;
1589
1590        let (executor_handle, signing_tx, execution_tx) = executor.run_inner();
1591        handles.extend(executor_handle);
1592
1593        (
1594            signing_tx,
1595            execution_tx,
1596            sui_client_mock,
1597            tx_subscription,
1598            store,
1599            secrets,
1600            dummy_sui_key,
1601            mock0,
1602            mock1,
1603            mock2,
1604            mock3,
1605            handles,
1606            gas_object_ref,
1607            sui_address,
1608            sui_token_type_tags,
1609            bridge_pause_tx,
1610        )
1611    }
1612}