sui_bridge/
action_executor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! BridgeActionExecutor receives BridgeActions (from BridgeOrchestrator),
//! collects bridge authority signatures and submits them on chain.
6
7use crate::retry_with_max_elapsed_time;
8use crate::types::IsBridgePaused;
9use arc_swap::ArcSwap;
10use mysten_metrics::spawn_logged_monitored_task;
11use shared_crypto::intent::{Intent, IntentMessage};
12use sui_json_rpc_types::SuiExecutionStatus;
13use sui_types::TypeTag;
14use sui_types::transaction::ObjectArg;
15use sui_types::{
16    base_types::{ObjectID, ObjectRef, SuiAddress},
17    crypto::{Signature, SuiKeyPair},
18    digests::TransactionDigest,
19    gas_coin::GasCoin,
20    object::Owner,
21    transaction::Transaction,
22};
23
24use crate::events::{
25    TokenTransferAlreadyApproved, TokenTransferAlreadyClaimed, TokenTransferApproved,
26    TokenTransferClaimed,
27};
28use crate::metrics::BridgeMetrics;
29use crate::{
30    client::bridge_authority_aggregator::BridgeAuthorityAggregator,
31    error::BridgeError,
32    storage::BridgeOrchestratorTables,
33    sui_client::{ExecuteTransactionResult, SuiClient, SuiClientInner},
34    sui_transaction_builder::build_sui_transaction,
35    types::{BridgeAction, BridgeActionStatus, VerifiedCertifiedBridgeAction},
36};
37use std::collections::HashMap;
38use std::sync::Arc;
39use tokio::sync::Semaphore;
40use tokio::time::Duration;
41use tracing::{Instrument, error, info, instrument, warn};
42
/// Capacity used for both metered queues (signing and execution) created by the executor.
pub const CHANNEL_SIZE: usize = 1000;
/// Number of permits in the semaphore that bounds concurrently running signature requests.
pub const SIGNING_CONCURRENCY: usize = 10;

