sui_bridge/
action_executor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! BridgeActionExecutor receives BridgeActions (from BridgeOrchestrator),
//! collects bridge authority signatures and submits them on chain.
6
7use crate::retry_with_max_elapsed_time;
8use crate::types::IsBridgePaused;
9use arc_swap::ArcSwap;
10use mysten_metrics::spawn_logged_monitored_task;
11use shared_crypto::intent::{Intent, IntentMessage};
12use sui_json_rpc_types::{
13    SuiExecutionStatus, SuiTransactionBlockEffectsAPI, SuiTransactionBlockResponse,
14};
15use sui_types::TypeTag;
16use sui_types::transaction::ObjectArg;
17use sui_types::{
18    base_types::{ObjectID, ObjectRef, SuiAddress},
19    crypto::{Signature, SuiKeyPair},
20    digests::TransactionDigest,
21    gas_coin::GasCoin,
22    object::Owner,
23    transaction::Transaction,
24};
25
26use crate::events::{
27    TokenTransferAlreadyApproved, TokenTransferAlreadyClaimed, TokenTransferApproved,
28    TokenTransferClaimed,
29};
30use crate::metrics::BridgeMetrics;
31use crate::{
32    client::bridge_authority_aggregator::BridgeAuthorityAggregator,
33    error::BridgeError,
34    storage::BridgeOrchestratorTables,
35    sui_client::{SuiClient, SuiClientInner},
36    sui_transaction_builder::build_sui_transaction,
37    types::{BridgeAction, BridgeActionStatus, VerifiedCertifiedBridgeAction},
38};
39use std::collections::HashMap;
40use std::sync::Arc;
41use tokio::sync::Semaphore;
42use tokio::time::Duration;
43use tracing::{Instrument, error, info, instrument, warn};
44
/// Capacity used for both metered channels created by the executor
/// (the signing queue and the execution queue).
pub const CHANNEL_SIZE: usize = 1000;
/// Number of permits in the semaphore that gates concurrent signature requests.
pub const SIGNING_CONCURRENCY: usize = 10;

