sui_bridge/
action_executor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! BridgeActionExecutor receives BridgeActions (from BridgeOrchestrator),
5//! collects bridge authority signatures and submit signatures on chain.
6
7use crate::retry_with_max_elapsed_time;
8use crate::types::IsBridgePaused;
9use arc_swap::ArcSwap;
10use mysten_metrics::spawn_logged_monitored_task;
11use shared_crypto::intent::{Intent, IntentMessage};
12use sui_json_rpc_types::SuiExecutionStatus;
13use sui_types::TypeTag;
14use sui_types::transaction::ObjectArg;
15use sui_types::{
16    base_types::{ObjectID, ObjectRef, SuiAddress},
17    crypto::{Signature, SuiKeyPair},
18    digests::TransactionDigest,
19    gas_coin::GasCoin,
20    object::Owner,
21    transaction::Transaction,
22};
23
24use crate::events::{
25    TokenTransferAlreadyApproved, TokenTransferAlreadyClaimed, TokenTransferApproved,
26    TokenTransferClaimed,
27};
28use crate::metrics::BridgeMetrics;
29use crate::{
30    client::bridge_authority_aggregator::BridgeAuthorityAggregator,
31    error::BridgeError,
32    storage::BridgeOrchestratorTables,
33    sui_client::{ExecuteTransactionResult, SuiClient, SuiClientInner},
34    sui_transaction_builder::build_sui_transaction,
35    types::{BridgeAction, BridgeActionStatus, VerifiedCertifiedBridgeAction},
36};
37use std::collections::HashMap;
38use std::sync::Arc;
39use tokio::sync::Semaphore;
40use tokio::time::Duration;
41use tracing::{Instrument, error, info, instrument, warn};
42
/// Capacity used for both bounded queues (signing and execution) created by the executor.
pub const CHANNEL_SIZE: usize = 1_000;
/// Number of permits in the semaphore that gates signature-collection tasks.
pub const SIGNING_CONCURRENCY: usize = 10;
45
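// Retry schedule: a failed attempt is re-queued while its attempt counter (which starts at 0)
// is below the limit, so an action is tried at most 17 times (the initial attempt plus up to
// 16 retries). The delay before retry k (k = 0..=15) is 100ms * 2^k:
// 0.1s, 0.2s, 0.4s, 0.8s, 1.6s, 3.2s, 6.4s, 12.8s, 25.6s, 51.2s, 102.4s, 204.8s, 409.6s, 819.2s, 1638.4s, 3276.8s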
/// Attempt-counter value at which a failed signature collection is no longer re-queued.
pub const MAX_SIGNING_ATTEMPTS: u64 = 16;
/// Attempt-counter value at which a failed transaction submission is no longer re-queued.
pub const MAX_EXECUTION_ATTEMPTS: u64 = 16;
50
51async fn delay(attempt_times: u64) {
52    let delay_ms = 100 * 2_u64.pow(attempt_times as u32);
53    tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
54}
55
/// A bridge action queued for signature collection, paired with the attempt
/// counter used for retries (0 when the action is first submitted).
#[derive(Debug)]
pub struct BridgeActionExecutionWrapper(pub BridgeAction, pub u64);
58
/// A certified bridge action queued for on-chain execution, paired with the
/// attempt counter used for retries (0 when the certificate is first queued).
#[derive(Debug)]
pub struct CertifiedBridgeActionExecutionWrapper(pub VerifiedCertifiedBridgeAction, pub u64);
61
/// Entry point for starting a bridge action executor.
pub trait BridgeActionExecutorTrait {
    /// Consumes the executor and starts it.
    ///
    /// Returns a list of task join handles together with the sender through
    /// which `BridgeActionExecutionWrapper`s are handed to the executor.
    fn run(
        self,
    ) -> (
        Vec<tokio::task::JoinHandle<()>>,
        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
    );
}
70
/// Collects bridge authority signatures for bridge actions and submits the
/// resulting certificates on chain.
pub struct BridgeActionExecutor<C> {
    // Client used to query on-chain state and to execute transactions.
    sui_client: Arc<SuiClient<C>>,
    // Swappable aggregator used to request committee signatures.
    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
    // Key used to sign the transactions built by the executor.
    key: SuiKeyPair,
    // Sender address of the built transactions; must own the gas object.
    sui_address: SuiAddress,
    // Gas object used to pay for the built transactions.
    gas_object_id: ObjectID,
    // Store of pending actions; processed actions are removed from it.
    store: Arc<BridgeOrchestratorTables>,
    // Bridge object argument passed to the transaction builder.
    bridge_object_arg: ObjectArg,
    // Swappable map from token id to type tag, passed to the transaction builder.
    sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
    // Watch channel consulted before each execution; execution is skipped while paused.
    bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
    metrics: Arc<BridgeMetrics>,
}
83
84impl<C> BridgeActionExecutorTrait for BridgeActionExecutor<C>
85where
86    C: SuiClientInner + 'static,
87{
88    fn run(
89        self,
90    ) -> (
91        Vec<tokio::task::JoinHandle<()>>,
92        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
93    ) {
94        let (tasks, sender, _) = self.run_inner();
95        (tasks, sender)
96    }
97}
98
99impl<C> BridgeActionExecutor<C>
100where
101    C: SuiClientInner + 'static,
102{
103    pub async fn new(
104        sui_client: Arc<SuiClient<C>>,
105        bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
106        store: Arc<BridgeOrchestratorTables>,
107        key: SuiKeyPair,
108        sui_address: SuiAddress,
109        gas_object_id: ObjectID,
110        sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
111        bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
112        metrics: Arc<BridgeMetrics>,
113    ) -> Self {
114        let bridge_object_arg = sui_client
115            .get_mutable_bridge_object_arg_must_succeed()
116            .await;
117        Self {
118            sui_client,
119            bridge_auth_agg,
120            store,
121            key,
122            gas_object_id,
123            sui_address,
124            bridge_object_arg,
125            sui_token_type_tags,
126            bridge_pause_rx,
127            metrics,
128        }
129    }
130
131    fn run_inner(
132        self,
133    ) -> (
134        Vec<tokio::task::JoinHandle<()>>,
135        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
136        mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
137    ) {
138        let key = self.key;
139
140        let (sender, receiver) = mysten_metrics::metered_channel::channel(
141            CHANNEL_SIZE,
142            &mysten_metrics::get_metrics()
143                .unwrap()
144                .channel_inflight
145                .with_label_values(&["executor_signing_queue"]),
146        );
147
148        let (execution_tx, execution_rx) = mysten_metrics::metered_channel::channel(
149            CHANNEL_SIZE,
150            &mysten_metrics::get_metrics()
151                .unwrap()
152                .channel_inflight
153                .with_label_values(&["executor_execution_queue"]),
154        );
155        let execution_tx_clone = execution_tx.clone();
156        let sender_clone = sender.clone();
157        let store_clone = self.store.clone();
158        let client_clone = self.sui_client.clone();
159        let mut tasks = vec![];
160        let metrics = self.metrics.clone();
161        tasks.push(spawn_logged_monitored_task!(
162            Self::run_signature_aggregation_loop(
163                client_clone,
164                self.bridge_auth_agg,
165                store_clone,
166                sender_clone,
167                receiver,
168                execution_tx_clone,
169                metrics,
170            )
171        ));
172
173        let metrics = self.metrics.clone();
174        let execution_tx_clone = execution_tx.clone();
175        tasks.push(spawn_logged_monitored_task!(
176            Self::run_onchain_execution_loop(
177                self.sui_client.clone(),
178                key,
179                self.sui_address,
180                self.gas_object_id,
181                self.store.clone(),
182                execution_tx_clone,
183                execution_rx,
184                self.bridge_object_arg,
185                self.sui_token_type_tags,
186                self.bridge_pause_rx,
187                metrics,
188            )
189        ));
190        (tasks, sender, execution_tx)
191    }
192
193    async fn run_signature_aggregation_loop(
194        sui_client: Arc<SuiClient<C>>,
195        auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
196        store: Arc<BridgeOrchestratorTables>,
197        signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
198        mut signing_queue_receiver: mysten_metrics::metered_channel::Receiver<
199            BridgeActionExecutionWrapper,
200        >,
201        execution_queue_sender: mysten_metrics::metered_channel::Sender<
202            CertifiedBridgeActionExecutionWrapper,
203        >,
204        metrics: Arc<BridgeMetrics>,
205    ) {
206        info!("Starting run_signature_aggregation_loop");
207        let semaphore = Arc::new(Semaphore::new(SIGNING_CONCURRENCY));
208        while let Some(action) = signing_queue_receiver.recv().await {
209            Self::handle_signing_task(
210                &semaphore,
211                &auth_agg,
212                &signing_queue_sender,
213                &execution_queue_sender,
214                &sui_client,
215                &store,
216                action,
217                &metrics,
218            )
219            .await;
220        }
221    }
222
223    async fn should_proceed_signing(sui_client: &Arc<SuiClient<C>>) -> bool {
224        let Ok(Ok(is_paused)) =
225            retry_with_max_elapsed_time!(sui_client.is_bridge_paused(), Duration::from_secs(600))
226        else {
227            error!("Failed to get bridge status after retry");
228            return false;
229        };
230        !is_paused
231    }
232
233    #[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))]
234    async fn handle_signing_task(
235        semaphore: &Arc<Semaphore>,
236        auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
237        signing_queue_sender: &mysten_metrics::metered_channel::Sender<
238            BridgeActionExecutionWrapper,
239        >,
240        execution_queue_sender: &mysten_metrics::metered_channel::Sender<
241            CertifiedBridgeActionExecutionWrapper,
242        >,
243        sui_client: &Arc<SuiClient<C>>,
244        store: &Arc<BridgeOrchestratorTables>,
245        action: BridgeActionExecutionWrapper,
246        metrics: &Arc<BridgeMetrics>,
247    ) {
248        metrics.action_executor_signing_queue_received_actions.inc();
249        let action_key = action.0.key();
250        info!("Received action for signing: {:?}", action.0);
251
252        // TODO: this is a temporary fix to avoid signing when the bridge is paused.
253        // but the way is implemented is not ideal:
254        // 1. it should check the direction
255        // 2. should use a better mechanism to check the bridge status instead of polling for each action
256        let should_proceed = Self::should_proceed_signing(sui_client).await;
257        if !should_proceed {
258            metrics.action_executor_signing_queue_skipped_actions.inc();
259            warn!("skipping signing task: {:?}", action_key);
260            return;
261        }
262
263        let auth_agg_clone = auth_agg.clone();
264        let signing_queue_sender_clone = signing_queue_sender.clone();
265        let execution_queue_sender_clone = execution_queue_sender.clone();
266        let sui_client_clone = sui_client.clone();
267        let store_clone = store.clone();
268        let metrics_clone = metrics.clone();
269        let semaphore_clone = semaphore.clone();
270        spawn_logged_monitored_task!(
271            Self::request_signatures(
272                semaphore_clone,
273                sui_client_clone,
274                auth_agg_clone,
275                action,
276                store_clone,
277                signing_queue_sender_clone,
278                execution_queue_sender_clone,
279                metrics_clone,
280            )
281            .instrument(tracing::debug_span!("request_signatures", action_key=?action_key)),
282            "request_signatures"
283        );
284    }
285
286    // Checks if the action is already processed on chain.
287    // If yes, skip this action and remove it from the pending log.
288    // Returns true if the action is already processed.
289    async fn handle_already_processed_token_transfer_action_maybe(
290        sui_client: &Arc<SuiClient<C>>,
291        action: &BridgeAction,
292        store: &Arc<BridgeOrchestratorTables>,
293        metrics: &Arc<BridgeMetrics>,
294    ) -> bool {
295        let status = sui_client
296            .get_token_transfer_action_onchain_status_until_success(
297                action.chain_id() as u8,
298                action.seq_number(),
299            )
300            .await;
301        match status {
302            BridgeActionStatus::Approved | BridgeActionStatus::Claimed => {
303                info!(
304                    "Action already approved or claimed, removing action from pending logs: {:?}",
305                    action
306                );
307                metrics.action_executor_already_processed_actions.inc();
308                store
309                    .remove_pending_actions(&[action.digest()])
310                    .unwrap_or_else(|e| {
311                        panic!("Write to DB should not fail: {:?}", e);
312                    });
313                true
314            }
315            // Although theoretically a legit SuiToEthBridgeAction should not have
316            // status `NotFound`
317            BridgeActionStatus::Pending | BridgeActionStatus::NotFound => false,
318        }
319    }
320
321    // TODO: introduce a way to properly stagger the handling
322    // for various validators.
323    async fn request_signatures(
324        semaphore: Arc<Semaphore>,
325        sui_client: Arc<SuiClient<C>>,
326        auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
327        action: BridgeActionExecutionWrapper,
328        store: Arc<BridgeOrchestratorTables>,
329        signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
330        execution_queue_sender: mysten_metrics::metered_channel::Sender<
331            CertifiedBridgeActionExecutionWrapper,
332        >,
333        metrics: Arc<BridgeMetrics>,
334    ) {
335        let _permit = semaphore
336            .acquire()
337            .await
338            .expect("semaphore should not be closed");
339        info!("requesting signatures");
340        let BridgeActionExecutionWrapper(action, attempt_times) = action;
341
342        // Only token transfer action should reach here
343        match &action {
344            BridgeAction::SuiToEthBridgeAction(_)
345            | BridgeAction::SuiToEthTokenTransfer(_)
346            | BridgeAction::SuiToEthTokenTransferV2(_)
347            | BridgeAction::EthToSuiBridgeAction(_)
348            | BridgeAction::EthToSuiTokenTransferV2(_) => (),
349            _ => unreachable!("Non token transfer action should not reach here"),
350        };
351
352        // If the action is already processed, skip it.
353        if Self::handle_already_processed_token_transfer_action_maybe(
354            &sui_client,
355            &action,
356            &store,
357            &metrics,
358        )
359        .await
360        {
361            return;
362        }
363        match auth_agg
364            .load()
365            .request_committee_signatures(action.clone())
366            .await
367        {
368            Ok(certificate) => {
369                info!("Sending certificate to execution");
370                execution_queue_sender
371                    .send(CertifiedBridgeActionExecutionWrapper(certificate, 0))
372                    .await
373                    .unwrap_or_else(|e| {
374                        panic!("Sending to execution queue should not fail: {:?}", e);
375                    });
376            }
377            Err(e) => {
378                warn!("Failed to collect sigs for bridge action: {:?}", e);
379                metrics.err_signature_aggregation.inc();
380
381                // TODO: spawn a task for this
382                if attempt_times >= MAX_SIGNING_ATTEMPTS {
383                    metrics.err_signature_aggregation_too_many_failures.inc();
384                    error!(
385                        "Manual intervention is required. Failed to collect sigs for bridge action after {MAX_SIGNING_ATTEMPTS} attempts: {:?}",
386                        e
387                    );
388                    return;
389                }
390                delay(attempt_times).await;
391                signing_queue_sender
392                    .send(BridgeActionExecutionWrapper(action, attempt_times + 1))
393                    .await
394                    .unwrap_or_else(|e| {
395                        panic!("Sending to signing queue should not fail: {:?}", e);
396                    });
397            }
398        }
399    }
400
401    // Before calling this function, `key` and `sui_address` need to be
402    // verified to match.
403    async fn run_onchain_execution_loop(
404        sui_client: Arc<SuiClient<C>>,
405        sui_key: SuiKeyPair,
406        sui_address: SuiAddress,
407        gas_object_id: ObjectID,
408        store: Arc<BridgeOrchestratorTables>,
409        execution_queue_sender: mysten_metrics::metered_channel::Sender<
410            CertifiedBridgeActionExecutionWrapper,
411        >,
412        mut execution_queue_receiver: mysten_metrics::metered_channel::Receiver<
413            CertifiedBridgeActionExecutionWrapper,
414        >,
415        bridge_object_arg: ObjectArg,
416        sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
417        bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
418        metrics: Arc<BridgeMetrics>,
419    ) {
420        info!("Starting run_onchain_execution_loop");
421        while let Some(certificate_wrapper) = execution_queue_receiver.recv().await {
422            // When bridge is paused, skip execution.
423            // Skipped actions will be picked up upon node restarting
424            // if bridge is unpaused.
425            if *bridge_pause_rx.borrow() {
426                warn!("Bridge is paused, skipping execution");
427                metrics
428                    .action_executor_execution_queue_skipped_actions_due_to_pausing
429                    .inc();
430                continue;
431            }
432            Self::handle_execution_task(
433                certificate_wrapper,
434                &sui_client,
435                &sui_key,
436                &sui_address,
437                gas_object_id,
438                &store,
439                &execution_queue_sender,
440                &bridge_object_arg,
441                &sui_token_type_tags,
442                &metrics,
443            )
444            .await;
445        }
446        panic!("Execution queue closed unexpectedly");
447    }
448
449    #[instrument(level = "error", skip_all, fields(action_key=?certificate_wrapper.0.data().key(), attempt_times=?certificate_wrapper.1))]
450    async fn handle_execution_task(
451        certificate_wrapper: CertifiedBridgeActionExecutionWrapper,
452        sui_client: &Arc<SuiClient<C>>,
453        sui_key: &SuiKeyPair,
454        sui_address: &SuiAddress,
455        gas_object_id: ObjectID,
456        store: &Arc<BridgeOrchestratorTables>,
457        execution_queue_sender: &mysten_metrics::metered_channel::Sender<
458            CertifiedBridgeActionExecutionWrapper,
459        >,
460        bridge_object_arg: &ObjectArg,
461        sui_token_type_tags: &ArcSwap<HashMap<u8, TypeTag>>,
462        metrics: &Arc<BridgeMetrics>,
463    ) {
464        metrics
465            .action_executor_execution_queue_received_actions
466            .inc();
467        let CertifiedBridgeActionExecutionWrapper(certificate, attempt_times) = certificate_wrapper;
468        let action = certificate.data();
469        let action_key = action.key();
470
471        info!("Received certified action for execution: {:?}", action);
472
473        // TODO check gas coin balance here. If gas balance too low, do not proceed.
474        let (gas_coin, gas_object_ref) =
475            Self::get_gas_data_assert_ownership(*sui_address, gas_object_id, sui_client).await;
476        metrics.gas_coin_balance.set(gas_coin.value() as i64);
477
478        let ceriticate_clone = certificate.clone();
479
480        // Check once: if the action is already processed, skip it.
481        if Self::handle_already_processed_token_transfer_action_maybe(
482            sui_client, action, store, metrics,
483        )
484        .await
485        {
486            info!("Action already processed, skipping");
487            return;
488        }
489
490        info!("Building Sui transaction");
491        let rgp = sui_client.get_reference_gas_price_until_success().await;
492        let tx_data = match build_sui_transaction(
493            *sui_address,
494            &gas_object_ref,
495            ceriticate_clone,
496            *bridge_object_arg,
497            sui_token_type_tags.load().as_ref(),
498            rgp,
499        ) {
500            Ok(tx_data) => tx_data,
501            Err(err) => {
502                metrics.err_build_sui_transaction.inc();
503                error!(
504                    "Manual intervention is required. Failed to build transaction for action {:?}: {:?}",
505                    action, err
506                );
507                // This should not happen, but in case it does, we do not want to
508                // panic, instead we log here for manual intervention.
509                return;
510            }
511        };
512        let sig = Signature::new_secure(
513            &IntentMessage::new(Intent::sui_transaction(), &tx_data),
514            sui_key,
515        );
516        let signed_tx = Transaction::from_data(tx_data, vec![sig]);
517        let tx_digest = *signed_tx.digest();
518
519        // Check twice: If the action is already processed, skip it.
520        if Self::handle_already_processed_token_transfer_action_maybe(
521            sui_client, action, store, metrics,
522        )
523        .await
524        {
525            info!("Action already processed, skipping");
526            return;
527        }
528
529        info!(?tx_digest, ?gas_object_ref, "Sending transaction to Sui");
530        match sui_client
531            .execute_transaction_block_with_effects(signed_tx)
532            .await
533        {
534            Ok(resp) => {
535                Self::handle_execution_effects(tx_digest, resp, store, action, metrics).await
536            }
537
538            // If the transaction did not go through, retry up to a certain times.
539            Err(err) => {
540                error!(
541                    ?action_key,
542                    ?tx_digest,
543                    "Sui transaction failed at signing: {err:?}"
544                );
545                metrics.err_sui_transaction_submission.inc();
546                let metrics_clone = metrics.clone();
547                // Do this in a separate task so we won't deadlock here
548                let sender_clone = execution_queue_sender.clone();
549                spawn_logged_monitored_task!(async move {
550                    // If it fails for too many times, log and ask for manual intervention.
551                    if attempt_times >= MAX_EXECUTION_ATTEMPTS {
552                        metrics_clone
553                            .err_sui_transaction_submission_too_many_failures
554                            .inc();
555                        error!("Manual intervention is required. Failed to collect execute transaction for bridge action after {MAX_EXECUTION_ATTEMPTS} attempts: {:?}", err);
556                        return;
557                    }
558                    delay(attempt_times).await;
559                    sender_clone
560                        .send(CertifiedBridgeActionExecutionWrapper(
561                            certificate,
562                            attempt_times + 1,
563                        ))
564                        .await
565                        .unwrap_or_else(|e| {
566                            panic!("Sending to execution queue should not fail: {:?}", e);
567                        });
568                    info!("Re-enqueued certificate for execution");
569                }.instrument(tracing::debug_span!("reenqueue_execution_task", action_key=?action_key)));
570            }
571        }
572    }
573
574    // TODO: do we need a mechanism to periodically read pending actions from DB?
575    async fn handle_execution_effects(
576        tx_digest: TransactionDigest,
577        response: ExecuteTransactionResult,
578        store: &Arc<BridgeOrchestratorTables>,
579        action: &BridgeAction,
580        metrics: &Arc<BridgeMetrics>,
581    ) {
582        match &response.status {
583            SuiExecutionStatus::Success => {
584                let relevant_events = response
585                    .events
586                    .iter()
587                    .filter(|e| {
588                        e.type_ == *TokenTransferAlreadyClaimed.get().unwrap()
589                            || e.type_ == *TokenTransferClaimed.get().unwrap()
590                            || e.type_ == *TokenTransferApproved.get().unwrap()
591                            || e.type_ == *TokenTransferAlreadyApproved.get().unwrap()
592                    })
593                    .collect::<Vec<_>>();
594                assert!(
595                    !relevant_events.is_empty(),
596                    "Expected TokenTransferAlreadyClaimed, TokenTransferClaimed, TokenTransferApproved \
597                    or TokenTransferAlreadyApproved event but got: {:?}",
598                    response.events
599                );
600                info!(?tx_digest, "Sui transaction executed successfully");
601                // track successful approval and claim events
602                relevant_events.iter().for_each(|e| {
603                    if e.type_ == *TokenTransferClaimed.get().unwrap() {
604                        match action {
605                            BridgeAction::EthToSuiBridgeAction(_)
606                            | BridgeAction::EthToSuiTokenTransferV2(_) => {
607                                metrics.eth_sui_token_transfer_claimed.inc();
608                            }
609                            BridgeAction::SuiToEthBridgeAction(_)
610                            | BridgeAction::SuiToEthTokenTransfer(_)
611                            | BridgeAction::SuiToEthTokenTransferV2(_) => {
612                                metrics.sui_eth_token_transfer_claimed.inc();
613                            }
614                            _ => error!("Unexpected action type for claimed event: {:?}", action),
615                        }
616                    } else if e.type_ == *TokenTransferApproved.get().unwrap() {
617                        match action {
618                            BridgeAction::EthToSuiBridgeAction(_)
619                            | BridgeAction::EthToSuiTokenTransferV2(_) => {
620                                metrics.eth_sui_token_transfer_approved.inc();
621                            }
622                            BridgeAction::SuiToEthBridgeAction(_)
623                            | BridgeAction::SuiToEthTokenTransfer(_)
624                            | BridgeAction::SuiToEthTokenTransferV2(_) => {
625                                metrics.sui_eth_token_transfer_approved.inc();
626                            }
627                            _ => error!("Unexpected action type for approved event: {:?}", action),
628                        }
629                    }
630                });
631                store
632                    .remove_pending_actions(&[action.digest()])
633                    .unwrap_or_else(|e| {
634                        panic!("Write to DB should not fail: {:?}", e);
635                    })
636            }
637            SuiExecutionStatus::Failure { error } => {
638                // In practice the transaction could fail because of running out of gas, but really
639                // should not be due to other reasons.
640                // This means manual intervention is needed. So we do not push them back to
641                // the execution queue because retries are mostly likely going to fail anyway.
642                // After human examination, the node should be restarted and fetch them from WAL.
643
644                metrics.err_sui_transaction_execution.inc();
645                error!(
646                    ?tx_digest,
647                    "Manual intervention is needed. Sui transaction executed and failed with error: {error:?}"
648                );
649            }
650        }
651    }
652
653    /// Panics if the gas object is not owned by the address.
654    async fn get_gas_data_assert_ownership(
655        sui_address: SuiAddress,
656        gas_object_id: ObjectID,
657        sui_client: &SuiClient<C>,
658    ) -> (GasCoin, ObjectRef) {
659        let (gas_coin, gas_obj_ref, owner) = sui_client
660            .get_gas_data_panic_if_not_gas(gas_object_id)
661            .await;
662
663        // TODO: when we add multiple gas support in the future we could discard
664        // transferred gas object instead.
665        assert_eq!(
666            owner,
667            Owner::AddressOwner(sui_address),
668            "Gas object {:?} is no longer owned by address {}",
669            gas_object_id,
670            sui_address
671        );
672        (gas_coin, gas_obj_ref)
673    }
674}
675
676pub async fn submit_to_executor(
677    tx: &mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
678    action: BridgeAction,
679) -> Result<(), BridgeError> {
680    tx.send(BridgeActionExecutionWrapper(action, 0))
681        .await
682        .map_err(|e| BridgeError::Generic(e.to_string()))
683}
684
685#[cfg(test)]
686mod tests {
687    use crate::events::init_all_struct_tags;
688    use crate::test_utils::DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
689    use crate::types::BRIDGE_PAUSED;
690    use fastcrypto::traits::KeyPair;
691    use prometheus::Registry;
692    use std::collections::{BTreeMap, HashMap};
693    use std::str::FromStr;
694    use sui_json_rpc_types::SuiEvent;
695    use sui_types::TypeTag;
696    use sui_types::crypto::get_key_pair;
697    use sui_types::gas_coin::GasCoin;
698    use sui_types::{base_types::random_object_ref, transaction::TransactionData};
699
700    use crate::{
701        crypto::{
702            BridgeAuthorityKeyPair, BridgeAuthorityPublicKeyBytes,
703            BridgeAuthorityRecoverableSignature,
704        },
705        server::mock_handler::BridgeRequestMockHandler,
706        sui_mock_client::SuiMockClient,
707        test_utils::{
708            get_test_authorities_and_run_mock_bridge_server, get_test_eth_to_sui_bridge_action,
709            get_test_sui_to_eth_bridge_action, sign_action_with_key,
710        },
711        types::{BridgeCommittee, BridgeCommitteeValiditySignInfo, CertifiedBridgeAction},
712    };
713
714    use super::*;
715
716    #[tokio::test]
717    async fn test_onchain_execution_loop() {
718        let (
719            signing_tx,
720            _execution_tx,
721            sui_client_mock,
722            mut tx_subscription,
723            store,
724            secrets,
725            dummy_sui_key,
726            mock0,
727            mock1,
728            mock2,
729            mock3,
730            _handles,
731            gas_object_ref,
732            sui_address,
733            sui_token_type_tags,
734            _bridge_pause_tx,
735        ) = setup().await;
736        let (action_certificate, _, _) = get_bridge_authority_approved_action(
737            vec![&mock0, &mock1, &mock2, &mock3],
738            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
739            None,
740            true,
741        );
742        let action = action_certificate.data().clone();
743        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
744        let tx_data = build_sui_transaction(
745            sui_address,
746            &gas_object_ref,
747            action_certificate,
748            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
749            &id_token_map,
750            1000,
751        )
752        .unwrap();
753
754        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
755
756        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
757        sui_client_mock.add_gas_object_info(
758            gas_coin.clone(),
759            gas_object_ref,
760            Owner::AddressOwner(sui_address),
761        );
762
763        // Mock the transaction to be successfully executed
764        let mut event = SuiEvent::random_for_testing();
765        event.type_ = TokenTransferClaimed.get().unwrap().clone();
766        let events = vec![event];
767        mock_transaction_response(
768            &sui_client_mock,
769            tx_digest,
770            SuiExecutionStatus::Success,
771            Some(events),
772            true,
773        );
774
775        store
776            .insert_pending_actions(std::slice::from_ref(&action))
777            .unwrap();
778        assert_eq!(
779            store.get_all_pending_actions()[&action.digest()],
780            action.clone()
781        );
782
783        // Kick it
784        submit_to_executor(&signing_tx, action.clone())
785            .await
786            .unwrap();
787
788        // Expect to see the transaction to be requested and successfully executed hence removed from WAL
789        tx_subscription.recv().await.unwrap();
790        assert!(store.get_all_pending_actions().is_empty());
791
792        /////////////////////////////////////////////////////////////////////////////////////////////////
793        ////////////////////////////////////// Test execution failure ///////////////////////////////////
794        /////////////////////////////////////////////////////////////////////////////////////////////////
795
796        let (action_certificate, _, _) = get_bridge_authority_approved_action(
797            vec![&mock0, &mock1, &mock2, &mock3],
798            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
799            None,
800            true,
801        );
802
803        let action = action_certificate.data().clone();
804
805        let tx_data = build_sui_transaction(
806            sui_address,
807            &gas_object_ref,
808            action_certificate,
809            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
810            &id_token_map,
811            1000,
812        )
813        .unwrap();
814        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
815
816        // Mock the transaction to fail
817        mock_transaction_response(
818            &sui_client_mock,
819            tx_digest,
820            SuiExecutionStatus::Failure {
821                error: "failure is mother of success".to_string(),
822            },
823            None,
824            true,
825        );
826
827        store
828            .insert_pending_actions(std::slice::from_ref(&action))
829            .unwrap();
830        assert_eq!(
831            store.get_all_pending_actions()[&action.digest()],
832            action.clone()
833        );
834
835        // Kick it
836        submit_to_executor(&signing_tx, action.clone())
837            .await
838            .unwrap();
839
840        // Expect to see the transaction to be requested and but failed
841        tx_subscription.recv().await.unwrap();
842        // The action is not removed from WAL because the transaction failed
843        assert_eq!(
844            store.get_all_pending_actions()[&action.digest()],
845            action.clone()
846        );
847
848        /////////////////////////////////////////////////////////////////////////////////////////////////
849        //////////////////////////// Test transaction failed at signing stage ///////////////////////////
850        /////////////////////////////////////////////////////////////////////////////////////////////////
851
852        let (action_certificate, _, _) = get_bridge_authority_approved_action(
853            vec![&mock0, &mock1, &mock2, &mock3],
854            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
855            None,
856            true,
857        );
858
859        let action = action_certificate.data().clone();
860
861        let tx_data = build_sui_transaction(
862            sui_address,
863            &gas_object_ref,
864            action_certificate,
865            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
866            &id_token_map,
867            1000,
868        )
869        .unwrap();
870        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
871        mock_transaction_error(
872            &sui_client_mock,
873            tx_digest,
874            BridgeError::Generic("some random error".to_string()),
875            true,
876        );
877
878        store
879            .insert_pending_actions(std::slice::from_ref(&action))
880            .unwrap();
881        assert_eq!(
882            store.get_all_pending_actions()[&action.digest()],
883            action.clone()
884        );
885
886        // Kick it
887        submit_to_executor(&signing_tx, action.clone())
888            .await
889            .unwrap();
890
891        // Failure will trigger retry, we wait for 2 requests before checking WAL log
892        let tx_digest = tx_subscription.recv().await.unwrap();
893        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
894
895        // The retry is still going on, action still in WAL
896        assert!(
897            store
898                .get_all_pending_actions()
899                .contains_key(&action.digest())
900        );
901
902        // Now let it succeed
903        let mut event = SuiEvent::random_for_testing();
904        event.type_ = TokenTransferClaimed.get().unwrap().clone();
905        let events = vec![event];
906        mock_transaction_response(
907            &sui_client_mock,
908            tx_digest,
909            SuiExecutionStatus::Success,
910            Some(events),
911            true,
912        );
913
914        // Give it 1 second to retry and succeed
915        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
916        // The action is successful and should be removed from WAL now
917        assert!(
918            !store
919                .get_all_pending_actions()
920                .contains_key(&action.digest())
921        );
922    }
923
924    #[tokio::test]
925    async fn test_signature_aggregation_loop() {
926        let (
927            signing_tx,
928            _execution_tx,
929            sui_client_mock,
930            mut tx_subscription,
931            store,
932            secrets,
933            dummy_sui_key,
934            mock0,
935            mock1,
936            mock2,
937            mock3,
938            _handles,
939            gas_object_ref,
940            sui_address,
941            sui_token_type_tags,
942            _bridge_pause_tx,
943        ) = setup().await;
944        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
945        let (action_certificate, sui_tx_digest, sui_tx_event_index) =
946            get_bridge_authority_approved_action(
947                vec![&mock0, &mock1, &mock2, &mock3],
948                vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
949                None,
950                true,
951            );
952        let action = action_certificate.data().clone();
953        mock_bridge_authority_signing_errors(
954            vec![&mock0, &mock1, &mock2],
955            sui_tx_digest,
956            sui_tx_event_index,
957        );
958        let mut sigs = mock_bridge_authority_sigs(
959            vec![&mock3],
960            &action,
961            vec![&secrets[3]],
962            sui_tx_digest,
963            sui_tx_event_index,
964        );
965
966        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
967        sui_client_mock.add_gas_object_info(
968            gas_coin,
969            gas_object_ref,
970            Owner::AddressOwner(sui_address),
971        );
972        store
973            .insert_pending_actions(std::slice::from_ref(&action))
974            .unwrap();
975        assert_eq!(
976            store.get_all_pending_actions()[&action.digest()],
977            action.clone()
978        );
979
980        // Kick it
981        submit_to_executor(&signing_tx, action.clone())
982            .await
983            .unwrap();
984
985        // Wait until the transaction is retried at least once (instead of deing dropped)
986        loop {
987            let requested_times =
988                mock0.get_sui_token_events_requested(sui_tx_digest, sui_tx_event_index);
989            if requested_times >= 2 {
990                break;
991            }
992            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
993        }
994        // Nothing is sent to execute yet
995        assert_eq!(
996            tx_subscription.try_recv().unwrap_err(),
997            tokio::sync::broadcast::error::TryRecvError::Empty
998        );
999        // Still in WAL
1000        assert_eq!(
1001            store.get_all_pending_actions()[&action.digest()],
1002            action.clone()
1003        );
1004
1005        // Let authorities sign the action too. Now we are above the threshold
1006        let sig_from_2 = mock_bridge_authority_sigs(
1007            vec![&mock2],
1008            &action,
1009            vec![&secrets[2]],
1010            sui_tx_digest,
1011            sui_tx_event_index,
1012        );
1013        sigs.extend(sig_from_2);
1014        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1015            action.clone(),
1016            BridgeCommitteeValiditySignInfo { signatures: sigs },
1017        );
1018        let action_certificate = VerifiedCertifiedBridgeAction::new_from_verified(certified_action);
1019        let tx_data = build_sui_transaction(
1020            sui_address,
1021            &gas_object_ref,
1022            action_certificate,
1023            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
1024            &id_token_map,
1025            1000,
1026        )
1027        .unwrap();
1028        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1029
1030        let mut event = SuiEvent::random_for_testing();
1031        event.type_ = TokenTransferClaimed.get().unwrap().clone();
1032        let events = vec![event];
1033        mock_transaction_response(
1034            &sui_client_mock,
1035            tx_digest,
1036            SuiExecutionStatus::Success,
1037            Some(events),
1038            true,
1039        );
1040
1041        // Expect to see the transaction to be requested and succeed
1042        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1043        // The action is removed from WAL
1044        assert!(
1045            !store
1046                .get_all_pending_actions()
1047                .contains_key(&action.digest())
1048        );
1049    }
1050
1051    #[tokio::test]
1052    async fn test_skip_request_signature_if_already_processed_on_chain() {
1053        let (
1054            signing_tx,
1055            _execution_tx,
1056            sui_client_mock,
1057            mut tx_subscription,
1058            store,
1059            _secrets,
1060            _dummy_sui_key,
1061            mock0,
1062            mock1,
1063            mock2,
1064            mock3,
1065            _handles,
1066            _gas_object_ref,
1067            _sui_address,
1068            _sui_token_type_tags,
1069            _bridge_pause_tx,
1070        ) = setup().await;
1071
1072        let sui_tx_digest = TransactionDigest::random();
1073        let sui_tx_event_index = 0;
1074        let action = get_test_sui_to_eth_bridge_action(
1075            Some(sui_tx_digest),
1076            Some(sui_tx_event_index),
1077            None,
1078            None,
1079            None,
1080            None,
1081            None,
1082        );
1083        mock_bridge_authority_signing_errors(
1084            vec![&mock0, &mock1, &mock2, &mock3],
1085            sui_tx_digest,
1086            sui_tx_event_index,
1087        );
1088        store
1089            .insert_pending_actions(std::slice::from_ref(&action))
1090            .unwrap();
1091        assert_eq!(
1092            store.get_all_pending_actions()[&action.digest()],
1093            action.clone()
1094        );
1095
1096        // Kick it
1097        submit_to_executor(&signing_tx, action.clone())
1098            .await
1099            .unwrap();
1100        let action_digest = action.digest();
1101
1102        // Wait for 1 second. It should still in the process of retrying requesting sigs becaues we mock errors above.
1103        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1104        tx_subscription.try_recv().unwrap_err();
1105        // And the action is still in WAL
1106        assert!(store.get_all_pending_actions().contains_key(&action_digest));
1107
1108        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1109
1110        // The next retry will see the action is already processed on chain and remove it from WAL
1111        let now = std::time::Instant::now();
1112        while store.get_all_pending_actions().contains_key(&action_digest) {
1113            if now.elapsed().as_secs() > 10 {
1114                panic!("Timeout waiting for action to be removed from WAL");
1115            }
1116            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1117        }
1118        tx_subscription.try_recv().unwrap_err();
1119    }
1120
1121    #[tokio::test]
1122    async fn test_skip_tx_submission_if_already_processed_on_chain() {
1123        let (
1124            _signing_tx,
1125            execution_tx,
1126            sui_client_mock,
1127            mut tx_subscription,
1128            store,
1129            secrets,
1130            dummy_sui_key,
1131            mock0,
1132            mock1,
1133            mock2,
1134            mock3,
1135            _handles,
1136            gas_object_ref,
1137            sui_address,
1138            sui_token_type_tags,
1139            _bridge_pause_tx,
1140        ) = setup().await;
1141        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
1142        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1143            vec![&mock0, &mock1, &mock2, &mock3],
1144            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1145            None,
1146            true,
1147        );
1148
1149        let action = action_certificate.data().clone();
1150        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1151        let tx_data = build_sui_transaction(
1152            sui_address,
1153            &gas_object_ref,
1154            action_certificate.clone(),
1155            arg,
1156            &id_token_map,
1157            1000,
1158        )
1159        .unwrap();
1160        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1161        mock_transaction_error(
1162            &sui_client_mock,
1163            tx_digest,
1164            BridgeError::Generic("some random error".to_string()),
1165            true,
1166        );
1167
1168        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
1169        sui_client_mock.add_gas_object_info(
1170            gas_coin.clone(),
1171            gas_object_ref,
1172            Owner::AddressOwner(sui_address),
1173        );
1174
1175        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1176
1177        store
1178            .insert_pending_actions(std::slice::from_ref(&action))
1179            .unwrap();
1180        assert_eq!(
1181            store.get_all_pending_actions()[&action.digest()],
1182            action.clone()
1183        );
1184
1185        // Kick it (send to the execution queue, skipping the signing queue)
1186        execution_tx
1187            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1188            .await
1189            .unwrap();
1190
1191        // Some requests come in and will fail.
1192        tx_subscription.recv().await.unwrap();
1193
1194        // Set the action to be already approved on chain
1195        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1196
1197        // The next retry will see the action is already processed on chain and remove it from WAL
1198        let now = std::time::Instant::now();
1199        let action_digest = action.digest();
1200        while store.get_all_pending_actions().contains_key(&action_digest) {
1201            if now.elapsed().as_secs() > 10 {
1202                panic!("Timeout waiting for action to be removed from WAL");
1203            }
1204            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1205        }
1206    }
1207
1208    #[tokio::test]
1209    async fn test_skip_tx_submission_if_bridge_is_paused() {
1210        let (
1211            _signing_tx,
1212            execution_tx,
1213            sui_client_mock,
1214            mut tx_subscription,
1215            store,
1216            secrets,
1217            dummy_sui_key,
1218            mock0,
1219            mock1,
1220            mock2,
1221            mock3,
1222            _handles,
1223            gas_object_ref,
1224            sui_address,
1225            sui_token_type_tags,
1226            bridge_pause_tx,
1227        ) = setup().await;
1228        let id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1229        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1230            vec![&mock0, &mock1, &mock2, &mock3],
1231            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1232            None,
1233            true,
1234        );
1235
1236        let action = action_certificate.data().clone();
1237        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1238        let tx_data = build_sui_transaction(
1239            sui_address,
1240            &gas_object_ref,
1241            action_certificate.clone(),
1242            arg,
1243            &id_token_map,
1244            1000,
1245        )
1246        .unwrap();
1247        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1248        mock_transaction_error(
1249            &sui_client_mock,
1250            tx_digest,
1251            BridgeError::Generic("some random error".to_string()),
1252            true,
1253        );
1254
1255        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
1256        sui_client_mock.add_gas_object_info(
1257            gas_coin.clone(),
1258            gas_object_ref,
1259            Owner::AddressOwner(sui_address),
1260        );
1261        let action_digest = action.digest();
1262        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1263
1264        // assert bridge is unpaused now
1265        assert!(!*bridge_pause_tx.borrow());
1266
1267        store
1268            .insert_pending_actions(std::slice::from_ref(&action))
1269            .unwrap();
1270        assert_eq!(
1271            store.get_all_pending_actions()[&action.digest()],
1272            action.clone()
1273        );
1274
1275        // Kick it (send to the execution queue, skipping the signing queue)
1276        execution_tx
1277            .send(CertifiedBridgeActionExecutionWrapper(
1278                action_certificate.clone(),
1279                0,
1280            ))
1281            .await
1282            .unwrap();
1283
1284        // Some requests come in
1285        tx_subscription.recv().await.unwrap();
1286
1287        // Pause the bridge
1288        bridge_pause_tx.send(BRIDGE_PAUSED).unwrap();
1289
1290        // Kick it again
1291        execution_tx
1292            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1293            .await
1294            .unwrap();
1295
1296        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1297        // Nothing is sent to execute
1298        assert_eq!(
1299            tx_subscription.try_recv().unwrap_err(),
1300            tokio::sync::broadcast::error::TryRecvError::Empty
1301        );
1302        // Still in WAL
1303        assert_eq!(
1304            store.get_all_pending_actions()[&action_digest],
1305            action.clone()
1306        );
1307    }
1308
1309    #[tokio::test]
1310    async fn test_action_executor_handle_new_token() {
1311        let new_token_id = 255u8; // token id that does not exist
1312        let new_type_tag = TypeTag::from_str("0xbeef::beef::BEEF").unwrap();
1313        let (
1314            _signing_tx,
1315            execution_tx,
1316            sui_client_mock,
1317            mut tx_subscription,
1318            _store,
1319            secrets,
1320            dummy_sui_key,
1321            mock0,
1322            mock1,
1323            mock2,
1324            mock3,
1325            _handles,
1326            gas_object_ref,
1327            sui_address,
1328            sui_token_type_tags,
1329            _bridge_pause_tx,
1330        ) = setup().await;
1331        let mut id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1332        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1333            vec![&mock0, &mock1, &mock2, &mock3],
1334            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1335            Some(new_token_id),
1336            false, // we need an eth -> sui action that entails the new token type tag in transaction building
1337        );
1338
1339        let action = action_certificate.data().clone();
1340        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1341        let tx_data = build_sui_transaction(
1342            sui_address,
1343            &gas_object_ref,
1344            action_certificate.clone(),
1345            arg,
1346            &maplit::hashmap! {
1347                new_token_id => new_type_tag.clone()
1348            },
1349            1000,
1350        )
1351        .unwrap();
1352        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1353        mock_transaction_error(
1354            &sui_client_mock,
1355            tx_digest,
1356            BridgeError::Generic("some random error".to_string()),
1357            true,
1358        );
1359
1360        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
1361        sui_client_mock.add_gas_object_info(
1362            gas_coin.clone(),
1363            gas_object_ref,
1364            Owner::AddressOwner(sui_address),
1365        );
1366        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1367
1368        // Kick it (send to the execution queue, skipping the signing queue)
1369        execution_tx
1370            .send(CertifiedBridgeActionExecutionWrapper(
1371                action_certificate.clone(),
1372                0,
1373            ))
1374            .await
1375            .unwrap();
1376
1377        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1378        // Nothing is sent to execute, because the token id does not exist yet
1379        assert_eq!(
1380            tx_subscription.try_recv().unwrap_err(),
1381            tokio::sync::broadcast::error::TryRecvError::Empty
1382        );
1383
1384        // Now insert the new token id
1385        id_token_map.insert(new_token_id, new_type_tag);
1386        sui_token_type_tags.store(Arc::new(id_token_map));
1387
1388        // Kick it again
1389        execution_tx
1390            .send(CertifiedBridgeActionExecutionWrapper(
1391                action_certificate.clone(),
1392                0,
1393            ))
1394            .await
1395            .unwrap();
1396
1397        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1398        // The action is sent to execution
1399        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1400    }
1401
1402    fn mock_bridge_authority_sigs(
1403        mocks: Vec<&BridgeRequestMockHandler>,
1404        action: &BridgeAction,
1405        secrets: Vec<&BridgeAuthorityKeyPair>,
1406        sui_tx_digest: TransactionDigest,
1407        sui_tx_event_index: u16,
1408    ) -> BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthorityRecoverableSignature> {
1409        assert_eq!(mocks.len(), secrets.len());
1410        let mut signed_actions = BTreeMap::new();
1411        for (mock, secret) in mocks.iter().zip(secrets.iter()) {
1412            let signed_action = sign_action_with_key(action, secret);
1413            mock.add_sui_event_response(
1414                sui_tx_digest,
1415                sui_tx_event_index,
1416                Ok(signed_action.clone()),
1417                None,
1418            );
1419            signed_actions.insert(secret.public().into(), signed_action.into_sig().signature);
1420        }
1421        signed_actions
1422    }
1423
1424    fn mock_bridge_authority_signing_errors(
1425        mocks: Vec<&BridgeRequestMockHandler>,
1426        sui_tx_digest: TransactionDigest,
1427        sui_tx_event_index: u16,
1428    ) {
1429        for mock in mocks {
1430            mock.add_sui_event_response(
1431                sui_tx_digest,
1432                sui_tx_event_index,
1433                Err(BridgeError::RestAPIError("small issue".into())),
1434                None,
1435            );
1436        }
1437    }
1438
1439    /// Create a BridgeAction and mock authorities to return signatures
1440    fn get_bridge_authority_approved_action(
1441        mocks: Vec<&BridgeRequestMockHandler>,
1442        secrets: Vec<&BridgeAuthorityKeyPair>,
1443        token_id: Option<u8>,
1444        sui_to_eth: bool,
1445    ) -> (VerifiedCertifiedBridgeAction, TransactionDigest, u16) {
1446        let sui_tx_digest = TransactionDigest::random();
1447        let sui_tx_event_index = 1;
1448        let action = if sui_to_eth {
1449            get_test_sui_to_eth_bridge_action(
1450                Some(sui_tx_digest),
1451                Some(sui_tx_event_index),
1452                None,
1453                None,
1454                None,
1455                None,
1456                token_id,
1457            )
1458        } else {
1459            get_test_eth_to_sui_bridge_action(None, None, None, token_id)
1460        };
1461
1462        let sigs =
1463            mock_bridge_authority_sigs(mocks, &action, secrets, sui_tx_digest, sui_tx_event_index);
1464        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1465            action,
1466            BridgeCommitteeValiditySignInfo { signatures: sigs },
1467        );
1468        (
1469            VerifiedCertifiedBridgeAction::new_from_verified(certified_action),
1470            sui_tx_digest,
1471            sui_tx_event_index,
1472        )
1473    }
1474
1475    fn get_tx_digest(tx_data: TransactionData, dummy_sui_key: &SuiKeyPair) -> TransactionDigest {
1476        let sig = Signature::new_secure(
1477            &IntentMessage::new(Intent::sui_transaction(), &tx_data),
1478            dummy_sui_key,
1479        );
1480        let signed_tx = Transaction::from_data(tx_data, vec![sig]);
1481        *signed_tx.digest()
1482    }
1483
1484    /// Why is `wildcard` needed? This is because authority signatures
1485    /// are part of transaction data. Depending on whose signatures
1486    /// are included in what order, this may change the tx digest.
1487    fn mock_transaction_response(
1488        sui_client_mock: &SuiMockClient,
1489        tx_digest: TransactionDigest,
1490        status: SuiExecutionStatus,
1491        events: Option<Vec<SuiEvent>>,
1492        wildcard: bool,
1493    ) {
1494        let response = ExecuteTransactionResult {
1495            status,
1496            events: events.unwrap_or_default(),
1497        };
1498        if wildcard {
1499            sui_client_mock.set_wildcard_transaction_response(Ok(response));
1500        } else {
1501            sui_client_mock.add_transaction_response(tx_digest, Ok(response));
1502        }
1503    }
1504
1505    fn mock_transaction_error(
1506        sui_client_mock: &SuiMockClient,
1507        tx_digest: TransactionDigest,
1508        error: BridgeError,
1509        wildcard: bool,
1510    ) {
1511        if wildcard {
1512            sui_client_mock.set_wildcard_transaction_response(Err(error));
1513        } else {
1514            sui_client_mock.add_transaction_response(tx_digest, Err(error));
1515        }
1516    }
1517
1518    #[allow(clippy::type_complexity)]
1519    async fn setup() -> (
1520        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
1521        mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
1522        SuiMockClient,
1523        tokio::sync::broadcast::Receiver<TransactionDigest>,
1524        Arc<BridgeOrchestratorTables>,
1525        Vec<BridgeAuthorityKeyPair>,
1526        SuiKeyPair,
1527        BridgeRequestMockHandler,
1528        BridgeRequestMockHandler,
1529        BridgeRequestMockHandler,
1530        BridgeRequestMockHandler,
1531        Vec<tokio::task::JoinHandle<()>>,
1532        ObjectRef,
1533        SuiAddress,
1534        Arc<ArcSwap<HashMap<u8, TypeTag>>>,
1535        tokio::sync::watch::Sender<IsBridgePaused>,
1536    ) {
1537        telemetry_subscribers::init_for_testing();
1538        let registry = Registry::new();
1539        mysten_metrics::init_metrics(&registry);
1540        init_all_struct_tags();
1541
1542        let (sui_address, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1543        let sui_key = SuiKeyPair::from(kp);
1544        let gas_object_ref = random_object_ref();
1545        let temp_dir = tempfile::tempdir().unwrap();
1546        let store = BridgeOrchestratorTables::new(temp_dir.path());
1547        let sui_client_mock = SuiMockClient::default();
1548        let tx_subscription = sui_client_mock.subscribe_to_requested_transactions();
1549        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1550
1551        // The dummy key is used to sign transaction so we can get TransactionDigest easily.
1552        // User signature is not part of the transaction so it does not matter which key it is.
1553        let (_, dummy_kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1554        let dummy_sui_key = SuiKeyPair::from(dummy_kp);
1555
1556        let mock0 = BridgeRequestMockHandler::new();
1557        let mock1 = BridgeRequestMockHandler::new();
1558        let mock2 = BridgeRequestMockHandler::new();
1559        let mock3 = BridgeRequestMockHandler::new();
1560
1561        let (mut handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
1562            vec![2500, 2500, 2500, 2500],
1563            vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
1564        );
1565
1566        let committee = BridgeCommittee::new(authorities).unwrap();
1567
1568        let agg = Arc::new(ArcSwap::new(Arc::new(
1569            BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1570        )));
1571        let metrics = Arc::new(BridgeMetrics::new(&registry));
1572        let sui_token_type_tags = sui_client.get_token_id_map().await.unwrap();
1573        let sui_token_type_tags = Arc::new(ArcSwap::new(Arc::new(sui_token_type_tags)));
1574        let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1575        let executor = BridgeActionExecutor::new(
1576            sui_client.clone(),
1577            agg.clone(),
1578            store.clone(),
1579            sui_key,
1580            sui_address,
1581            gas_object_ref.0,
1582            sui_token_type_tags.clone(),
1583            bridge_pause_rx,
1584            metrics,
1585        )
1586        .await;
1587
1588        let (executor_handle, signing_tx, execution_tx) = executor.run_inner();
1589        handles.extend(executor_handle);
1590
1591        (
1592            signing_tx,
1593            execution_tx,
1594            sui_client_mock,
1595            tx_subscription,
1596            store,
1597            secrets,
1598            dummy_sui_key,
1599            mock0,
1600            mock1,
1601            mock2,
1602            mock3,
1603            handles,
1604            gas_object_ref,
1605            sui_address,
1606            sui_token_type_tags,
1607            bridge_pause_tx,
1608        )
1609    }
1610}