// delay schedule: at most 16 times including the initial attempt
// 0.1s, 0.2s, 0.4s, 0.8s, 1.6s, 3.2s, 6.4s, 12.8s, 25.6s, 51.2s, 102.4s, 204.8s, 409.6s, 819.2s, 1638.4s
// NOTE(review): the retry paths compare a zero-based counter with `attempt_times >= MAX_*`
// *after* a failed attempt, so an action that starts at 0 appears to be tried up to MAX + 1
// times (with one more delay than listed above) - confirm which of the two is intended.
/// Retry budget for collecting committee signatures for one action.
pub const MAX_SIGNING_ATTEMPTS: u64 = 16;
/// Retry budget for submitting one certified action to Sui.
pub const MAX_EXECUTION_ATTEMPTS: u64 = 16;
50
51async fn delay(attempt_times: u64) {
52    let delay_ms = 100 * 2_u64.pow(attempt_times as u32);
53    tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
54}
55
/// Item of the signing queue: a bridge action (field `0`) paired with the
/// number of signing attempts already made for it (field `1`, starting at 0).
#[derive(Debug)]
pub struct BridgeActionExecutionWrapper(pub BridgeAction, pub u64);
58
/// Item of the execution queue: a verified certified action (field `0`) paired
/// with the number of execution attempts already made for it (field `1`, starting at 0).
#[derive(Debug)]
pub struct CertifiedBridgeActionExecutionWrapper(pub VerifiedCertifiedBridgeAction, pub u64);
61
/// Entry point shared by bridge action executors.
pub trait BridgeActionExecutorTrait {
    /// Consumes the executor and starts it.
    ///
    /// Returns the join handles of the spawned tasks together with the sender
    /// through which new actions are submitted for signing.
    fn run(
        self,
    ) -> (
        Vec<tokio::task::JoinHandle<()>>,
        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
    );
}
70
/// Collects committee signatures for bridge actions and submits the resulting
/// certificates to Sui.
pub struct BridgeActionExecutor<C> {
    // Client used to query on-chain bridge state and to submit transactions.
    sui_client: Arc<SuiClient<C>>,
    // Swappable aggregator used to request committee signatures for an action.
    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
    // Key pair used to sign the Sui transactions built by the executor.
    key: SuiKeyPair,
    // Sender address of those transactions; must own the gas object.
    sui_address: SuiAddress,
    // Gas object used to pay for every submitted transaction.
    gas_object_id: ObjectID,
    // Storage from which pending actions are removed once they are processed.
    store: Arc<BridgeOrchestratorTables>,
    // Bridge object argument passed to the transaction builder.
    bridge_object_arg: ObjectArg,
    // Token id -> type tag map; refreshed from chain when an unknown token id is hit.
    sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
    // Watch channel consulted before each execution to skip work while paused.
    bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
    metrics: Arc<BridgeMetrics>,
}
83
84impl<C> BridgeActionExecutorTrait for BridgeActionExecutor<C>
85where
86    C: SuiClientInner + 'static,
87{
88    fn run(
89        self,
90    ) -> (
91        Vec<tokio::task::JoinHandle<()>>,
92        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
93    ) {
94        let (tasks, sender, _) = self.run_inner();
95        (tasks, sender)
96    }
97}
98
99impl<C> BridgeActionExecutor<C>
100where
101    C: SuiClientInner + 'static,
102{
103    pub async fn new(
104        sui_client: Arc<SuiClient<C>>,
105        bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
106        store: Arc<BridgeOrchestratorTables>,
107        key: SuiKeyPair,
108        sui_address: SuiAddress,
109        gas_object_id: ObjectID,
110        sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
111        bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
112        metrics: Arc<BridgeMetrics>,
113    ) -> Self {
114        let bridge_object_arg = sui_client
115            .get_mutable_bridge_object_arg_must_succeed()
116            .await;
117        Self {
118            sui_client,
119            bridge_auth_agg,
120            store,
121            key,
122            gas_object_id,
123            sui_address,
124            bridge_object_arg,
125            sui_token_type_tags,
126            bridge_pause_rx,
127            metrics,
128        }
129    }
130
131    fn run_inner(
132        self,
133    ) -> (
134        Vec<tokio::task::JoinHandle<()>>,
135        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
136        mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
137    ) {
138        let key = self.key;
139
140        let (sender, receiver) = mysten_metrics::metered_channel::channel(
141            CHANNEL_SIZE,
142            &mysten_metrics::get_metrics()
143                .unwrap()
144                .channel_inflight
145                .with_label_values(&["executor_signing_queue"]),
146        );
147
148        let (execution_tx, execution_rx) = mysten_metrics::metered_channel::channel(
149            CHANNEL_SIZE,
150            &mysten_metrics::get_metrics()
151                .unwrap()
152                .channel_inflight
153                .with_label_values(&["executor_execution_queue"]),
154        );
155        let execution_tx_clone = execution_tx.clone();
156        let sender_clone = sender.clone();
157        let store_clone = self.store.clone();
158        let client_clone = self.sui_client.clone();
159        let mut tasks = vec![];
160        let metrics = self.metrics.clone();
161        tasks.push(spawn_logged_monitored_task!(
162            Self::run_signature_aggregation_loop(
163                client_clone,
164                self.bridge_auth_agg,
165                store_clone,
166                sender_clone,
167                receiver,
168                execution_tx_clone,
169                metrics,
170            )
171        ));
172
173        let metrics = self.metrics.clone();
174        let execution_tx_clone = execution_tx.clone();
175        tasks.push(spawn_logged_monitored_task!(
176            Self::run_onchain_execution_loop(
177                self.sui_client.clone(),
178                key,
179                self.sui_address,
180                self.gas_object_id,
181                self.store.clone(),
182                execution_tx_clone,
183                execution_rx,
184                self.bridge_object_arg,
185                self.sui_token_type_tags,
186                self.bridge_pause_rx,
187                metrics,
188            )
189        ));
190        (tasks, sender, execution_tx)
191    }
192
193    async fn run_signature_aggregation_loop(
194        sui_client: Arc<SuiClient<C>>,
195        auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
196        store: Arc<BridgeOrchestratorTables>,
197        signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
198        mut signing_queue_receiver: mysten_metrics::metered_channel::Receiver<
199            BridgeActionExecutionWrapper,
200        >,
201        execution_queue_sender: mysten_metrics::metered_channel::Sender<
202            CertifiedBridgeActionExecutionWrapper,
203        >,
204        metrics: Arc<BridgeMetrics>,
205    ) {
206        info!("Starting run_signature_aggregation_loop");
207        let semaphore = Arc::new(Semaphore::new(SIGNING_CONCURRENCY));
208        while let Some(action) = signing_queue_receiver.recv().await {
209            Self::handle_signing_task(
210                &semaphore,
211                &auth_agg,
212                &signing_queue_sender,
213                &execution_queue_sender,
214                &sui_client,
215                &store,
216                action,
217                &metrics,
218            )
219            .await;
220        }
221    }
222
223    async fn should_proceed_signing(sui_client: &Arc<SuiClient<C>>) -> bool {
224        let Ok(Ok(is_paused)) =
225            retry_with_max_elapsed_time!(sui_client.is_bridge_paused(), Duration::from_secs(600))
226        else {
227            error!("Failed to get bridge status after retry");
228            return false;
229        };
230        !is_paused
231    }
232
233    #[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))]
234    async fn handle_signing_task(
235        semaphore: &Arc<Semaphore>,
236        auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
237        signing_queue_sender: &mysten_metrics::metered_channel::Sender<
238            BridgeActionExecutionWrapper,
239        >,
240        execution_queue_sender: &mysten_metrics::metered_channel::Sender<
241            CertifiedBridgeActionExecutionWrapper,
242        >,
243        sui_client: &Arc<SuiClient<C>>,
244        store: &Arc<BridgeOrchestratorTables>,
245        action: BridgeActionExecutionWrapper,
246        metrics: &Arc<BridgeMetrics>,
247    ) {
248        metrics.action_executor_signing_queue_received_actions.inc();
249        let action_key = action.0.key();
250        info!("Received action for signing: {:?}", action.0);
251
252        // TODO: this is a temporary fix to avoid signing when the bridge is paused.
253        // but the way is implemented is not ideal:
254        // 1. it should check the direction
255        // 2. should use a better mechanism to check the bridge status instead of polling for each action
256        let should_proceed = Self::should_proceed_signing(sui_client).await;
257        if !should_proceed {
258            metrics.action_executor_signing_queue_skipped_actions.inc();
259            warn!("skipping signing task: {:?}", action_key);
260            return;
261        }
262
263        let auth_agg_clone = auth_agg.clone();
264        let signing_queue_sender_clone = signing_queue_sender.clone();
265        let execution_queue_sender_clone = execution_queue_sender.clone();
266        let sui_client_clone = sui_client.clone();
267        let store_clone = store.clone();
268        let metrics_clone = metrics.clone();
269        let semaphore_clone = semaphore.clone();
270        spawn_logged_monitored_task!(
271            Self::request_signatures(
272                semaphore_clone,
273                sui_client_clone,
274                auth_agg_clone,
275                action,
276                store_clone,
277                signing_queue_sender_clone,
278                execution_queue_sender_clone,
279                metrics_clone,
280            )
281            .instrument(tracing::debug_span!("request_signatures", action_key=?action_key)),
282            "request_signatures"
283        );
284    }
285
286    // Checks if the action is already processed on chain.
287    // If yes, skip this action and remove it from the pending log.
288    // Returns true if the action is already processed.
289    async fn handle_already_processed_token_transfer_action_maybe(
290        sui_client: &Arc<SuiClient<C>>,
291        action: &BridgeAction,
292        store: &Arc<BridgeOrchestratorTables>,
293        metrics: &Arc<BridgeMetrics>,
294    ) -> bool {
295        let status = sui_client
296            .get_token_transfer_action_onchain_status_until_success(
297                action.chain_id() as u8,
298                action.seq_number(),
299            )
300            .await;
301        match status {
302            BridgeActionStatus::Approved | BridgeActionStatus::Claimed => {
303                info!(
304                    "Action already approved or claimed, removing action from pending logs: {:?}",
305                    action
306                );
307                metrics.action_executor_already_processed_actions.inc();
308                store
309                    .remove_pending_actions(&[action.digest()])
310                    .unwrap_or_else(|e| {
311                        panic!("Write to DB should not fail: {:?}", e);
312                    });
313                true
314            }
315            // Although theoretically a legit SuiToEthBridgeAction should not have
316            // status `NotFound`
317            BridgeActionStatus::Pending | BridgeActionStatus::NotFound => false,
318        }
319    }
320
321    // TODO: introduce a way to properly stagger the handling
322    // for various validators.
323    async fn request_signatures(
324        semaphore: Arc<Semaphore>,
325        sui_client: Arc<SuiClient<C>>,
326        auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
327        action: BridgeActionExecutionWrapper,
328        store: Arc<BridgeOrchestratorTables>,
329        signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
330        execution_queue_sender: mysten_metrics::metered_channel::Sender<
331            CertifiedBridgeActionExecutionWrapper,
332        >,
333        metrics: Arc<BridgeMetrics>,
334    ) {
335        let _permit = semaphore
336            .acquire()
337            .await
338            .expect("semaphore should not be closed");
339        info!("requesting signatures");
340        let BridgeActionExecutionWrapper(action, attempt_times) = action;
341
342        // Only token transfer action should reach here
343        match &action {
344            BridgeAction::SuiToEthBridgeAction(_)
345            | BridgeAction::SuiToEthTokenTransfer(_)
346            | BridgeAction::SuiToEthTokenTransferV2(_)
347            | BridgeAction::EthToSuiBridgeAction(_)
348            | BridgeAction::EthToSuiTokenTransferV2(_) => (),
349            _ => unreachable!("Non token transfer action should not reach here"),
350        };
351
352        // If the action is already processed, skip it.
353        if Self::handle_already_processed_token_transfer_action_maybe(
354            &sui_client,
355            &action,
356            &store,
357            &metrics,
358        )
359        .await
360        {
361            return;
362        }
363        match auth_agg
364            .load()
365            .request_committee_signatures(action.clone())
366            .await
367        {
368            Ok(certificate) => {
369                info!("Sending certificate to execution");
370                execution_queue_sender
371                    .send(CertifiedBridgeActionExecutionWrapper(certificate, 0))
372                    .await
373                    .unwrap_or_else(|e| {
374                        panic!("Sending to execution queue should not fail: {:?}", e);
375                    });
376            }
377            Err(e) => {
378                warn!("Failed to collect sigs for bridge action: {:?}", e);
379                metrics.err_signature_aggregation.inc();
380
381                // TODO: spawn a task for this
382                if attempt_times >= MAX_SIGNING_ATTEMPTS {
383                    metrics.err_signature_aggregation_too_many_failures.inc();
384                    error!(
385                        "Manual intervention is required. Failed to collect sigs for bridge action after {MAX_SIGNING_ATTEMPTS} attempts: {:?}",
386                        e
387                    );
388                    return;
389                }
390                delay(attempt_times).await;
391                signing_queue_sender
392                    .send(BridgeActionExecutionWrapper(action, attempt_times + 1))
393                    .await
394                    .unwrap_or_else(|e| {
395                        panic!("Sending to signing queue should not fail: {:?}", e);
396                    });
397            }
398        }
399    }
400
401    // Before calling this function, `key` and `sui_address` need to be
402    // verified to match.
403    async fn run_onchain_execution_loop(
404        sui_client: Arc<SuiClient<C>>,
405        sui_key: SuiKeyPair,
406        sui_address: SuiAddress,
407        gas_object_id: ObjectID,
408        store: Arc<BridgeOrchestratorTables>,
409        execution_queue_sender: mysten_metrics::metered_channel::Sender<
410            CertifiedBridgeActionExecutionWrapper,
411        >,
412        mut execution_queue_receiver: mysten_metrics::metered_channel::Receiver<
413            CertifiedBridgeActionExecutionWrapper,
414        >,
415        bridge_object_arg: ObjectArg,
416        sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
417        bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
418        metrics: Arc<BridgeMetrics>,
419    ) {
420        info!("Starting run_onchain_execution_loop");
421        while let Some(certificate_wrapper) = execution_queue_receiver.recv().await {
422            // When bridge is paused, skip execution.
423            // Skipped actions will be picked up upon node restarting
424            // if bridge is unpaused.
425            if *bridge_pause_rx.borrow() {
426                warn!("Bridge is paused, skipping execution");
427                metrics
428                    .action_executor_execution_queue_skipped_actions_due_to_pausing
429                    .inc();
430                continue;
431            }
432            Self::handle_execution_task(
433                certificate_wrapper,
434                &sui_client,
435                &sui_key,
436                &sui_address,
437                gas_object_id,
438                &store,
439                &execution_queue_sender,
440                &bridge_object_arg,
441                &sui_token_type_tags,
442                &metrics,
443            )
444            .await;
445        }
446        panic!("Execution queue closed unexpectedly");
447    }
448
    /// Builds, signs and submits the Sui transaction for one certified action.
    ///
    /// Steps, in order:
    /// 1. Fetch the gas object (panicking if `sui_address` does not own it)
    ///    and record its balance in the metrics.
    /// 2. Return early if the action is already approved or claimed on chain.
    /// 3. Build the transaction; on an unknown token id, refresh the token
    ///    map from chain once and retry the build. Any other build failure is
    ///    logged for manual intervention and the action is dropped.
    /// 4. Sign, check the on-chain status a second time, then submit.
    /// 5. A response is passed to `handle_execution_effects`; a submission
    ///    error is retried from a spawned task with exponential backoff until
    ///    `attempt_times` reaches `MAX_EXECUTION_ATTEMPTS`.
    #[instrument(level = "error", skip_all, fields(action_key=?certificate_wrapper.0.data().key(), attempt_times=?certificate_wrapper.1))]
    async fn handle_execution_task(
        certificate_wrapper: CertifiedBridgeActionExecutionWrapper,
        sui_client: &Arc<SuiClient<C>>,
        sui_key: &SuiKeyPair,
        sui_address: &SuiAddress,
        gas_object_id: ObjectID,
        store: &Arc<BridgeOrchestratorTables>,
        execution_queue_sender: &mysten_metrics::metered_channel::Sender<
            CertifiedBridgeActionExecutionWrapper,
        >,
        bridge_object_arg: &ObjectArg,
        sui_token_type_tags: &ArcSwap<HashMap<u8, TypeTag>>,
        metrics: &Arc<BridgeMetrics>,
    ) {
        metrics
            .action_executor_execution_queue_received_actions
            .inc();
        let CertifiedBridgeActionExecutionWrapper(certificate, attempt_times) = certificate_wrapper;
        let action = certificate.data();
        let action_key = action.key();

        info!("Received certified action for execution: {:?}", action);

        // TODO check gas coin balance here. If gas balance too low, do not proceed.
        let (gas_coin, gas_object_ref) =
            Self::get_gas_data_assert_ownership(*sui_address, gas_object_id, sui_client).await;
        metrics.gas_coin_balance.set(gas_coin.value() as i64);

        // `certificate` itself is kept for a possible re-enqueue on submission
        // failure; the transaction builder consumes this copy instead.
        let ceriticate_clone = certificate.clone();

        // Check once: if the action is already processed, skip it.
        if Self::handle_already_processed_token_transfer_action_maybe(
            sui_client, action, store, metrics,
        )
        .await
        {
            info!("Action already processed, skipping");
            return;
        }

        info!("Building Sui transaction");
        let rgp = sui_client.get_reference_gas_price_until_success().await;
        let tx_data = match build_sui_transaction(
            *sui_address,
            &gas_object_ref,
            // Cloned again because the copy is still needed if the build is retried below.
            ceriticate_clone.clone(),
            *bridge_object_arg,
            sui_token_type_tags.load().as_ref(),
            rgp,
        ) {
            Ok(tx_data) => tx_data,
            Err(BridgeError::UnknownTokenId(token_id)) => {
                // Token not found in local cache - it might be newly registered.
                // Refresh token map from chain and retry.
                info!(
                    "Unknown token_id {}, refreshing token map from chain and retrying",
                    token_id
                );
                match sui_client.get_token_id_map().await {
                    Ok(new_token_map) => {
                        sui_token_type_tags.store(Arc::new(new_token_map));
                        // Retry building transaction with refreshed token map
                        match build_sui_transaction(
                            *sui_address,
                            &gas_object_ref,
                            ceriticate_clone,
                            *bridge_object_arg,
                            sui_token_type_tags.load().as_ref(),
                            rgp,
                        ) {
                            Ok(tx_data) => tx_data,
                            Err(err) => {
                                metrics.err_build_sui_transaction.inc();
                                error!(
                                    "Manual intervention is required. Failed to build transaction after token map refresh for action {:?}: {:?}",
                                    action, err
                                );
                                return;
                            }
                        }
                    }
                    Err(e) => {
                        metrics.err_build_sui_transaction.inc();
                        error!(
                            "Manual intervention is required. Failed to refresh token map: {:?}",
                            e
                        );
                        return;
                    }
                }
            }
            Err(err) => {
                metrics.err_build_sui_transaction.inc();
                error!(
                    "Manual intervention is required. Failed to build transaction for action {:?}: {:?}",
                    action, err
                );
                // This should not happen, but in case it does, we do not want to
                // panic, instead we log here for manual intervention.
                return;
            }
        };
        let sig = Signature::new_secure(
            &IntentMessage::new(Intent::sui_transaction(), &tx_data),
            sui_key,
        );
        let signed_tx = Transaction::from_data(tx_data, vec![sig]);
        let tx_digest = *signed_tx.digest();

        // Check twice: If the action is already processed, skip it.
        if Self::handle_already_processed_token_transfer_action_maybe(
            sui_client, action, store, metrics,
        )
        .await
        {
            info!("Action already processed, skipping");
            return;
        }

        info!(?tx_digest, ?gas_object_ref, "Sending transaction to Sui");
        match sui_client
            .execute_transaction_block_with_effects(signed_tx)
            .await
        {
            Ok(resp) => {
                Self::handle_execution_effects(tx_digest, resp, store, action, metrics).await
            }

            // If the transaction did not go through, retry up to a certain times.
            Err(err) => {
                error!(
                    ?action_key,
                    ?tx_digest,
                    "Sui transaction failed at signing: {err:?}"
                );
                metrics.err_sui_transaction_submission.inc();
                let metrics_clone = metrics.clone();
                // Do this in a separate task so we won't deadlock here
                let sender_clone = execution_queue_sender.clone();
                spawn_logged_monitored_task!(async move {
                    // If it fails for too many times, log and ask for manual intervention.
                    if attempt_times >= MAX_EXECUTION_ATTEMPTS {
                        metrics_clone
                            .err_sui_transaction_submission_too_many_failures
                            .inc();
                        error!("Manual intervention is required. Failed to collect execute transaction for bridge action after {MAX_EXECUTION_ATTEMPTS} attempts: {:?}", err);
                        return;
                    }
                    // Back off, then hand the same certificate back to the
                    // execution queue with an incremented attempt counter.
                    delay(attempt_times).await;
                    sender_clone
                        .send(CertifiedBridgeActionExecutionWrapper(
                            certificate,
                            attempt_times + 1,
                        ))
                        .await
                        .unwrap_or_else(|e| {
                            panic!("Sending to execution queue should not fail: {:?}", e);
                        });
                    info!("Re-enqueued certificate for execution");
                }.instrument(tracing::debug_span!("reenqueue_execution_task", action_key=?action_key)));
            }
        }
    }
613
614    // TODO: do we need a mechanism to periodically read pending actions from DB?
615    async fn handle_execution_effects(
616        tx_digest: TransactionDigest,
617        response: ExecuteTransactionResult,
618        store: &Arc<BridgeOrchestratorTables>,
619        action: &BridgeAction,
620        metrics: &Arc<BridgeMetrics>,
621    ) {
622        match &response.status {
623            SuiExecutionStatus::Success => {
624                let relevant_events = response
625                    .events
626                    .iter()
627                    .filter(|e| {
628                        e.type_ == *TokenTransferAlreadyClaimed.get().unwrap()
629                            || e.type_ == *TokenTransferClaimed.get().unwrap()
630                            || e.type_ == *TokenTransferApproved.get().unwrap()
631                            || e.type_ == *TokenTransferAlreadyApproved.get().unwrap()
632                    })
633                    .collect::<Vec<_>>();
634                assert!(
635                    !relevant_events.is_empty(),
636                    "Expected TokenTransferAlreadyClaimed, TokenTransferClaimed, TokenTransferApproved \
637                    or TokenTransferAlreadyApproved event but got: {:?}",
638                    response.events
639                );
640                info!(?tx_digest, "Sui transaction executed successfully");
641                // track successful approval and claim events
642                relevant_events.iter().for_each(|e| {
643                    if e.type_ == *TokenTransferClaimed.get().unwrap() {
644                        match action {
645                            BridgeAction::EthToSuiBridgeAction(_)
646                            | BridgeAction::EthToSuiTokenTransferV2(_) => {
647                                metrics.eth_sui_token_transfer_claimed.inc();
648                            }
649                            BridgeAction::SuiToEthBridgeAction(_)
650                            | BridgeAction::SuiToEthTokenTransfer(_)
651                            | BridgeAction::SuiToEthTokenTransferV2(_) => {
652                                metrics.sui_eth_token_transfer_claimed.inc();
653                            }
654                            _ => error!("Unexpected action type for claimed event: {:?}", action),
655                        }
656                    } else if e.type_ == *TokenTransferApproved.get().unwrap() {
657                        match action {
658                            BridgeAction::EthToSuiBridgeAction(_)
659                            | BridgeAction::EthToSuiTokenTransferV2(_) => {
660                                metrics.eth_sui_token_transfer_approved.inc();
661                            }
662                            BridgeAction::SuiToEthBridgeAction(_)
663                            | BridgeAction::SuiToEthTokenTransfer(_)
664                            | BridgeAction::SuiToEthTokenTransferV2(_) => {
665                                metrics.sui_eth_token_transfer_approved.inc();
666                            }
667                            _ => error!("Unexpected action type for approved event: {:?}", action),
668                        }
669                    }
670                });
671                store
672                    .remove_pending_actions(&[action.digest()])
673                    .unwrap_or_else(|e| {
674                        panic!("Write to DB should not fail: {:?}", e);
675                    })
676            }
677            SuiExecutionStatus::Failure { error } => {
678                // In practice the transaction could fail because of running out of gas, but really
679                // should not be due to other reasons.
680                // This means manual intervention is needed. So we do not push them back to
681                // the execution queue because retries are mostly likely going to fail anyway.
682                // After human examination, the node should be restarted and fetch them from WAL.
683
684                metrics.err_sui_transaction_execution.inc();
685                error!(
686                    ?tx_digest,
687                    "Manual intervention is needed. Sui transaction executed and failed with error: {error:?}"
688                );
689            }
690        }
691    }
692
693    /// Panics if the gas object is not owned by the address.
694    async fn get_gas_data_assert_ownership(
695        sui_address: SuiAddress,
696        gas_object_id: ObjectID,
697        sui_client: &SuiClient<C>,
698    ) -> (GasCoin, ObjectRef) {
699        let (gas_coin, gas_obj_ref, owner) = sui_client
700            .get_gas_data_panic_if_not_gas(gas_object_id)
701            .await;
702
703        // TODO: when we add multiple gas support in the future we could discard
704        // transferred gas object instead.
705        assert_eq!(
706            owner,
707            Owner::AddressOwner(sui_address),
708            "Gas object {:?} is no longer owned by address {}",
709            gas_object_id,
710            sui_address
711        );
712        (gas_coin, gas_obj_ref)
713    }
714}
715
716pub async fn submit_to_executor(
717    tx: &mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
718    action: BridgeAction,
719) -> Result<(), BridgeError> {
720    tx.send(BridgeActionExecutionWrapper(action, 0))
721        .await
722        .map_err(|e| BridgeError::Generic(e.to_string()))
723}
724
725#[cfg(test)]
726mod tests {
727    use crate::events::init_all_struct_tags;
728    use crate::test_utils::DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
729    use crate::types::BRIDGE_PAUSED;
730    use fastcrypto::traits::KeyPair;
731    use prometheus::Registry;
732    use std::collections::{BTreeMap, HashMap};
733    use std::str::FromStr;
734    use sui_json_rpc_types::SuiEvent;
735    use sui_types::TypeTag;
736    use sui_types::crypto::get_key_pair;
737    use sui_types::gas_coin::GasCoin;
738    use sui_types::{base_types::random_object_ref, transaction::TransactionData};
739
740    use crate::{
741        crypto::{
742            BridgeAuthorityKeyPair, BridgeAuthorityPublicKeyBytes,
743            BridgeAuthorityRecoverableSignature,
744        },
745        server::mock_handler::BridgeRequestMockHandler,
746        sui_mock_client::SuiMockClient,
747        test_utils::{
748            get_test_authorities_and_run_mock_bridge_server, get_test_eth_to_sui_bridge_action,
749            get_test_sui_to_eth_bridge_action, sign_action_with_key,
750        },
751        types::{BridgeCommittee, BridgeCommitteeValiditySignInfo, CertifiedBridgeAction},
752    };
753
754    use super::*;
755
756    #[tokio::test]
757    async fn test_onchain_execution_loop() {
758        let (
759            signing_tx,
760            _execution_tx,
761            sui_client_mock,
762            mut tx_subscription,
763            store,
764            secrets,
765            dummy_sui_key,
766            mock0,
767            mock1,
768            mock2,
769            mock3,
770            _handles,
771            gas_object_ref,
772            sui_address,
773            sui_token_type_tags,
774            _bridge_pause_tx,
775        ) = setup().await;
776        let (action_certificate, _, _) = get_bridge_authority_approved_action(
777            vec![&mock0, &mock1, &mock2, &mock3],
778            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
779            None,
780            true,
781        );
782        let action = action_certificate.data().clone();
783        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
784        let tx_data = build_sui_transaction(
785            sui_address,
786            &gas_object_ref,
787            action_certificate,
788            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
789            &id_token_map,
790            1000,
791        )
792        .unwrap();
793
794        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
795
796        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
797        sui_client_mock.add_gas_object_info(
798            gas_coin.clone(),
799            gas_object_ref,
800            Owner::AddressOwner(sui_address),
801        );
802
803        // Mock the transaction to be successfully executed
804        let mut event = SuiEvent::random_for_testing();
805        event.type_ = TokenTransferClaimed.get().unwrap().clone();
806        let events = vec![event];
807        mock_transaction_response(
808            &sui_client_mock,
809            tx_digest,
810            SuiExecutionStatus::Success,
811            Some(events),
812            true,
813        );
814
815        store
816            .insert_pending_actions(std::slice::from_ref(&action))
817            .unwrap();
818        assert_eq!(
819            store.get_all_pending_actions()[&action.digest()],
820            action.clone()
821        );
822
823        // Kick it
824        submit_to_executor(&signing_tx, action.clone())
825            .await
826            .unwrap();
827
828        // Expect to see the transaction to be requested and successfully executed hence removed from WAL
829        tx_subscription.recv().await.unwrap();
830        assert!(store.get_all_pending_actions().is_empty());
831
832        /////////////////////////////////////////////////////////////////////////////////////////////////
833        ////////////////////////////////////// Test execution failure ///////////////////////////////////
834        /////////////////////////////////////////////////////////////////////////////////////////////////
835
836        let (action_certificate, _, _) = get_bridge_authority_approved_action(
837            vec![&mock0, &mock1, &mock2, &mock3],
838            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
839            None,
840            true,
841        );
842
843        let action = action_certificate.data().clone();
844
845        let tx_data = build_sui_transaction(
846            sui_address,
847            &gas_object_ref,
848            action_certificate,
849            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
850            &id_token_map,
851            1000,
852        )
853        .unwrap();
854        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
855
856        // Mock the transaction to fail
857        mock_transaction_response(
858            &sui_client_mock,
859            tx_digest,
860            SuiExecutionStatus::Failure {
861                error: "failure is mother of success".to_string(),
862            },
863            None,
864            true,
865        );
866
867        store
868            .insert_pending_actions(std::slice::from_ref(&action))
869            .unwrap();
870        assert_eq!(
871            store.get_all_pending_actions()[&action.digest()],
872            action.clone()
873        );
874
875        // Kick it
876        submit_to_executor(&signing_tx, action.clone())
877            .await
878            .unwrap();
879
880        // Expect to see the transaction to be requested and but failed
881        tx_subscription.recv().await.unwrap();
882        // The action is not removed from WAL because the transaction failed
883        assert_eq!(
884            store.get_all_pending_actions()[&action.digest()],
885            action.clone()
886        );
887
888        /////////////////////////////////////////////////////////////////////////////////////////////////
889        //////////////////////////// Test transaction failed at signing stage ///////////////////////////
890        /////////////////////////////////////////////////////////////////////////////////////////////////
891
892        let (action_certificate, _, _) = get_bridge_authority_approved_action(
893            vec![&mock0, &mock1, &mock2, &mock3],
894            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
895            None,
896            true,
897        );
898
899        let action = action_certificate.data().clone();
900
901        let tx_data = build_sui_transaction(
902            sui_address,
903            &gas_object_ref,
904            action_certificate,
905            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
906            &id_token_map,
907            1000,
908        )
909        .unwrap();
910        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
911        mock_transaction_error(
912            &sui_client_mock,
913            tx_digest,
914            BridgeError::Generic("some random error".to_string()),
915            true,
916        );
917
918        store
919            .insert_pending_actions(std::slice::from_ref(&action))
920            .unwrap();
921        assert_eq!(
922            store.get_all_pending_actions()[&action.digest()],
923            action.clone()
924        );
925
926        // Kick it
927        submit_to_executor(&signing_tx, action.clone())
928            .await
929            .unwrap();
930
931        // Failure will trigger retry, we wait for 2 requests before checking WAL log
932        let tx_digest = tx_subscription.recv().await.unwrap();
933        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
934
935        // The retry is still going on, action still in WAL
936        assert!(
937            store
938                .get_all_pending_actions()
939                .contains_key(&action.digest())
940        );
941
942        // Now let it succeed
943        let mut event = SuiEvent::random_for_testing();
944        event.type_ = TokenTransferClaimed.get().unwrap().clone();
945        let events = vec![event];
946        mock_transaction_response(
947            &sui_client_mock,
948            tx_digest,
949            SuiExecutionStatus::Success,
950            Some(events),
951            true,
952        );
953
954        // Give it 1 second to retry and succeed
955        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
956        // The action is successful and should be removed from WAL now
957        assert!(
958            !store
959                .get_all_pending_actions()
960                .contains_key(&action.digest())
961        );
962    }
963
964    #[tokio::test]
965    async fn test_signature_aggregation_loop() {
966        let (
967            signing_tx,
968            _execution_tx,
969            sui_client_mock,
970            mut tx_subscription,
971            store,
972            secrets,
973            dummy_sui_key,
974            mock0,
975            mock1,
976            mock2,
977            mock3,
978            _handles,
979            gas_object_ref,
980            sui_address,
981            sui_token_type_tags,
982            _bridge_pause_tx,
983        ) = setup().await;
984        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
985        let (action_certificate, sui_tx_digest, sui_tx_event_index) =
986            get_bridge_authority_approved_action(
987                vec![&mock0, &mock1, &mock2, &mock3],
988                vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
989                None,
990                true,
991            );
992        let action = action_certificate.data().clone();
993        mock_bridge_authority_signing_errors(
994            vec![&mock0, &mock1, &mock2],
995            sui_tx_digest,
996            sui_tx_event_index,
997        );
998        let mut sigs = mock_bridge_authority_sigs(
999            vec![&mock3],
1000            &action,
1001            vec![&secrets[3]],
1002            sui_tx_digest,
1003            sui_tx_event_index,
1004        );
1005
1006        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
1007        sui_client_mock.add_gas_object_info(
1008            gas_coin,
1009            gas_object_ref,
1010            Owner::AddressOwner(sui_address),
1011        );
1012        store
1013            .insert_pending_actions(std::slice::from_ref(&action))
1014            .unwrap();
1015        assert_eq!(
1016            store.get_all_pending_actions()[&action.digest()],
1017            action.clone()
1018        );
1019
1020        // Kick it
1021        submit_to_executor(&signing_tx, action.clone())
1022            .await
1023            .unwrap();
1024
1025        // Wait until the transaction is retried at least once (instead of deing dropped)
1026        loop {
1027            let requested_times =
1028                mock0.get_sui_token_events_requested(sui_tx_digest, sui_tx_event_index);
1029            if requested_times >= 2 {
1030                break;
1031            }
1032            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1033        }
1034        // Nothing is sent to execute yet
1035        assert_eq!(
1036            tx_subscription.try_recv().unwrap_err(),
1037            tokio::sync::broadcast::error::TryRecvError::Empty
1038        );
1039        // Still in WAL
1040        assert_eq!(
1041            store.get_all_pending_actions()[&action.digest()],
1042            action.clone()
1043        );
1044
1045        // Let authorities sign the action too. Now we are above the threshold
1046        let sig_from_2 = mock_bridge_authority_sigs(
1047            vec![&mock2],
1048            &action,
1049            vec![&secrets[2]],
1050            sui_tx_digest,
1051            sui_tx_event_index,
1052        );
1053        sigs.extend(sig_from_2);
1054        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1055            action.clone(),
1056            BridgeCommitteeValiditySignInfo { signatures: sigs },
1057        );
1058        let action_certificate = VerifiedCertifiedBridgeAction::new_from_verified(certified_action);
1059        let tx_data = build_sui_transaction(
1060            sui_address,
1061            &gas_object_ref,
1062            action_certificate,
1063            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
1064            &id_token_map,
1065            1000,
1066        )
1067        .unwrap();
1068        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1069
1070        let mut event = SuiEvent::random_for_testing();
1071        event.type_ = TokenTransferClaimed.get().unwrap().clone();
1072        let events = vec![event];
1073        mock_transaction_response(
1074            &sui_client_mock,
1075            tx_digest,
1076            SuiExecutionStatus::Success,
1077            Some(events),
1078            true,
1079        );
1080
1081        // Expect to see the transaction to be requested and succeed
1082        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1083        // The action is removed from WAL
1084        assert!(
1085            !store
1086                .get_all_pending_actions()
1087                .contains_key(&action.digest())
1088        );
1089    }
1090
1091    #[tokio::test]
1092    async fn test_skip_request_signature_if_already_processed_on_chain() {
1093        let (
1094            signing_tx,
1095            _execution_tx,
1096            sui_client_mock,
1097            mut tx_subscription,
1098            store,
1099            _secrets,
1100            _dummy_sui_key,
1101            mock0,
1102            mock1,
1103            mock2,
1104            mock3,
1105            _handles,
1106            _gas_object_ref,
1107            _sui_address,
1108            _sui_token_type_tags,
1109            _bridge_pause_tx,
1110        ) = setup().await;
1111
1112        let sui_tx_digest = TransactionDigest::random();
1113        let sui_tx_event_index = 0;
1114        let action = get_test_sui_to_eth_bridge_action(
1115            Some(sui_tx_digest),
1116            Some(sui_tx_event_index),
1117            None,
1118            None,
1119            None,
1120            None,
1121            None,
1122        );
1123        mock_bridge_authority_signing_errors(
1124            vec![&mock0, &mock1, &mock2, &mock3],
1125            sui_tx_digest,
1126            sui_tx_event_index,
1127        );
1128        store
1129            .insert_pending_actions(std::slice::from_ref(&action))
1130            .unwrap();
1131        assert_eq!(
1132            store.get_all_pending_actions()[&action.digest()],
1133            action.clone()
1134        );
1135
1136        // Kick it
1137        submit_to_executor(&signing_tx, action.clone())
1138            .await
1139            .unwrap();
1140        let action_digest = action.digest();
1141
1142        // Wait for 1 second. It should still in the process of retrying requesting sigs becaues we mock errors above.
1143        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1144        tx_subscription.try_recv().unwrap_err();
1145        // And the action is still in WAL
1146        assert!(store.get_all_pending_actions().contains_key(&action_digest));
1147
1148        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1149
1150        // The next retry will see the action is already processed on chain and remove it from WAL
1151        let now = std::time::Instant::now();
1152        while store.get_all_pending_actions().contains_key(&action_digest) {
1153            if now.elapsed().as_secs() > 10 {
1154                panic!("Timeout waiting for action to be removed from WAL");
1155            }
1156            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1157        }
1158        tx_subscription.try_recv().unwrap_err();
1159    }
1160
1161    #[tokio::test]
1162    async fn test_skip_tx_submission_if_already_processed_on_chain() {
1163        let (
1164            _signing_tx,
1165            execution_tx,
1166            sui_client_mock,
1167            mut tx_subscription,
1168            store,
1169            secrets,
1170            dummy_sui_key,
1171            mock0,
1172            mock1,
1173            mock2,
1174            mock3,
1175            _handles,
1176            gas_object_ref,
1177            sui_address,
1178            sui_token_type_tags,
1179            _bridge_pause_tx,
1180        ) = setup().await;
1181        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
1182        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1183            vec![&mock0, &mock1, &mock2, &mock3],
1184            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1185            None,
1186            true,
1187        );
1188
1189        let action = action_certificate.data().clone();
1190        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1191        let tx_data = build_sui_transaction(
1192            sui_address,
1193            &gas_object_ref,
1194            action_certificate.clone(),
1195            arg,
1196            &id_token_map,
1197            1000,
1198        )
1199        .unwrap();
1200        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1201        mock_transaction_error(
1202            &sui_client_mock,
1203            tx_digest,
1204            BridgeError::Generic("some random error".to_string()),
1205            true,
1206        );
1207
1208        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
1209        sui_client_mock.add_gas_object_info(
1210            gas_coin.clone(),
1211            gas_object_ref,
1212            Owner::AddressOwner(sui_address),
1213        );
1214
1215        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1216
1217        store
1218            .insert_pending_actions(std::slice::from_ref(&action))
1219            .unwrap();
1220        assert_eq!(
1221            store.get_all_pending_actions()[&action.digest()],
1222            action.clone()
1223        );
1224
1225        // Kick it (send to the execution queue, skipping the signing queue)
1226        execution_tx
1227            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1228            .await
1229            .unwrap();
1230
1231        // Some requests come in and will fail.
1232        tx_subscription.recv().await.unwrap();
1233
1234        // Set the action to be already approved on chain
1235        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1236
1237        // The next retry will see the action is already processed on chain and remove it from WAL
1238        let now = std::time::Instant::now();
1239        let action_digest = action.digest();
1240        while store.get_all_pending_actions().contains_key(&action_digest) {
1241            if now.elapsed().as_secs() > 10 {
1242                panic!("Timeout waiting for action to be removed from WAL");
1243            }
1244            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1245        }
1246    }
1247
1248    #[tokio::test]
1249    async fn test_skip_tx_submission_if_bridge_is_paused() {
1250        let (
1251            _signing_tx,
1252            execution_tx,
1253            sui_client_mock,
1254            mut tx_subscription,
1255            store,
1256            secrets,
1257            dummy_sui_key,
1258            mock0,
1259            mock1,
1260            mock2,
1261            mock3,
1262            _handles,
1263            gas_object_ref,
1264            sui_address,
1265            sui_token_type_tags,
1266            bridge_pause_tx,
1267        ) = setup().await;
1268        let id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1269        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1270            vec![&mock0, &mock1, &mock2, &mock3],
1271            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1272            None,
1273            true,
1274        );
1275
1276        let action = action_certificate.data().clone();
1277        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1278        let tx_data = build_sui_transaction(
1279            sui_address,
1280            &gas_object_ref,
1281            action_certificate.clone(),
1282            arg,
1283            &id_token_map,
1284            1000,
1285        )
1286        .unwrap();
1287        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1288        mock_transaction_error(
1289            &sui_client_mock,
1290            tx_digest,
1291            BridgeError::Generic("some random error".to_string()),
1292            true,
1293        );
1294
1295        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
1296        sui_client_mock.add_gas_object_info(
1297            gas_coin.clone(),
1298            gas_object_ref,
1299            Owner::AddressOwner(sui_address),
1300        );
1301        let action_digest = action.digest();
1302        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1303
1304        // assert bridge is unpaused now
1305        assert!(!*bridge_pause_tx.borrow());
1306
1307        store
1308            .insert_pending_actions(std::slice::from_ref(&action))
1309            .unwrap();
1310        assert_eq!(
1311            store.get_all_pending_actions()[&action.digest()],
1312            action.clone()
1313        );
1314
1315        // Kick it (send to the execution queue, skipping the signing queue)
1316        execution_tx
1317            .send(CertifiedBridgeActionExecutionWrapper(
1318                action_certificate.clone(),
1319                0,
1320            ))
1321            .await
1322            .unwrap();
1323
1324        // Some requests come in
1325        tx_subscription.recv().await.unwrap();
1326
1327        // Pause the bridge
1328        bridge_pause_tx.send(BRIDGE_PAUSED).unwrap();
1329
1330        // Kick it again
1331        execution_tx
1332            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1333            .await
1334            .unwrap();
1335
1336        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1337        // Nothing is sent to execute
1338        assert_eq!(
1339            tx_subscription.try_recv().unwrap_err(),
1340            tokio::sync::broadcast::error::TryRecvError::Empty
1341        );
1342        // Still in WAL
1343        assert_eq!(
1344            store.get_all_pending_actions()[&action_digest],
1345            action.clone()
1346        );
1347    }
1348
    /// Checks that an action referring to a token id missing from the shared token-type map is
    /// not submitted for execution, and that it is submitted once the id has been added to the
    /// map and the action is queued again.
    #[tokio::test]
    async fn test_action_executor_handle_new_token() {
        let new_token_id = 255u8; // token id that does not exist
        let new_type_tag = TypeTag::from_str("0xbeef::beef::BEEF").unwrap();
        let (
            _signing_tx,
            execution_tx,
            sui_client_mock,
            mut tx_subscription,
            _store,
            secrets,
            dummy_sui_key,
            mock0,
            mock1,
            mock2,
            mock3,
            _handles,
            gas_object_ref,
            sui_address,
            sui_token_type_tags,
            _bridge_pause_tx,
        ) = setup().await;
        // Local copy of the token map; the new id is added to it and stored back further down.
        let mut id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            Some(new_token_id),
            false, // we need an eth -> sui action that entails the new token type tag in transaction building
        );

        let action = action_certificate.data().clone();
        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
        // Build the expected transaction with a map holding only the new token, so that its
        // digest can be compared with what the executor submits at the end of the test.
        let tx_data = build_sui_transaction(
            sui_address,
            &gas_object_ref,
            action_certificate.clone(),
            arg,
            &maplit::hashmap! {
                new_token_id => new_type_tag.clone()
            },
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
        // Wildcard error: every submitted transaction returns this error.
        mock_transaction_error(
            &sui_client_mock,
            tx_digest,
            BridgeError::Generic("some random error".to_string()),
            true,
        );

        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
        sui_client_mock.add_gas_object_info(
            gas_coin.clone(),
            gas_object_ref,
            Owner::AddressOwner(sui_address),
        );
        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);

        // Kick it (send to the execution queue, skipping the signing queue)
        execution_tx
            .send(CertifiedBridgeActionExecutionWrapper(
                action_certificate.clone(),
                0,
            ))
            .await
            .unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
        // Nothing is sent to execute, because the token id does not exist yet
        assert_eq!(
            tx_subscription.try_recv().unwrap_err(),
            tokio::sync::broadcast::error::TryRecvError::Empty
        );

        // Now insert the new token id
        id_token_map.insert(new_token_id, new_type_tag);
        sui_token_type_tags.store(Arc::new(id_token_map));

        // Kick it again
        execution_tx
            .send(CertifiedBridgeActionExecutionWrapper(
                action_certificate.clone(),
                0,
            ))
            .await
            .unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
        // The action is sent to execution
        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
    }