// delay schedule: at most 16 times including the initial attempt
// 0.1s, 0.2s, 0.4s, 0.8s, 1.6s, 3.2s, 6.4s, 12.8s, 25.6s, 51.2s, 102.4s, 204.8s, 409.6s, 819.2s, 1638.4s
/// Limit compared against an action's attempt counter before a failed
/// signature aggregation is re-enqueued.
pub const MAX_SIGNING_ATTEMPTS: u64 = 16;
/// Limit compared against a certificate's attempt counter before a failed
/// transaction submission is re-enqueued.
pub const MAX_EXECUTION_ATTEMPTS: u64 = 16;
52
53async fn delay(attempt_times: u64) {
54    let delay_ms = 100 * 2_u64.pow(attempt_times as u32);
55    tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
56}
57
/// A [`BridgeAction`] queued for signature aggregation, paired with the
/// number of signing attempts already made for it (`0` when first submitted
/// through `submit_to_executor`).
#[derive(Debug)]
pub struct BridgeActionExecutionWrapper(pub BridgeAction, pub u64);
60
/// A certified bridge action queued for on-chain execution, paired with the
/// number of execution attempts already made for it (`0` when it first
/// leaves signature aggregation).
#[derive(Debug)]
pub struct CertifiedBridgeActionExecutionWrapper(pub VerifiedCertifiedBridgeAction, pub u64);
63
/// Entry point for starting a bridge action executor.
pub trait BridgeActionExecutorTrait {
    /// Consumes the executor and starts it.
    ///
    /// Returns the join handles of the spawned tasks together with the sender
    /// used to submit [`BridgeActionExecutionWrapper`]s for processing.
    fn run(
        self,
    ) -> (
        Vec<tokio::task::JoinHandle<()>>,
        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
    );
}
72
/// Collects committee signatures for bridge actions and submits the resulting
/// certificates to Sui.
pub struct BridgeActionExecutor<C> {
    /// Client used to query on-chain state and to execute transactions.
    sui_client: Arc<SuiClient<C>>,
    /// Swappable aggregator used to request committee signatures.
    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
    /// Key used to sign the Sui transactions built by the executor.
    key: SuiKeyPair,
    /// Sender address of the built transactions; must own the gas object.
    sui_address: SuiAddress,
    /// ID of the gas object used to pay for transactions.
    gas_object_id: ObjectID,
    /// Pending-action storage; actions are removed once processed on chain.
    store: Arc<BridgeOrchestratorTables>,
    /// Bridge object argument passed to `build_sui_transaction`.
    bridge_object_arg: ObjectArg,
    /// Swappable `u8 -> TypeTag` map passed to `build_sui_transaction`
    /// (presumably token id -> coin type; confirm against the builder).
    sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
    /// Watch channel whose current value makes the execution loop skip
    /// certificates while it is `true`.
    bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
    /// Metrics sink for counters and gauges updated by the executor.
    metrics: Arc<BridgeMetrics>,
}
85
86impl<C> BridgeActionExecutorTrait for BridgeActionExecutor<C>
87where
88    C: SuiClientInner + 'static,
89{
90    fn run(
91        self,
92    ) -> (
93        Vec<tokio::task::JoinHandle<()>>,
94        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
95    ) {
96        let (tasks, sender, _) = self.run_inner();
97        (tasks, sender)
98    }
99}
100
101impl<C> BridgeActionExecutor<C>
102where
103    C: SuiClientInner + 'static,
104{
105    pub async fn new(
106        sui_client: Arc<SuiClient<C>>,
107        bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
108        store: Arc<BridgeOrchestratorTables>,
109        key: SuiKeyPair,
110        sui_address: SuiAddress,
111        gas_object_id: ObjectID,
112        sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
113        bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
114        metrics: Arc<BridgeMetrics>,
115    ) -> Self {
116        let bridge_object_arg = sui_client
117            .get_mutable_bridge_object_arg_must_succeed()
118            .await;
119        Self {
120            sui_client,
121            bridge_auth_agg,
122            store,
123            key,
124            gas_object_id,
125            sui_address,
126            bridge_object_arg,
127            sui_token_type_tags,
128            bridge_pause_rx,
129            metrics,
130        }
131    }
132
133    fn run_inner(
134        self,
135    ) -> (
136        Vec<tokio::task::JoinHandle<()>>,
137        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
138        mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
139    ) {
140        let key = self.key;
141
142        let (sender, receiver) = mysten_metrics::metered_channel::channel(
143            CHANNEL_SIZE,
144            &mysten_metrics::get_metrics()
145                .unwrap()
146                .channel_inflight
147                .with_label_values(&["executor_signing_queue"]),
148        );
149
150        let (execution_tx, execution_rx) = mysten_metrics::metered_channel::channel(
151            CHANNEL_SIZE,
152            &mysten_metrics::get_metrics()
153                .unwrap()
154                .channel_inflight
155                .with_label_values(&["executor_execution_queue"]),
156        );
157        let execution_tx_clone = execution_tx.clone();
158        let sender_clone = sender.clone();
159        let store_clone = self.store.clone();
160        let client_clone = self.sui_client.clone();
161        let mut tasks = vec![];
162        let metrics = self.metrics.clone();
163        tasks.push(spawn_logged_monitored_task!(
164            Self::run_signature_aggregation_loop(
165                client_clone,
166                self.bridge_auth_agg,
167                store_clone,
168                sender_clone,
169                receiver,
170                execution_tx_clone,
171                metrics,
172            )
173        ));
174
175        let metrics = self.metrics.clone();
176        let execution_tx_clone = execution_tx.clone();
177        tasks.push(spawn_logged_monitored_task!(
178            Self::run_onchain_execution_loop(
179                self.sui_client.clone(),
180                key,
181                self.sui_address,
182                self.gas_object_id,
183                self.store.clone(),
184                execution_tx_clone,
185                execution_rx,
186                self.bridge_object_arg,
187                self.sui_token_type_tags,
188                self.bridge_pause_rx,
189                metrics,
190            )
191        ));
192        (tasks, sender, execution_tx)
193    }
194
195    async fn run_signature_aggregation_loop(
196        sui_client: Arc<SuiClient<C>>,
197        auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
198        store: Arc<BridgeOrchestratorTables>,
199        signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
200        mut signing_queue_receiver: mysten_metrics::metered_channel::Receiver<
201            BridgeActionExecutionWrapper,
202        >,
203        execution_queue_sender: mysten_metrics::metered_channel::Sender<
204            CertifiedBridgeActionExecutionWrapper,
205        >,
206        metrics: Arc<BridgeMetrics>,
207    ) {
208        info!("Starting run_signature_aggregation_loop");
209        let semaphore = Arc::new(Semaphore::new(SIGNING_CONCURRENCY));
210        while let Some(action) = signing_queue_receiver.recv().await {
211            Self::handle_signing_task(
212                &semaphore,
213                &auth_agg,
214                &signing_queue_sender,
215                &execution_queue_sender,
216                &sui_client,
217                &store,
218                action,
219                &metrics,
220            )
221            .await;
222        }
223    }
224
225    async fn should_proceed_signing(sui_client: &Arc<SuiClient<C>>) -> bool {
226        let Ok(Ok(is_paused)) =
227            retry_with_max_elapsed_time!(sui_client.is_bridge_paused(), Duration::from_secs(600))
228        else {
229            error!("Failed to get bridge status after retry");
230            return false;
231        };
232        !is_paused
233    }
234
235    #[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))]
236    async fn handle_signing_task(
237        semaphore: &Arc<Semaphore>,
238        auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
239        signing_queue_sender: &mysten_metrics::metered_channel::Sender<
240            BridgeActionExecutionWrapper,
241        >,
242        execution_queue_sender: &mysten_metrics::metered_channel::Sender<
243            CertifiedBridgeActionExecutionWrapper,
244        >,
245        sui_client: &Arc<SuiClient<C>>,
246        store: &Arc<BridgeOrchestratorTables>,
247        action: BridgeActionExecutionWrapper,
248        metrics: &Arc<BridgeMetrics>,
249    ) {
250        metrics.action_executor_signing_queue_received_actions.inc();
251        let action_key = action.0.key();
252        info!("Received action for signing: {:?}", action.0);
253
254        // TODO: this is a temporary fix to avoid signing when the bridge is paused.
255        // but the way is implemented is not ideal:
256        // 1. it should check the direction
257        // 2. should use a better mechanism to check the bridge status instead of polling for each action
258        let should_proceed = Self::should_proceed_signing(sui_client).await;
259        if !should_proceed {
260            metrics.action_executor_signing_queue_skipped_actions.inc();
261            warn!("skipping signing task: {:?}", action_key);
262            return;
263        }
264
265        let auth_agg_clone = auth_agg.clone();
266        let signing_queue_sender_clone = signing_queue_sender.clone();
267        let execution_queue_sender_clone = execution_queue_sender.clone();
268        let sui_client_clone = sui_client.clone();
269        let store_clone = store.clone();
270        let metrics_clone = metrics.clone();
271        let semaphore_clone = semaphore.clone();
272        spawn_logged_monitored_task!(
273            Self::request_signatures(
274                semaphore_clone,
275                sui_client_clone,
276                auth_agg_clone,
277                action,
278                store_clone,
279                signing_queue_sender_clone,
280                execution_queue_sender_clone,
281                metrics_clone,
282            )
283            .instrument(tracing::debug_span!("request_signatures", action_key=?action_key)),
284            "request_signatures"
285        );
286    }
287
288    // Checks if the action is already processed on chain.
289    // If yes, skip this action and remove it from the pending log.
290    // Returns true if the action is already processed.
291    async fn handle_already_processed_token_transfer_action_maybe(
292        sui_client: &Arc<SuiClient<C>>,
293        action: &BridgeAction,
294        store: &Arc<BridgeOrchestratorTables>,
295        metrics: &Arc<BridgeMetrics>,
296    ) -> bool {
297        let status = sui_client
298            .get_token_transfer_action_onchain_status_until_success(
299                action.chain_id() as u8,
300                action.seq_number(),
301            )
302            .await;
303        match status {
304            BridgeActionStatus::Approved | BridgeActionStatus::Claimed => {
305                info!(
306                    "Action already approved or claimed, removing action from pending logs: {:?}",
307                    action
308                );
309                metrics.action_executor_already_processed_actions.inc();
310                store
311                    .remove_pending_actions(&[action.digest()])
312                    .unwrap_or_else(|e| {
313                        panic!("Write to DB should not fail: {:?}", e);
314                    });
315                true
316            }
317            // Although theoretically a legit SuiToEthBridgeAction should not have
318            // status `NotFound`
319            BridgeActionStatus::Pending | BridgeActionStatus::NotFound => false,
320        }
321    }
322
323    // TODO: introduce a way to properly stagger the handling
324    // for various validators.
325    async fn request_signatures(
326        semaphore: Arc<Semaphore>,
327        sui_client: Arc<SuiClient<C>>,
328        auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
329        action: BridgeActionExecutionWrapper,
330        store: Arc<BridgeOrchestratorTables>,
331        signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
332        execution_queue_sender: mysten_metrics::metered_channel::Sender<
333            CertifiedBridgeActionExecutionWrapper,
334        >,
335        metrics: Arc<BridgeMetrics>,
336    ) {
337        let _permit = semaphore
338            .acquire()
339            .await
340            .expect("semaphore should not be closed");
341        info!("requesting signatures");
342        let BridgeActionExecutionWrapper(action, attempt_times) = action;
343
344        // Only token transfer action should reach here
345        match &action {
346            BridgeAction::SuiToEthBridgeAction(_)
347            | BridgeAction::SuiToEthTokenTransfer(_)
348            | BridgeAction::EthToSuiBridgeAction(_) => (),
349            _ => unreachable!("Non token transfer action should not reach here"),
350        };
351
352        // If the action is already processed, skip it.
353        if Self::handle_already_processed_token_transfer_action_maybe(
354            &sui_client,
355            &action,
356            &store,
357            &metrics,
358        )
359        .await
360        {
361            return;
362        }
363        match auth_agg
364            .load()
365            .request_committee_signatures(action.clone())
366            .await
367        {
368            Ok(certificate) => {
369                info!("Sending certificate to execution");
370                execution_queue_sender
371                    .send(CertifiedBridgeActionExecutionWrapper(certificate, 0))
372                    .await
373                    .unwrap_or_else(|e| {
374                        panic!("Sending to execution queue should not fail: {:?}", e);
375                    });
376            }
377            Err(e) => {
378                warn!("Failed to collect sigs for bridge action: {:?}", e);
379                metrics.err_signature_aggregation.inc();
380
381                // TODO: spawn a task for this
382                if attempt_times >= MAX_SIGNING_ATTEMPTS {
383                    metrics.err_signature_aggregation_too_many_failures.inc();
384                    error!(
385                        "Manual intervention is required. Failed to collect sigs for bridge action after {MAX_SIGNING_ATTEMPTS} attempts: {:?}",
386                        e
387                    );
388                    return;
389                }
390                delay(attempt_times).await;
391                signing_queue_sender
392                    .send(BridgeActionExecutionWrapper(action, attempt_times + 1))
393                    .await
394                    .unwrap_or_else(|e| {
395                        panic!("Sending to signing queue should not fail: {:?}", e);
396                    });
397            }
398        }
399    }
400
401    // Before calling this function, `key` and `sui_address` need to be
402    // verified to match.
403    async fn run_onchain_execution_loop(
404        sui_client: Arc<SuiClient<C>>,
405        sui_key: SuiKeyPair,
406        sui_address: SuiAddress,
407        gas_object_id: ObjectID,
408        store: Arc<BridgeOrchestratorTables>,
409        execution_queue_sender: mysten_metrics::metered_channel::Sender<
410            CertifiedBridgeActionExecutionWrapper,
411        >,
412        mut execution_queue_receiver: mysten_metrics::metered_channel::Receiver<
413            CertifiedBridgeActionExecutionWrapper,
414        >,
415        bridge_object_arg: ObjectArg,
416        sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
417        bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
418        metrics: Arc<BridgeMetrics>,
419    ) {
420        info!("Starting run_onchain_execution_loop");
421        while let Some(certificate_wrapper) = execution_queue_receiver.recv().await {
422            // When bridge is paused, skip execution.
423            // Skipped actions will be picked up upon node restarting
424            // if bridge is unpaused.
425            if *bridge_pause_rx.borrow() {
426                warn!("Bridge is paused, skipping execution");
427                metrics
428                    .action_executor_execution_queue_skipped_actions_due_to_pausing
429                    .inc();
430                continue;
431            }
432            Self::handle_execution_task(
433                certificate_wrapper,
434                &sui_client,
435                &sui_key,
436                &sui_address,
437                gas_object_id,
438                &store,
439                &execution_queue_sender,
440                &bridge_object_arg,
441                &sui_token_type_tags,
442                &metrics,
443            )
444            .await;
445        }
446        panic!("Execution queue closed unexpectedly");
447    }
448
449    #[instrument(level = "error", skip_all, fields(action_key=?certificate_wrapper.0.data().key(), attempt_times=?certificate_wrapper.1))]
450    async fn handle_execution_task(
451        certificate_wrapper: CertifiedBridgeActionExecutionWrapper,
452        sui_client: &Arc<SuiClient<C>>,
453        sui_key: &SuiKeyPair,
454        sui_address: &SuiAddress,
455        gas_object_id: ObjectID,
456        store: &Arc<BridgeOrchestratorTables>,
457        execution_queue_sender: &mysten_metrics::metered_channel::Sender<
458            CertifiedBridgeActionExecutionWrapper,
459        >,
460        bridge_object_arg: &ObjectArg,
461        sui_token_type_tags: &ArcSwap<HashMap<u8, TypeTag>>,
462        metrics: &Arc<BridgeMetrics>,
463    ) {
464        metrics
465            .action_executor_execution_queue_received_actions
466            .inc();
467        let CertifiedBridgeActionExecutionWrapper(certificate, attempt_times) = certificate_wrapper;
468        let action = certificate.data();
469        let action_key = action.key();
470
471        info!("Received certified action for execution: {:?}", action);
472
473        // TODO check gas coin balance here. If gas balance too low, do not proceed.
474        let (gas_coin, gas_object_ref) =
475            Self::get_gas_data_assert_ownership(*sui_address, gas_object_id, sui_client).await;
476        metrics.gas_coin_balance.set(gas_coin.value() as i64);
477
478        let ceriticate_clone = certificate.clone();
479
480        // Check once: if the action is already processed, skip it.
481        if Self::handle_already_processed_token_transfer_action_maybe(
482            sui_client, action, store, metrics,
483        )
484        .await
485        {
486            info!("Action already processed, skipping");
487            return;
488        }
489
490        info!("Building Sui transaction");
491        let rgp = sui_client.get_reference_gas_price_until_success().await;
492        let tx_data = match build_sui_transaction(
493            *sui_address,
494            &gas_object_ref,
495            ceriticate_clone,
496            *bridge_object_arg,
497            sui_token_type_tags.load().as_ref(),
498            rgp,
499        ) {
500            Ok(tx_data) => tx_data,
501            Err(err) => {
502                metrics.err_build_sui_transaction.inc();
503                error!(
504                    "Manual intervention is required. Failed to build transaction for action {:?}: {:?}",
505                    action, err
506                );
507                // This should not happen, but in case it does, we do not want to
508                // panic, instead we log here for manual intervention.
509                return;
510            }
511        };
512        let sig = Signature::new_secure(
513            &IntentMessage::new(Intent::sui_transaction(), &tx_data),
514            sui_key,
515        );
516        let signed_tx = Transaction::from_data(tx_data, vec![sig]);
517        let tx_digest = *signed_tx.digest();
518
519        // Check twice: If the action is already processed, skip it.
520        if Self::handle_already_processed_token_transfer_action_maybe(
521            sui_client, action, store, metrics,
522        )
523        .await
524        {
525            info!("Action already processed, skipping");
526            return;
527        }
528
529        info!(?tx_digest, ?gas_object_ref, "Sending transaction to Sui");
530        match sui_client
531            .execute_transaction_block_with_effects(signed_tx)
532            .await
533        {
534            Ok(resp) => {
535                Self::handle_execution_effects(tx_digest, resp, store, action, metrics).await
536            }
537
538            // If the transaction did not go through, retry up to a certain times.
539            Err(err) => {
540                error!(
541                    ?action_key,
542                    ?tx_digest,
543                    "Sui transaction failed at signing: {err:?}"
544                );
545                metrics.err_sui_transaction_submission.inc();
546                let metrics_clone = metrics.clone();
547                // Do this in a separate task so we won't deadlock here
548                let sender_clone = execution_queue_sender.clone();
549                spawn_logged_monitored_task!(async move {
550                    // If it fails for too many times, log and ask for manual intervention.
551                    if attempt_times >= MAX_EXECUTION_ATTEMPTS {
552                        metrics_clone
553                            .err_sui_transaction_submission_too_many_failures
554                            .inc();
555                        error!("Manual intervention is required. Failed to collect execute transaction for bridge action after {MAX_EXECUTION_ATTEMPTS} attempts: {:?}", err);
556                        return;
557                    }
558                    delay(attempt_times).await;
559                    sender_clone
560                        .send(CertifiedBridgeActionExecutionWrapper(
561                            certificate,
562                            attempt_times + 1,
563                        ))
564                        .await
565                        .unwrap_or_else(|e| {
566                            panic!("Sending to execution queue should not fail: {:?}", e);
567                        });
568                    info!("Re-enqueued certificate for execution");
569                }.instrument(tracing::debug_span!("reenqueue_execution_task", action_key=?action_key)));
570            }
571        }
572    }
573
574    // TODO: do we need a mechanism to periodically read pending actions from DB?
575    async fn handle_execution_effects(
576        tx_digest: TransactionDigest,
577        response: SuiTransactionBlockResponse,
578        store: &Arc<BridgeOrchestratorTables>,
579        action: &BridgeAction,
580        metrics: &Arc<BridgeMetrics>,
581    ) {
582        let effects = response
583            .effects
584            .clone()
585            .expect("We requested effects but got None.");
586        let status = effects.status();
587        match status {
588            SuiExecutionStatus::Success => {
589                let events = response.events.expect("We requested events but got None.");
590                let relevant_events = events
591                    .data
592                    .iter()
593                    .filter(|e| {
594                        e.type_ == *TokenTransferAlreadyClaimed.get().unwrap()
595                            || e.type_ == *TokenTransferClaimed.get().unwrap()
596                            || e.type_ == *TokenTransferApproved.get().unwrap()
597                            || e.type_ == *TokenTransferAlreadyApproved.get().unwrap()
598                    })
599                    .collect::<Vec<_>>();
600                assert!(
601                    !relevant_events.is_empty(),
602                    "Expected TokenTransferAlreadyClaimed, TokenTransferClaimed, TokenTransferApproved \
603                    or TokenTransferAlreadyApproved event but got: {:?}",
604                    events
605                );
606                info!(?tx_digest, "Sui transaction executed successfully");
607                // track successful approval and claim events
608                relevant_events.iter().for_each(|e| {
609                    if e.type_ == *TokenTransferClaimed.get().unwrap() {
610                        match action {
611                            BridgeAction::EthToSuiBridgeAction(_) => {
612                                metrics.eth_sui_token_transfer_claimed.inc();
613                            }
614                            BridgeAction::SuiToEthBridgeAction(_) => {
615                                metrics.sui_eth_token_transfer_claimed.inc();
616                            }
617                            _ => error!("Unexpected action type for claimed event: {:?}", action),
618                        }
619                    } else if e.type_ == *TokenTransferApproved.get().unwrap() {
620                        match action {
621                            BridgeAction::EthToSuiBridgeAction(_) => {
622                                metrics.eth_sui_token_transfer_approved.inc();
623                            }
624                            BridgeAction::SuiToEthBridgeAction(_) => {
625                                metrics.sui_eth_token_transfer_approved.inc();
626                            }
627                            _ => error!("Unexpected action type for approved event: {:?}", action),
628                        }
629                    }
630                });
631                store
632                    .remove_pending_actions(&[action.digest()])
633                    .unwrap_or_else(|e| {
634                        panic!("Write to DB should not fail: {:?}", e);
635                    })
636            }
637            SuiExecutionStatus::Failure { error } => {
638                // In practice the transaction could fail because of running out of gas, but really
639                // should not be due to other reasons.
640                // This means manual intervention is needed. So we do not push them back to
641                // the execution queue because retries are mostly likely going to fail anyway.
642                // After human examination, the node should be restarted and fetch them from WAL.
643
644                metrics.err_sui_transaction_execution.inc();
645                error!(
646                    ?tx_digest,
647                    "Manual intervention is needed. Sui transaction executed and failed with error: {error:?}"
648                );
649            }
650        }
651    }
652
653    /// Panics if the gas object is not owned by the address.
654    async fn get_gas_data_assert_ownership(
655        sui_address: SuiAddress,
656        gas_object_id: ObjectID,
657        sui_client: &SuiClient<C>,
658    ) -> (GasCoin, ObjectRef) {
659        let (gas_coin, gas_obj_ref, owner) = sui_client
660            .get_gas_data_panic_if_not_gas(gas_object_id)
661            .await;
662
663        // TODO: when we add multiple gas support in the future we could discard
664        // transferred gas object instead.
665        assert_eq!(
666            owner,
667            Owner::AddressOwner(sui_address),
668            "Gas object {:?} is no longer owned by address {}",
669            gas_object_id,
670            sui_address
671        );
672        (gas_coin, gas_obj_ref)
673    }
674}
675
676pub async fn submit_to_executor(
677    tx: &mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
678    action: BridgeAction,
679) -> Result<(), BridgeError> {
680    tx.send(BridgeActionExecutionWrapper(action, 0))
681        .await
682        .map_err(|e| BridgeError::Generic(e.to_string()))
683}
684
685#[cfg(test)]
686mod tests {
687    use crate::events::init_all_struct_tags;
688    use crate::test_utils::DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
689    use crate::types::BRIDGE_PAUSED;
690    use fastcrypto::traits::KeyPair;
691    use prometheus::Registry;
692    use std::collections::{BTreeMap, HashMap};
693    use std::str::FromStr;
694    use sui_json_rpc_types::SuiTransactionBlockEffects;
695    use sui_json_rpc_types::SuiTransactionBlockEvents;
696    use sui_json_rpc_types::{SuiEvent, SuiTransactionBlockResponse};
697    use sui_types::TypeTag;
698    use sui_types::crypto::get_key_pair;
699    use sui_types::gas_coin::GasCoin;
700    use sui_types::{base_types::random_object_ref, transaction::TransactionData};
701
702    use crate::{
703        crypto::{
704            BridgeAuthorityKeyPair, BridgeAuthorityPublicKeyBytes,
705            BridgeAuthorityRecoverableSignature,
706        },
707        server::mock_handler::BridgeRequestMockHandler,
708        sui_mock_client::SuiMockClient,
709        test_utils::{
710            get_test_authorities_and_run_mock_bridge_server, get_test_eth_to_sui_bridge_action,
711            get_test_sui_to_eth_bridge_action, sign_action_with_key,
712        },
713        types::{BridgeCommittee, BridgeCommitteeValiditySignInfo, CertifiedBridgeAction},
714    };
715
716    use super::*;
717
718    #[tokio::test]
719    async fn test_onchain_execution_loop() {
720        let (
721            signing_tx,
722            _execution_tx,
723            sui_client_mock,
724            mut tx_subscription,
725            store,
726            secrets,
727            dummy_sui_key,
728            mock0,
729            mock1,
730            mock2,
731            mock3,
732            _handles,
733            gas_object_ref,
734            sui_address,
735            sui_token_type_tags,
736            _bridge_pause_tx,
737        ) = setup().await;
738        let (action_certificate, _, _) = get_bridge_authority_approved_action(
739            vec![&mock0, &mock1, &mock2, &mock3],
740            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
741            None,
742            true,
743        );
744        let action = action_certificate.data().clone();
745        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
746        let tx_data = build_sui_transaction(
747            sui_address,
748            &gas_object_ref,
749            action_certificate,
750            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
751            &id_token_map,
752            1000,
753        )
754        .unwrap();
755
756        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
757
758        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
759        sui_client_mock.add_gas_object_info(
760            gas_coin.clone(),
761            gas_object_ref,
762            Owner::AddressOwner(sui_address),
763        );
764
765        // Mock the transaction to be successfully executed
766        let mut event = SuiEvent::random_for_testing();
767        event.type_ = TokenTransferClaimed.get().unwrap().clone();
768        let events = vec![event];
769        mock_transaction_response(
770            &sui_client_mock,
771            tx_digest,
772            SuiExecutionStatus::Success,
773            Some(events),
774            true,
775        );
776
777        store
778            .insert_pending_actions(std::slice::from_ref(&action))
779            .unwrap();
780        assert_eq!(
781            store.get_all_pending_actions()[&action.digest()],
782            action.clone()
783        );
784
785        // Kick it
786        submit_to_executor(&signing_tx, action.clone())
787            .await
788            .unwrap();
789
790        // Expect to see the transaction to be requested and successfully executed hence removed from WAL
791        tx_subscription.recv().await.unwrap();
792        assert!(store.get_all_pending_actions().is_empty());
793
794        /////////////////////////////////////////////////////////////////////////////////////////////////
795        ////////////////////////////////////// Test execution failure ///////////////////////////////////
796        /////////////////////////////////////////////////////////////////////////////////////////////////
797
798        let (action_certificate, _, _) = get_bridge_authority_approved_action(
799            vec![&mock0, &mock1, &mock2, &mock3],
800            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
801            None,
802            true,
803        );
804
805        let action = action_certificate.data().clone();
806
807        let tx_data = build_sui_transaction(
808            sui_address,
809            &gas_object_ref,
810            action_certificate,
811            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
812            &id_token_map,
813            1000,
814        )
815        .unwrap();
816        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
817
818        // Mock the transaction to fail
819        mock_transaction_response(
820            &sui_client_mock,
821            tx_digest,
822            SuiExecutionStatus::Failure {
823                error: "failure is mother of success".to_string(),
824            },
825            None,
826            true,
827        );
828
829        store
830            .insert_pending_actions(std::slice::from_ref(&action))
831            .unwrap();
832        assert_eq!(
833            store.get_all_pending_actions()[&action.digest()],
834            action.clone()
835        );
836
837        // Kick it
838        submit_to_executor(&signing_tx, action.clone())
839            .await
840            .unwrap();
841
842        // Expect to see the transaction to be requested and but failed
843        tx_subscription.recv().await.unwrap();
844        // The action is not removed from WAL because the transaction failed
845        assert_eq!(
846            store.get_all_pending_actions()[&action.digest()],
847            action.clone()
848        );
849
850        /////////////////////////////////////////////////////////////////////////////////////////////////
851        //////////////////////////// Test transaction failed at signing stage ///////////////////////////
852        /////////////////////////////////////////////////////////////////////////////////////////////////
853
854        let (action_certificate, _, _) = get_bridge_authority_approved_action(
855            vec![&mock0, &mock1, &mock2, &mock3],
856            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
857            None,
858            true,
859        );
860
861        let action = action_certificate.data().clone();
862
863        let tx_data = build_sui_transaction(
864            sui_address,
865            &gas_object_ref,
866            action_certificate,
867            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
868            &id_token_map,
869            1000,
870        )
871        .unwrap();
872        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
873        mock_transaction_error(
874            &sui_client_mock,
875            tx_digest,
876            BridgeError::Generic("some random error".to_string()),
877            true,
878        );
879
880        store
881            .insert_pending_actions(std::slice::from_ref(&action))
882            .unwrap();
883        assert_eq!(
884            store.get_all_pending_actions()[&action.digest()],
885            action.clone()
886        );
887
888        // Kick it
889        submit_to_executor(&signing_tx, action.clone())
890            .await
891            .unwrap();
892
893        // Failure will trigger retry, we wait for 2 requests before checking WAL log
894        let tx_digest = tx_subscription.recv().await.unwrap();
895        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
896
897        // The retry is still going on, action still in WAL
898        assert!(
899            store
900                .get_all_pending_actions()
901                .contains_key(&action.digest())
902        );
903
904        // Now let it succeed
905        let mut event = SuiEvent::random_for_testing();
906        event.type_ = TokenTransferClaimed.get().unwrap().clone();
907        let events = vec![event];
908        mock_transaction_response(
909            &sui_client_mock,
910            tx_digest,
911            SuiExecutionStatus::Success,
912            Some(events),
913            true,
914        );
915
916        // Give it 1 second to retry and succeed
917        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
918        // The action is successful and should be removed from WAL now
919        assert!(
920            !store
921                .get_all_pending_actions()
922                .contains_key(&action.digest())
923        );
924    }
925
926    #[tokio::test]
927    async fn test_signature_aggregation_loop() {
928        let (
929            signing_tx,
930            _execution_tx,
931            sui_client_mock,
932            mut tx_subscription,
933            store,
934            secrets,
935            dummy_sui_key,
936            mock0,
937            mock1,
938            mock2,
939            mock3,
940            _handles,
941            gas_object_ref,
942            sui_address,
943            sui_token_type_tags,
944            _bridge_pause_tx,
945        ) = setup().await;
946        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
947        let (action_certificate, sui_tx_digest, sui_tx_event_index) =
948            get_bridge_authority_approved_action(
949                vec![&mock0, &mock1, &mock2, &mock3],
950                vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
951                None,
952                true,
953            );
954        let action = action_certificate.data().clone();
955        mock_bridge_authority_signing_errors(
956            vec![&mock0, &mock1, &mock2],
957            sui_tx_digest,
958            sui_tx_event_index,
959        );
960        let mut sigs = mock_bridge_authority_sigs(
961            vec![&mock3],
962            &action,
963            vec![&secrets[3]],
964            sui_tx_digest,
965            sui_tx_event_index,
966        );
967
968        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
969        sui_client_mock.add_gas_object_info(
970            gas_coin,
971            gas_object_ref,
972            Owner::AddressOwner(sui_address),
973        );
974        store
975            .insert_pending_actions(std::slice::from_ref(&action))
976            .unwrap();
977        assert_eq!(
978            store.get_all_pending_actions()[&action.digest()],
979            action.clone()
980        );
981
982        // Kick it
983        submit_to_executor(&signing_tx, action.clone())
984            .await
985            .unwrap();
986
987        // Wait until the transaction is retried at least once (instead of deing dropped)
988        loop {
989            let requested_times =
990                mock0.get_sui_token_events_requested(sui_tx_digest, sui_tx_event_index);
991            if requested_times >= 2 {
992                break;
993            }
994            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
995        }
996        // Nothing is sent to execute yet
997        assert_eq!(
998            tx_subscription.try_recv().unwrap_err(),
999            tokio::sync::broadcast::error::TryRecvError::Empty
1000        );
1001        // Still in WAL
1002        assert_eq!(
1003            store.get_all_pending_actions()[&action.digest()],
1004            action.clone()
1005        );
1006
1007        // Let authorities sign the action too. Now we are above the threshold
1008        let sig_from_2 = mock_bridge_authority_sigs(
1009            vec![&mock2],
1010            &action,
1011            vec![&secrets[2]],
1012            sui_tx_digest,
1013            sui_tx_event_index,
1014        );
1015        sigs.extend(sig_from_2);
1016        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1017            action.clone(),
1018            BridgeCommitteeValiditySignInfo { signatures: sigs },
1019        );
1020        let action_certificate = VerifiedCertifiedBridgeAction::new_from_verified(certified_action);
1021        let tx_data = build_sui_transaction(
1022            sui_address,
1023            &gas_object_ref,
1024            action_certificate,
1025            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
1026            &id_token_map,
1027            1000,
1028        )
1029        .unwrap();
1030        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1031
1032        let mut event = SuiEvent::random_for_testing();
1033        event.type_ = TokenTransferClaimed.get().unwrap().clone();
1034        let events = vec![event];
1035        mock_transaction_response(
1036            &sui_client_mock,
1037            tx_digest,
1038            SuiExecutionStatus::Success,
1039            Some(events),
1040            true,
1041        );
1042
1043        // Expect to see the transaction to be requested and succeed
1044        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1045        // The action is removed from WAL
1046        assert!(
1047            !store
1048                .get_all_pending_actions()
1049                .contains_key(&action.digest())
1050        );
1051    }
1052
1053    #[tokio::test]
1054    async fn test_skip_request_signature_if_already_processed_on_chain() {
1055        let (
1056            signing_tx,
1057            _execution_tx,
1058            sui_client_mock,
1059            mut tx_subscription,
1060            store,
1061            _secrets,
1062            _dummy_sui_key,
1063            mock0,
1064            mock1,
1065            mock2,
1066            mock3,
1067            _handles,
1068            _gas_object_ref,
1069            _sui_address,
1070            _sui_token_type_tags,
1071            _bridge_pause_tx,
1072        ) = setup().await;
1073
1074        let sui_tx_digest = TransactionDigest::random();
1075        let sui_tx_event_index = 0;
1076        let action = get_test_sui_to_eth_bridge_action(
1077            Some(sui_tx_digest),
1078            Some(sui_tx_event_index),
1079            None,
1080            None,
1081            None,
1082            None,
1083            None,
1084        );
1085        mock_bridge_authority_signing_errors(
1086            vec![&mock0, &mock1, &mock2, &mock3],
1087            sui_tx_digest,
1088            sui_tx_event_index,
1089        );
1090        store
1091            .insert_pending_actions(std::slice::from_ref(&action))
1092            .unwrap();
1093        assert_eq!(
1094            store.get_all_pending_actions()[&action.digest()],
1095            action.clone()
1096        );
1097
1098        // Kick it
1099        submit_to_executor(&signing_tx, action.clone())
1100            .await
1101            .unwrap();
1102        let action_digest = action.digest();
1103
1104        // Wait for 1 second. It should still in the process of retrying requesting sigs becaues we mock errors above.
1105        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1106        tx_subscription.try_recv().unwrap_err();
1107        // And the action is still in WAL
1108        assert!(store.get_all_pending_actions().contains_key(&action_digest));
1109
1110        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1111
1112        // The next retry will see the action is already processed on chain and remove it from WAL
1113        let now = std::time::Instant::now();
1114        while store.get_all_pending_actions().contains_key(&action_digest) {
1115            if now.elapsed().as_secs() > 10 {
1116                panic!("Timeout waiting for action to be removed from WAL");
1117            }
1118            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1119        }
1120        tx_subscription.try_recv().unwrap_err();
1121    }
1122
1123    #[tokio::test]
1124    async fn test_skip_tx_submission_if_already_processed_on_chain() {
1125        let (
1126            _signing_tx,
1127            execution_tx,
1128            sui_client_mock,
1129            mut tx_subscription,
1130            store,
1131            secrets,
1132            dummy_sui_key,
1133            mock0,
1134            mock1,
1135            mock2,
1136            mock3,
1137            _handles,
1138            gas_object_ref,
1139            sui_address,
1140            sui_token_type_tags,
1141            _bridge_pause_tx,
1142        ) = setup().await;
1143        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
1144        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1145            vec![&mock0, &mock1, &mock2, &mock3],
1146            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1147            None,
1148            true,
1149        );
1150
1151        let action = action_certificate.data().clone();
1152        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1153        let tx_data = build_sui_transaction(
1154            sui_address,
1155            &gas_object_ref,
1156            action_certificate.clone(),
1157            arg,
1158            &id_token_map,
1159            1000,
1160        )
1161        .unwrap();
1162        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1163        mock_transaction_error(
1164            &sui_client_mock,
1165            tx_digest,
1166            BridgeError::Generic("some random error".to_string()),
1167            true,
1168        );
1169
1170        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
1171        sui_client_mock.add_gas_object_info(
1172            gas_coin.clone(),
1173            gas_object_ref,
1174            Owner::AddressOwner(sui_address),
1175        );
1176
1177        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1178
1179        store
1180            .insert_pending_actions(std::slice::from_ref(&action))
1181            .unwrap();
1182        assert_eq!(
1183            store.get_all_pending_actions()[&action.digest()],
1184            action.clone()
1185        );
1186
1187        // Kick it (send to the execution queue, skipping the signing queue)
1188        execution_tx
1189            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1190            .await
1191            .unwrap();
1192
1193        // Some requests come in and will fail.
1194        tx_subscription.recv().await.unwrap();
1195
1196        // Set the action to be already approved on chain
1197        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1198
1199        // The next retry will see the action is already processed on chain and remove it from WAL
1200        let now = std::time::Instant::now();
1201        let action_digest = action.digest();
1202        while store.get_all_pending_actions().contains_key(&action_digest) {
1203            if now.elapsed().as_secs() > 10 {
1204                panic!("Timeout waiting for action to be removed from WAL");
1205            }
1206            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1207        }
1208    }
1209
1210    #[tokio::test]
1211    async fn test_skip_tx_submission_if_bridge_is_paused() {
1212        let (
1213            _signing_tx,
1214            execution_tx,
1215            sui_client_mock,
1216            mut tx_subscription,
1217            store,
1218            secrets,
1219            dummy_sui_key,
1220            mock0,
1221            mock1,
1222            mock2,
1223            mock3,
1224            _handles,
1225            gas_object_ref,
1226            sui_address,
1227            sui_token_type_tags,
1228            bridge_pause_tx,
1229        ) = setup().await;
1230        let id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1231        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1232            vec![&mock0, &mock1, &mock2, &mock3],
1233            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1234            None,
1235            true,
1236        );
1237
1238        let action = action_certificate.data().clone();
1239        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1240        let tx_data = build_sui_transaction(
1241            sui_address,
1242            &gas_object_ref,
1243            action_certificate.clone(),
1244            arg,
1245            &id_token_map,
1246            1000,
1247        )
1248        .unwrap();
1249        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1250        mock_transaction_error(
1251            &sui_client_mock,
1252            tx_digest,
1253            BridgeError::Generic("some random error".to_string()),
1254            true,
1255        );
1256
1257        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
1258        sui_client_mock.add_gas_object_info(
1259            gas_coin.clone(),
1260            gas_object_ref,
1261            Owner::AddressOwner(sui_address),
1262        );
1263        let action_digest = action.digest();
1264        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1265
1266        // assert bridge is unpaused now
1267        assert!(!*bridge_pause_tx.borrow());
1268
1269        store
1270            .insert_pending_actions(std::slice::from_ref(&action))
1271            .unwrap();
1272        assert_eq!(
1273            store.get_all_pending_actions()[&action.digest()],
1274            action.clone()
1275        );
1276
1277        // Kick it (send to the execution queue, skipping the signing queue)
1278        execution_tx
1279            .send(CertifiedBridgeActionExecutionWrapper(
1280                action_certificate.clone(),
1281                0,
1282            ))
1283            .await
1284            .unwrap();
1285
1286        // Some requests come in
1287        tx_subscription.recv().await.unwrap();
1288
1289        // Pause the bridge
1290        bridge_pause_tx.send(BRIDGE_PAUSED).unwrap();
1291
1292        // Kick it again
1293        execution_tx
1294            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1295            .await
1296            .unwrap();
1297
1298        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1299        // Nothing is sent to execute
1300        assert_eq!(
1301            tx_subscription.try_recv().unwrap_err(),
1302            tokio::sync::broadcast::error::TryRecvError::Empty
1303        );
1304        // Still in WAL
1305        assert_eq!(
1306            store.get_all_pending_actions()[&action_digest],
1307            action.clone()
1308        );
1309    }
1310
1311    #[tokio::test]
1312    async fn test_action_executor_handle_new_token() {
1313        let new_token_id = 255u8; // token id that does not exist
1314        let new_type_tag = TypeTag::from_str("0xbeef::beef::BEEF").unwrap();
1315        let (
1316            _signing_tx,
1317            execution_tx,
1318            sui_client_mock,
1319            mut tx_subscription,
1320            _store,
1321            secrets,
1322            dummy_sui_key,
1323            mock0,
1324            mock1,
1325            mock2,
1326            mock3,
1327            _handles,
1328            gas_object_ref,
1329            sui_address,
1330            sui_token_type_tags,
1331            _bridge_pause_tx,
1332        ) = setup().await;
1333        let mut id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1334        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1335            vec![&mock0, &mock1, &mock2, &mock3],
1336            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1337            Some(new_token_id),
1338            false, // we need an eth -> sui action that entails the new token type tag in transaction building
1339        );
1340
1341        let action = action_certificate.data().clone();
1342        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1343        let tx_data = build_sui_transaction(
1344            sui_address,
1345            &gas_object_ref,
1346            action_certificate.clone(),
1347            arg,
1348            &maplit::hashmap! {
1349                new_token_id => new_type_tag.clone()
1350            },
1351            1000,
1352        )
1353        .unwrap();
1354        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1355        mock_transaction_error(
1356            &sui_client_mock,
1357            tx_digest,
1358            BridgeError::Generic("some random error".to_string()),
1359            true,
1360        );
1361
1362        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
1363        sui_client_mock.add_gas_object_info(
1364            gas_coin.clone(),
1365            gas_object_ref,
1366            Owner::AddressOwner(sui_address),
1367        );
1368        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1369
1370        // Kick it (send to the execution queue, skipping the signing queue)
1371        execution_tx
1372            .send(CertifiedBridgeActionExecutionWrapper(
1373                action_certificate.clone(),
1374                0,
1375            ))
1376            .await
1377            .unwrap();
1378
1379        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1380        // Nothing is sent to execute, because the token id does not exist yet
1381        assert_eq!(
1382            tx_subscription.try_recv().unwrap_err(),
1383            tokio::sync::broadcast::error::TryRecvError::Empty
1384        );
1385
1386        // Now insert the new token id
1387        id_token_map.insert(new_token_id, new_type_tag);
1388        sui_token_type_tags.store(Arc::new(id_token_map));
1389
1390        // Kick it again
1391        execution_tx
1392            .send(CertifiedBridgeActionExecutionWrapper(
1393                action_certificate.clone(),
1394                0,
1395            ))
1396            .await
1397            .unwrap();
1398
1399        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1400        // The action is sent to execution
1401        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1402    }
1403
1404    fn mock_bridge_authority_sigs(
1405        mocks: Vec<&BridgeRequestMockHandler>,
1406        action: &BridgeAction,
1407        secrets: Vec<&BridgeAuthorityKeyPair>,
1408        sui_tx_digest: TransactionDigest,
1409        sui_tx_event_index: u16,
1410    ) -> BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthorityRecoverableSignature> {
1411        assert_eq!(mocks.len(), secrets.len());
1412        let mut signed_actions = BTreeMap::new();
1413        for (mock, secret) in mocks.iter().zip(secrets.iter()) {
1414            let signed_action = sign_action_with_key(action, secret);
1415            mock.add_sui_event_response(
1416                sui_tx_digest,
1417                sui_tx_event_index,
1418                Ok(signed_action.clone()),
1419                None,
1420            );
1421            signed_actions.insert(secret.public().into(), signed_action.into_sig().signature);
1422        }
1423        signed_actions
1424    }
1425
1426    fn mock_bridge_authority_signing_errors(
1427        mocks: Vec<&BridgeRequestMockHandler>,
1428        sui_tx_digest: TransactionDigest,
1429        sui_tx_event_index: u16,
1430    ) {
1431        for mock in mocks {
1432            mock.add_sui_event_response(
1433                sui_tx_digest,
1434                sui_tx_event_index,
1435                Err(BridgeError::RestAPIError("small issue".into())),
1436                None,
1437            );
1438        }
1439    }
1440
1441    /// Create a BridgeAction and mock authorities to return signatures
1442    fn get_bridge_authority_approved_action(
1443        mocks: Vec<&BridgeRequestMockHandler>,
1444        secrets: Vec<&BridgeAuthorityKeyPair>,
1445        token_id: Option<u8>,
1446        sui_to_eth: bool,
1447    ) -> (VerifiedCertifiedBridgeAction, TransactionDigest, u16) {
1448        let sui_tx_digest = TransactionDigest::random();
1449        let sui_tx_event_index = 1;
1450        let action = if sui_to_eth {
1451            get_test_sui_to_eth_bridge_action(
1452                Some(sui_tx_digest),
1453                Some(sui_tx_event_index),
1454                None,
1455                None,
1456                None,
1457                None,
1458                token_id,
1459            )
1460        } else {
1461            get_test_eth_to_sui_bridge_action(None, None, None, token_id)
1462        };
1463
1464        let sigs =
1465            mock_bridge_authority_sigs(mocks, &action, secrets, sui_tx_digest, sui_tx_event_index);
1466        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1467            action,
1468            BridgeCommitteeValiditySignInfo { signatures: sigs },
1469        );
1470        (
1471            VerifiedCertifiedBridgeAction::new_from_verified(certified_action),
1472            sui_tx_digest,
1473            sui_tx_event_index,
1474        )
1475    }
1476
1477    fn get_tx_digest(tx_data: TransactionData, dummy_sui_key: &SuiKeyPair) -> TransactionDigest {
1478        let sig = Signature::new_secure(
1479            &IntentMessage::new(Intent::sui_transaction(), &tx_data),
1480            dummy_sui_key,
1481        );
1482        let signed_tx = Transaction::from_data(tx_data, vec![sig]);
1483        *signed_tx.digest()
1484    }
1485
1486    /// Why is `wildcard` needed? This is because authority signatures
1487    /// are part of transaction data. Depending on whose signatures
1488    /// are included in what order, this may change the tx digest.
1489    fn mock_transaction_response(
1490        sui_client_mock: &SuiMockClient,
1491        tx_digest: TransactionDigest,
1492        status: SuiExecutionStatus,
1493        events: Option<Vec<SuiEvent>>,
1494        wildcard: bool,
1495    ) {
1496        let mut response = SuiTransactionBlockResponse::new(tx_digest);
1497        let effects = SuiTransactionBlockEffects::new_for_testing(tx_digest, status);
1498        if let Some(events) = events {
1499            response.events = Some(SuiTransactionBlockEvents { data: events });
1500        }
1501        response.effects = Some(effects);
1502        if wildcard {
1503            sui_client_mock.set_wildcard_transaction_response(Ok(response));
1504        } else {
1505            sui_client_mock.add_transaction_response(tx_digest, Ok(response));
1506        }
1507    }
1508
1509    fn mock_transaction_error(
1510        sui_client_mock: &SuiMockClient,
1511        tx_digest: TransactionDigest,
1512        error: BridgeError,
1513        wildcard: bool,
1514    ) {
1515        if wildcard {
1516            sui_client_mock.set_wildcard_transaction_response(Err(error));
1517        } else {
1518            sui_client_mock.add_transaction_response(tx_digest, Err(error));
1519        }
1520    }
1521
1522    #[allow(clippy::type_complexity)]
1523    async fn setup() -> (
1524        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
1525        mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
1526        SuiMockClient,
1527        tokio::sync::broadcast::Receiver<TransactionDigest>,
1528        Arc<BridgeOrchestratorTables>,
1529        Vec<BridgeAuthorityKeyPair>,
1530        SuiKeyPair,
1531        BridgeRequestMockHandler,
1532        BridgeRequestMockHandler,
1533        BridgeRequestMockHandler,
1534        BridgeRequestMockHandler,
1535        Vec<tokio::task::JoinHandle<()>>,
1536        ObjectRef,
1537        SuiAddress,
1538        Arc<ArcSwap<HashMap<u8, TypeTag>>>,
1539        tokio::sync::watch::Sender<IsBridgePaused>,
1540    ) {
1541        telemetry_subscribers::init_for_testing();
1542        let registry = Registry::new();
1543        mysten_metrics::init_metrics(&registry);
1544        init_all_struct_tags();
1545
1546        let (sui_address, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1547        let sui_key = SuiKeyPair::from(kp);
1548        let gas_object_ref = random_object_ref();
1549        let temp_dir = tempfile::tempdir().unwrap();
1550        let store = BridgeOrchestratorTables::new(temp_dir.path());
1551        let sui_client_mock = SuiMockClient::default();
1552        let tx_subscription = sui_client_mock.subscribe_to_requested_transactions();
1553        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1554
1555        // The dummy key is used to sign transaction so we can get TransactionDigest easily.
1556        // User signature is not part of the transaction so it does not matter which key it is.
1557        let (_, dummy_kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1558        let dummy_sui_key = SuiKeyPair::from(dummy_kp);
1559
1560        let mock0 = BridgeRequestMockHandler::new();
1561        let mock1 = BridgeRequestMockHandler::new();
1562        let mock2 = BridgeRequestMockHandler::new();
1563        let mock3 = BridgeRequestMockHandler::new();
1564
1565        let (mut handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
1566            vec![2500, 2500, 2500, 2500],
1567            vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
1568        );
1569
1570        let committee = BridgeCommittee::new(authorities).unwrap();
1571
1572        let agg = Arc::new(ArcSwap::new(Arc::new(
1573            BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1574        )));
1575        let metrics = Arc::new(BridgeMetrics::new(&registry));
1576        let sui_token_type_tags = sui_client.get_token_id_map().await.unwrap();
1577        let sui_token_type_tags = Arc::new(ArcSwap::new(Arc::new(sui_token_type_tags)));
1578        let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1579        let executor = BridgeActionExecutor::new(
1580            sui_client.clone(),
1581            agg.clone(),
1582            store.clone(),
1583            sui_key,
1584            sui_address,
1585            gas_object_ref.0,
1586            sui_token_type_tags.clone(),
1587            bridge_pause_rx,
1588            metrics,
1589        )
1590        .await;
1591
1592        let (executor_handle, signing_tx, execution_tx) = executor.run_inner();
1593        handles.extend(executor_handle);
1594
1595        (
1596            signing_tx,
1597            execution_tx,
1598            sui_client_mock,
1599            tx_subscription,
1600            store,
1601            secrets,
1602            dummy_sui_key,
1603            mock0,
1604            mock1,
1605            mock2,
1606            mock3,
1607            handles,
1608            gas_object_ref,
1609            sui_address,
1610            sui_token_type_tags,
1611            bridge_pause_tx,
1612        )
1613    }
1614}