1441
1442    fn mock_bridge_authority_sigs(
1443        mocks: Vec<&BridgeRequestMockHandler>,
1444        action: &BridgeAction,
1445        secrets: Vec<&BridgeAuthorityKeyPair>,
1446        sui_tx_digest: TransactionDigest,
1447        sui_tx_event_index: u16,
1448    ) -> BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthorityRecoverableSignature> {
1449        assert_eq!(mocks.len(), secrets.len());
1450        let mut signed_actions = BTreeMap::new();
1451        for (mock, secret) in mocks.iter().zip(secrets.iter()) {
1452            let signed_action = sign_action_with_key(action, secret);
1453            mock.add_sui_event_response(
1454                sui_tx_digest,
1455                sui_tx_event_index,
1456                Ok(signed_action.clone()),
1457                None,
1458            );
1459            signed_actions.insert(secret.public().into(), signed_action.into_sig().signature);
1460        }
1461        signed_actions
1462    }
1463
1464    fn mock_bridge_authority_signing_errors(
1465        mocks: Vec<&BridgeRequestMockHandler>,
1466        sui_tx_digest: TransactionDigest,
1467        sui_tx_event_index: u16,
1468    ) {
1469        for mock in mocks {
1470            mock.add_sui_event_response(
1471                sui_tx_digest,
1472                sui_tx_event_index,
1473                Err(BridgeError::RestAPIError("small issue".into())),
1474                None,
1475            );
1476        }
1477    }
1478
1479    /// Create a BridgeAction and mock authorities to return signatures
1480    fn get_bridge_authority_approved_action(
1481        mocks: Vec<&BridgeRequestMockHandler>,
1482        secrets: Vec<&BridgeAuthorityKeyPair>,
1483        token_id: Option<u8>,
1484        sui_to_eth: bool,
1485    ) -> (VerifiedCertifiedBridgeAction, TransactionDigest, u16) {
1486        let sui_tx_digest = TransactionDigest::random();
1487        let sui_tx_event_index = 1;
1488        let action = if sui_to_eth {
1489            get_test_sui_to_eth_bridge_action(
1490                Some(sui_tx_digest),
1491                Some(sui_tx_event_index),
1492                None,
1493                None,
1494                None,
1495                None,
1496                token_id,
1497            )
1498        } else {
1499            get_test_eth_to_sui_bridge_action(None, None, None, token_id)
1500        };
1501
1502        let sigs =
1503            mock_bridge_authority_sigs(mocks, &action, secrets, sui_tx_digest, sui_tx_event_index);
1504        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1505            action,
1506            BridgeCommitteeValiditySignInfo { signatures: sigs },
1507        );
1508        (
1509            VerifiedCertifiedBridgeAction::new_from_verified(certified_action),
1510            sui_tx_digest,
1511            sui_tx_event_index,
1512        )
1513    }
1514
1515    fn get_tx_digest(tx_data: TransactionData, dummy_sui_key: &SuiKeyPair) -> TransactionDigest {
1516        let sig = Signature::new_secure(
1517            &IntentMessage::new(Intent::sui_transaction(), &tx_data),
1518            dummy_sui_key,
1519        );
1520        let signed_tx = Transaction::from_data(tx_data, vec![sig]);
1521        *signed_tx.digest()
1522    }
1523
1524    /// Why is `wildcard` needed? This is because authority signatures
1525    /// are part of transaction data. Depending on whose signatures
1526    /// are included in what order, this may change the tx digest.
1527    fn mock_transaction_response(
1528        sui_client_mock: &SuiMockClient,
1529        tx_digest: TransactionDigest,
1530        status: SuiExecutionStatus,
1531        events: Option<Vec<SuiEvent>>,
1532        wildcard: bool,
1533    ) {
1534        let response = ExecuteTransactionResult {
1535            status,
1536            events: events.unwrap_or_default(),
1537        };
1538        if wildcard {
1539            sui_client_mock.set_wildcard_transaction_response(Ok(response));
1540        } else {
1541            sui_client_mock.add_transaction_response(tx_digest, Ok(response));
1542        }
1543    }
1544
1545    fn mock_transaction_error(
1546        sui_client_mock: &SuiMockClient,
1547        tx_digest: TransactionDigest,
1548        error: BridgeError,
1549        wildcard: bool,
1550    ) {
1551        if wildcard {
1552            sui_client_mock.set_wildcard_transaction_response(Err(error));
1553        } else {
1554            sui_client_mock.add_transaction_response(tx_digest, Err(error));
1555        }
1556    }
1557
1558    #[allow(clippy::type_complexity)]
1559    async fn setup() -> (
1560        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
1561        mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
1562        SuiMockClient,
1563        tokio::sync::broadcast::Receiver<TransactionDigest>,
1564        Arc<BridgeOrchestratorTables>,
1565        Vec<BridgeAuthorityKeyPair>,
1566        SuiKeyPair,
1567        BridgeRequestMockHandler,
1568        BridgeRequestMockHandler,
1569        BridgeRequestMockHandler,
1570        BridgeRequestMockHandler,
1571        Vec<tokio::task::JoinHandle<()>>,
1572        ObjectRef,
1573        SuiAddress,
1574        Arc<ArcSwap<HashMap<u8, TypeTag>>>,
1575        tokio::sync::watch::Sender<IsBridgePaused>,
1576    ) {
1577        telemetry_subscribers::init_for_testing();
1578        let registry = Registry::new();
1579        mysten_metrics::init_metrics(&registry);
1580        init_all_struct_tags();
1581
1582        let (sui_address, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1583        let sui_key = SuiKeyPair::from(kp);
1584        let gas_object_ref = random_object_ref();
1585        let temp_dir = tempfile::tempdir().unwrap();
1586        let store = BridgeOrchestratorTables::new(temp_dir.path());
1587        let sui_client_mock = SuiMockClient::default();
1588        let tx_subscription = sui_client_mock.subscribe_to_requested_transactions();
1589        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1590
1591        // The dummy key is used to sign transaction so we can get TransactionDigest easily.
1592        // User signature is not part of the transaction so it does not matter which key it is.
1593        let (_, dummy_kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1594        let dummy_sui_key = SuiKeyPair::from(dummy_kp);
1595
1596        let mock0 = BridgeRequestMockHandler::new();
1597        let mock1 = BridgeRequestMockHandler::new();
1598        let mock2 = BridgeRequestMockHandler::new();
1599        let mock3 = BridgeRequestMockHandler::new();
1600
1601        let (mut handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
1602            vec![2500, 2500, 2500, 2500],
1603            vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
1604        );
1605
1606        let committee = BridgeCommittee::new(authorities).unwrap();
1607
1608        let agg = Arc::new(ArcSwap::new(Arc::new(
1609            BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1610        )));
1611        let metrics = Arc::new(BridgeMetrics::new(&registry));
1612        let sui_token_type_tags = sui_client.get_token_id_map().await.unwrap();
1613        let sui_token_type_tags = Arc::new(ArcSwap::new(Arc::new(sui_token_type_tags)));
1614        let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1615        let executor = BridgeActionExecutor::new(
1616            sui_client.clone(),
1617            agg.clone(),
1618            store.clone(),
1619            sui_key,
1620            sui_address,
1621            gas_object_ref.0,
1622            sui_token_type_tags.clone(),
1623            bridge_pause_rx,
1624            metrics,
1625        )
1626        .await;
1627
1628        let (executor_handle, signing_tx, execution_tx) = executor.run_inner();
1629        handles.extend(executor_handle);
1630
1631        (
1632            signing_tx,
1633            execution_tx,
1634            sui_client_mock,
1635            tx_subscription,
1636            store,
1637            secrets,
1638            dummy_sui_key,
1639            mock0,
1640            mock1,
1641            mock2,
1642            mock3,
1643            handles,
1644            gas_object_ref,
1645            sui_address,
1646            sui_token_type_tags,
1647            bridge_pause_tx,
1648        )
1649    }
1650}