sui_bridge/
action_executor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! BridgeActionExecutor receives BridgeActions (from BridgeOrchestrator),
//! collects bridge authority signatures and submits signatures on chain.
6
7use crate::retry_with_max_elapsed_time;
8use crate::types::IsBridgePaused;
9use arc_swap::ArcSwap;
10use mysten_metrics::spawn_logged_monitored_task;
11use shared_crypto::intent::{Intent, IntentMessage};
12use sui_json_rpc_types::SuiExecutionStatus;
13use sui_types::TypeTag;
14use sui_types::transaction::ObjectArg;
15use sui_types::{
16    base_types::{ObjectID, ObjectRef, SuiAddress},
17    crypto::{Signature, SuiKeyPair},
18    digests::TransactionDigest,
19    gas_coin::GasCoin,
20    object::Owner,
21    transaction::Transaction,
22};
23
24use crate::events::{
25    TokenTransferAlreadyApproved, TokenTransferAlreadyClaimed, TokenTransferApproved,
26    TokenTransferClaimed,
27};
28use crate::metrics::BridgeMetrics;
29use crate::{
30    client::bridge_authority_aggregator::BridgeAuthorityAggregator,
31    error::BridgeError,
32    storage::BridgeOrchestratorTables,
33    sui_client::{ExecuteTransactionResult, SuiClient, SuiClientInner},
34    sui_transaction_builder::build_sui_transaction,
35    types::{BridgeAction, BridgeActionStatus, VerifiedCertifiedBridgeAction},
36};
37use std::collections::HashMap;
38use std::sync::Arc;
39use tokio::sync::Semaphore;
40use tokio::time::Duration;
41use tracing::{Instrument, error, info, instrument, warn};
42
/// Capacity used for both the signing queue and the execution queue channels.
pub const CHANNEL_SIZE: usize = 1000;
/// Number of permits in the semaphore that gates concurrent signature requests.
pub const SIGNING_CONCURRENCY: usize = 10;

// Retry schedule: a failed attempt numbered `n` (counted from 0) sleeps 100ms * 2^n
// before it is re-enqueued as attempt `n + 1`; once a failing attempt's number has
// reached the maximum below it is given up instead of retried.
// With a maximum of 16 this allows the initial attempt plus up to 16 retries, with delays
// 0.1s, 0.2s, 0.4s, 0.8s, 1.6s, 3.2s, 6.4s, 12.8s, 25.6s, 51.2s, 102.4s, 204.8s, 409.6s, 819.2s, 1638.4s, 3276.8s
// NOTE(review): an earlier version of this comment described at most 16 attempts in total
// (last delay 1638.4s); the `attempt_times >= MAX_*_ATTEMPTS` checks allow one more. Confirm
// which of the two is intended.
pub const MAX_SIGNING_ATTEMPTS: u64 = 16;
pub const MAX_EXECUTION_ATTEMPTS: u64 = 16;
50
51async fn delay(attempt_times: u64) {
52    let delay_ms = 100 * 2_u64.pow(attempt_times as u32);
53    tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
54}
55
/// A [`BridgeAction`] queued for signature aggregation, paired with its attempt number:
/// 0 when first submitted, incremented by one each time signing fails and the action
/// is re-enqueued.
#[derive(Debug)]
pub struct BridgeActionExecutionWrapper(pub BridgeAction, pub u64);
58
/// A certified bridge action queued for on-chain execution, paired with its attempt
/// number: 0 when the certificate is first produced, incremented by one each time the
/// transaction submission fails and the certificate is re-enqueued.
#[derive(Debug)]
pub struct CertifiedBridgeActionExecutionWrapper(pub VerifiedCertifiedBridgeAction, pub u64);
61
/// Entry point for starting a bridge action executor.
pub trait BridgeActionExecutorTrait {
    /// Consumes the executor and starts its background tasks.
    ///
    /// Returns the join handles of the spawned tasks together with the sender through
    /// which actions are submitted to the executor.
    fn run(
        self,
    ) -> (
        Vec<tokio::task::JoinHandle<()>>,
        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
    );
}
70
/// Takes bridge actions from signature aggregation through to execution on Sui.
///
/// Running the executor spawns two tasks: one that collects committee signatures for
/// incoming actions, and one that submits the resulting certificates as Sui transactions.
pub struct BridgeActionExecutor<C> {
    // Client used to query bridge, action and gas state and to execute transactions.
    sui_client: Arc<SuiClient<C>>,
    // Swappable aggregator used to request committee signatures for an action.
    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
    // Key that signs the Sui transactions built for certified actions.
    key: SuiKeyPair,
    // Sender of the transactions; asserted to own the gas object before each execution.
    sui_address: SuiAddress,
    // Gas object used for the submitted transactions.
    gas_object_id: ObjectID,
    // DB-backed log of pending actions; entries are removed once processed on chain.
    store: Arc<BridgeOrchestratorTables>,
    // Bridge object argument, fetched once in `new` and passed to the transaction builder.
    bridge_object_arg: ObjectArg,
    // Map keyed by `u8` to type tags, passed to the transaction builder.
    sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
    // Pause flag read by the execution loop; while it is set, certificates are skipped.
    bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
    // Counters and gauges updated throughout signing and execution.
    metrics: Arc<BridgeMetrics>,
}
83
84impl<C> BridgeActionExecutorTrait for BridgeActionExecutor<C>
85where
86    C: SuiClientInner + 'static,
87{
88    fn run(
89        self,
90    ) -> (
91        Vec<tokio::task::JoinHandle<()>>,
92        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
93    ) {
94        let (tasks, sender, _) = self.run_inner();
95        (tasks, sender)
96    }
97}
98
99impl<C> BridgeActionExecutor<C>
100where
101    C: SuiClientInner + 'static,
102{
103    pub async fn new(
104        sui_client: Arc<SuiClient<C>>,
105        bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
106        store: Arc<BridgeOrchestratorTables>,
107        key: SuiKeyPair,
108        sui_address: SuiAddress,
109        gas_object_id: ObjectID,
110        sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
111        bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
112        metrics: Arc<BridgeMetrics>,
113    ) -> Self {
114        let bridge_object_arg = sui_client
115            .get_mutable_bridge_object_arg_must_succeed()
116            .await;
117        Self {
118            sui_client,
119            bridge_auth_agg,
120            store,
121            key,
122            gas_object_id,
123            sui_address,
124            bridge_object_arg,
125            sui_token_type_tags,
126            bridge_pause_rx,
127            metrics,
128        }
129    }
130
131    fn run_inner(
132        self,
133    ) -> (
134        Vec<tokio::task::JoinHandle<()>>,
135        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
136        mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
137    ) {
138        let key = self.key;
139
140        let (sender, receiver) = mysten_metrics::metered_channel::channel(
141            CHANNEL_SIZE,
142            &mysten_metrics::get_metrics()
143                .unwrap()
144                .channel_inflight
145                .with_label_values(&["executor_signing_queue"]),
146        );
147
148        let (execution_tx, execution_rx) = mysten_metrics::metered_channel::channel(
149            CHANNEL_SIZE,
150            &mysten_metrics::get_metrics()
151                .unwrap()
152                .channel_inflight
153                .with_label_values(&["executor_execution_queue"]),
154        );
155        let execution_tx_clone = execution_tx.clone();
156        let sender_clone = sender.clone();
157        let store_clone = self.store.clone();
158        let client_clone = self.sui_client.clone();
159        let mut tasks = vec![];
160        let metrics = self.metrics.clone();
161        tasks.push(spawn_logged_monitored_task!(
162            Self::run_signature_aggregation_loop(
163                client_clone,
164                self.bridge_auth_agg,
165                store_clone,
166                sender_clone,
167                receiver,
168                execution_tx_clone,
169                metrics,
170            )
171        ));
172
173        let metrics = self.metrics.clone();
174        let execution_tx_clone = execution_tx.clone();
175        tasks.push(spawn_logged_monitored_task!(
176            Self::run_onchain_execution_loop(
177                self.sui_client.clone(),
178                key,
179                self.sui_address,
180                self.gas_object_id,
181                self.store.clone(),
182                execution_tx_clone,
183                execution_rx,
184                self.bridge_object_arg,
185                self.sui_token_type_tags,
186                self.bridge_pause_rx,
187                metrics,
188            )
189        ));
190        (tasks, sender, execution_tx)
191    }
192
193    async fn run_signature_aggregation_loop(
194        sui_client: Arc<SuiClient<C>>,
195        auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
196        store: Arc<BridgeOrchestratorTables>,
197        signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
198        mut signing_queue_receiver: mysten_metrics::metered_channel::Receiver<
199            BridgeActionExecutionWrapper,
200        >,
201        execution_queue_sender: mysten_metrics::metered_channel::Sender<
202            CertifiedBridgeActionExecutionWrapper,
203        >,
204        metrics: Arc<BridgeMetrics>,
205    ) {
206        info!("Starting run_signature_aggregation_loop");
207        let semaphore = Arc::new(Semaphore::new(SIGNING_CONCURRENCY));
208        while let Some(action) = signing_queue_receiver.recv().await {
209            Self::handle_signing_task(
210                &semaphore,
211                &auth_agg,
212                &signing_queue_sender,
213                &execution_queue_sender,
214                &sui_client,
215                &store,
216                action,
217                &metrics,
218            )
219            .await;
220        }
221    }
222
223    async fn should_proceed_signing(sui_client: &Arc<SuiClient<C>>) -> bool {
224        let Ok(Ok(is_paused)) =
225            retry_with_max_elapsed_time!(sui_client.is_bridge_paused(), Duration::from_secs(600))
226        else {
227            error!("Failed to get bridge status after retry");
228            return false;
229        };
230        !is_paused
231    }
232
233    #[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))]
234    async fn handle_signing_task(
235        semaphore: &Arc<Semaphore>,
236        auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
237        signing_queue_sender: &mysten_metrics::metered_channel::Sender<
238            BridgeActionExecutionWrapper,
239        >,
240        execution_queue_sender: &mysten_metrics::metered_channel::Sender<
241            CertifiedBridgeActionExecutionWrapper,
242        >,
243        sui_client: &Arc<SuiClient<C>>,
244        store: &Arc<BridgeOrchestratorTables>,
245        action: BridgeActionExecutionWrapper,
246        metrics: &Arc<BridgeMetrics>,
247    ) {
248        metrics.action_executor_signing_queue_received_actions.inc();
249        let action_key = action.0.key();
250        info!("Received action for signing: {:?}", action.0);
251
252        // TODO: this is a temporary fix to avoid signing when the bridge is paused.
253        // but the way is implemented is not ideal:
254        // 1. it should check the direction
255        // 2. should use a better mechanism to check the bridge status instead of polling for each action
256        let should_proceed = Self::should_proceed_signing(sui_client).await;
257        if !should_proceed {
258            metrics.action_executor_signing_queue_skipped_actions.inc();
259            warn!("skipping signing task: {:?}", action_key);
260            return;
261        }
262
263        let auth_agg_clone = auth_agg.clone();
264        let signing_queue_sender_clone = signing_queue_sender.clone();
265        let execution_queue_sender_clone = execution_queue_sender.clone();
266        let sui_client_clone = sui_client.clone();
267        let store_clone = store.clone();
268        let metrics_clone = metrics.clone();
269        let semaphore_clone = semaphore.clone();
270        spawn_logged_monitored_task!(
271            Self::request_signatures(
272                semaphore_clone,
273                sui_client_clone,
274                auth_agg_clone,
275                action,
276                store_clone,
277                signing_queue_sender_clone,
278                execution_queue_sender_clone,
279                metrics_clone,
280            )
281            .instrument(tracing::debug_span!("request_signatures", action_key=?action_key)),
282            "request_signatures"
283        );
284    }
285
286    // Checks if the action is already processed on chain.
287    // If yes, skip this action and remove it from the pending log.
288    // Returns true if the action is already processed.
289    async fn handle_already_processed_token_transfer_action_maybe(
290        sui_client: &Arc<SuiClient<C>>,
291        action: &BridgeAction,
292        store: &Arc<BridgeOrchestratorTables>,
293        metrics: &Arc<BridgeMetrics>,
294    ) -> bool {
295        let status = sui_client
296            .get_token_transfer_action_onchain_status_until_success(
297                action.chain_id() as u8,
298                action.seq_number(),
299            )
300            .await;
301        match status {
302            BridgeActionStatus::Approved | BridgeActionStatus::Claimed => {
303                info!(
304                    "Action already approved or claimed, removing action from pending logs: {:?}",
305                    action
306                );
307                metrics.action_executor_already_processed_actions.inc();
308                store
309                    .remove_pending_actions(&[action.digest()])
310                    .unwrap_or_else(|e| {
311                        panic!("Write to DB should not fail: {:?}", e);
312                    });
313                true
314            }
315            // Although theoretically a legit SuiToEthBridgeAction should not have
316            // status `NotFound`
317            BridgeActionStatus::Pending | BridgeActionStatus::NotFound => false,
318        }
319    }
320
321    // TODO: introduce a way to properly stagger the handling
322    // for various validators.
323    async fn request_signatures(
324        semaphore: Arc<Semaphore>,
325        sui_client: Arc<SuiClient<C>>,
326        auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
327        action: BridgeActionExecutionWrapper,
328        store: Arc<BridgeOrchestratorTables>,
329        signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
330        execution_queue_sender: mysten_metrics::metered_channel::Sender<
331            CertifiedBridgeActionExecutionWrapper,
332        >,
333        metrics: Arc<BridgeMetrics>,
334    ) {
335        let _permit = semaphore
336            .acquire()
337            .await
338            .expect("semaphore should not be closed");
339        info!("requesting signatures");
340        let BridgeActionExecutionWrapper(action, attempt_times) = action;
341
342        // Only token transfer action should reach here
343        match &action {
344            BridgeAction::SuiToEthBridgeAction(_)
345            | BridgeAction::SuiToEthTokenTransfer(_)
346            | BridgeAction::EthToSuiBridgeAction(_) => (),
347            _ => unreachable!("Non token transfer action should not reach here"),
348        };
349
350        // If the action is already processed, skip it.
351        if Self::handle_already_processed_token_transfer_action_maybe(
352            &sui_client,
353            &action,
354            &store,
355            &metrics,
356        )
357        .await
358        {
359            return;
360        }
361        match auth_agg
362            .load()
363            .request_committee_signatures(action.clone())
364            .await
365        {
366            Ok(certificate) => {
367                info!("Sending certificate to execution");
368                execution_queue_sender
369                    .send(CertifiedBridgeActionExecutionWrapper(certificate, 0))
370                    .await
371                    .unwrap_or_else(|e| {
372                        panic!("Sending to execution queue should not fail: {:?}", e);
373                    });
374            }
375            Err(e) => {
376                warn!("Failed to collect sigs for bridge action: {:?}", e);
377                metrics.err_signature_aggregation.inc();
378
379                // TODO: spawn a task for this
380                if attempt_times >= MAX_SIGNING_ATTEMPTS {
381                    metrics.err_signature_aggregation_too_many_failures.inc();
382                    error!(
383                        "Manual intervention is required. Failed to collect sigs for bridge action after {MAX_SIGNING_ATTEMPTS} attempts: {:?}",
384                        e
385                    );
386                    return;
387                }
388                delay(attempt_times).await;
389                signing_queue_sender
390                    .send(BridgeActionExecutionWrapper(action, attempt_times + 1))
391                    .await
392                    .unwrap_or_else(|e| {
393                        panic!("Sending to signing queue should not fail: {:?}", e);
394                    });
395            }
396        }
397    }
398
399    // Before calling this function, `key` and `sui_address` need to be
400    // verified to match.
401    async fn run_onchain_execution_loop(
402        sui_client: Arc<SuiClient<C>>,
403        sui_key: SuiKeyPair,
404        sui_address: SuiAddress,
405        gas_object_id: ObjectID,
406        store: Arc<BridgeOrchestratorTables>,
407        execution_queue_sender: mysten_metrics::metered_channel::Sender<
408            CertifiedBridgeActionExecutionWrapper,
409        >,
410        mut execution_queue_receiver: mysten_metrics::metered_channel::Receiver<
411            CertifiedBridgeActionExecutionWrapper,
412        >,
413        bridge_object_arg: ObjectArg,
414        sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
415        bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
416        metrics: Arc<BridgeMetrics>,
417    ) {
418        info!("Starting run_onchain_execution_loop");
419        while let Some(certificate_wrapper) = execution_queue_receiver.recv().await {
420            // When bridge is paused, skip execution.
421            // Skipped actions will be picked up upon node restarting
422            // if bridge is unpaused.
423            if *bridge_pause_rx.borrow() {
424                warn!("Bridge is paused, skipping execution");
425                metrics
426                    .action_executor_execution_queue_skipped_actions_due_to_pausing
427                    .inc();
428                continue;
429            }
430            Self::handle_execution_task(
431                certificate_wrapper,
432                &sui_client,
433                &sui_key,
434                &sui_address,
435                gas_object_id,
436                &store,
437                &execution_queue_sender,
438                &bridge_object_arg,
439                &sui_token_type_tags,
440                &metrics,
441            )
442            .await;
443        }
444        panic!("Execution queue closed unexpectedly");
445    }
446
447    #[instrument(level = "error", skip_all, fields(action_key=?certificate_wrapper.0.data().key(), attempt_times=?certificate_wrapper.1))]
448    async fn handle_execution_task(
449        certificate_wrapper: CertifiedBridgeActionExecutionWrapper,
450        sui_client: &Arc<SuiClient<C>>,
451        sui_key: &SuiKeyPair,
452        sui_address: &SuiAddress,
453        gas_object_id: ObjectID,
454        store: &Arc<BridgeOrchestratorTables>,
455        execution_queue_sender: &mysten_metrics::metered_channel::Sender<
456            CertifiedBridgeActionExecutionWrapper,
457        >,
458        bridge_object_arg: &ObjectArg,
459        sui_token_type_tags: &ArcSwap<HashMap<u8, TypeTag>>,
460        metrics: &Arc<BridgeMetrics>,
461    ) {
462        metrics
463            .action_executor_execution_queue_received_actions
464            .inc();
465        let CertifiedBridgeActionExecutionWrapper(certificate, attempt_times) = certificate_wrapper;
466        let action = certificate.data();
467        let action_key = action.key();
468
469        info!("Received certified action for execution: {:?}", action);
470
471        // TODO check gas coin balance here. If gas balance too low, do not proceed.
472        let (gas_coin, gas_object_ref) =
473            Self::get_gas_data_assert_ownership(*sui_address, gas_object_id, sui_client).await;
474        metrics.gas_coin_balance.set(gas_coin.value() as i64);
475
476        let ceriticate_clone = certificate.clone();
477
478        // Check once: if the action is already processed, skip it.
479        if Self::handle_already_processed_token_transfer_action_maybe(
480            sui_client, action, store, metrics,
481        )
482        .await
483        {
484            info!("Action already processed, skipping");
485            return;
486        }
487
488        info!("Building Sui transaction");
489        let rgp = sui_client.get_reference_gas_price_until_success().await;
490        let tx_data = match build_sui_transaction(
491            *sui_address,
492            &gas_object_ref,
493            ceriticate_clone,
494            *bridge_object_arg,
495            sui_token_type_tags.load().as_ref(),
496            rgp,
497        ) {
498            Ok(tx_data) => tx_data,
499            Err(err) => {
500                metrics.err_build_sui_transaction.inc();
501                error!(
502                    "Manual intervention is required. Failed to build transaction for action {:?}: {:?}",
503                    action, err
504                );
505                // This should not happen, but in case it does, we do not want to
506                // panic, instead we log here for manual intervention.
507                return;
508            }
509        };
510        let sig = Signature::new_secure(
511            &IntentMessage::new(Intent::sui_transaction(), &tx_data),
512            sui_key,
513        );
514        let signed_tx = Transaction::from_data(tx_data, vec![sig]);
515        let tx_digest = *signed_tx.digest();
516
517        // Check twice: If the action is already processed, skip it.
518        if Self::handle_already_processed_token_transfer_action_maybe(
519            sui_client, action, store, metrics,
520        )
521        .await
522        {
523            info!("Action already processed, skipping");
524            return;
525        }
526
527        info!(?tx_digest, ?gas_object_ref, "Sending transaction to Sui");
528        match sui_client
529            .execute_transaction_block_with_effects(signed_tx)
530            .await
531        {
532            Ok(resp) => {
533                Self::handle_execution_effects(tx_digest, resp, store, action, metrics).await
534            }
535
536            // If the transaction did not go through, retry up to a certain times.
537            Err(err) => {
538                error!(
539                    ?action_key,
540                    ?tx_digest,
541                    "Sui transaction failed at signing: {err:?}"
542                );
543                metrics.err_sui_transaction_submission.inc();
544                let metrics_clone = metrics.clone();
545                // Do this in a separate task so we won't deadlock here
546                let sender_clone = execution_queue_sender.clone();
547                spawn_logged_monitored_task!(async move {
548                    // If it fails for too many times, log and ask for manual intervention.
549                    if attempt_times >= MAX_EXECUTION_ATTEMPTS {
550                        metrics_clone
551                            .err_sui_transaction_submission_too_many_failures
552                            .inc();
553                        error!("Manual intervention is required. Failed to collect execute transaction for bridge action after {MAX_EXECUTION_ATTEMPTS} attempts: {:?}", err);
554                        return;
555                    }
556                    delay(attempt_times).await;
557                    sender_clone
558                        .send(CertifiedBridgeActionExecutionWrapper(
559                            certificate,
560                            attempt_times + 1,
561                        ))
562                        .await
563                        .unwrap_or_else(|e| {
564                            panic!("Sending to execution queue should not fail: {:?}", e);
565                        });
566                    info!("Re-enqueued certificate for execution");
567                }.instrument(tracing::debug_span!("reenqueue_execution_task", action_key=?action_key)));
568            }
569        }
570    }
571
572    // TODO: do we need a mechanism to periodically read pending actions from DB?
573    async fn handle_execution_effects(
574        tx_digest: TransactionDigest,
575        response: ExecuteTransactionResult,
576        store: &Arc<BridgeOrchestratorTables>,
577        action: &BridgeAction,
578        metrics: &Arc<BridgeMetrics>,
579    ) {
580        match &response.status {
581            SuiExecutionStatus::Success => {
582                let relevant_events = response
583                    .events
584                    .iter()
585                    .filter(|e| {
586                        e.type_ == *TokenTransferAlreadyClaimed.get().unwrap()
587                            || e.type_ == *TokenTransferClaimed.get().unwrap()
588                            || e.type_ == *TokenTransferApproved.get().unwrap()
589                            || e.type_ == *TokenTransferAlreadyApproved.get().unwrap()
590                    })
591                    .collect::<Vec<_>>();
592                assert!(
593                    !relevant_events.is_empty(),
594                    "Expected TokenTransferAlreadyClaimed, TokenTransferClaimed, TokenTransferApproved \
595                    or TokenTransferAlreadyApproved event but got: {:?}",
596                    response.events
597                );
598                info!(?tx_digest, "Sui transaction executed successfully");
599                // track successful approval and claim events
600                relevant_events.iter().for_each(|e| {
601                    if e.type_ == *TokenTransferClaimed.get().unwrap() {
602                        match action {
603                            BridgeAction::EthToSuiBridgeAction(_) => {
604                                metrics.eth_sui_token_transfer_claimed.inc();
605                            }
606                            BridgeAction::SuiToEthBridgeAction(_) => {
607                                metrics.sui_eth_token_transfer_claimed.inc();
608                            }
609                            _ => error!("Unexpected action type for claimed event: {:?}", action),
610                        }
611                    } else if e.type_ == *TokenTransferApproved.get().unwrap() {
612                        match action {
613                            BridgeAction::EthToSuiBridgeAction(_) => {
614                                metrics.eth_sui_token_transfer_approved.inc();
615                            }
616                            BridgeAction::SuiToEthBridgeAction(_) => {
617                                metrics.sui_eth_token_transfer_approved.inc();
618                            }
619                            _ => error!("Unexpected action type for approved event: {:?}", action),
620                        }
621                    }
622                });
623                store
624                    .remove_pending_actions(&[action.digest()])
625                    .unwrap_or_else(|e| {
626                        panic!("Write to DB should not fail: {:?}", e);
627                    })
628            }
629            SuiExecutionStatus::Failure { error } => {
630                // In practice the transaction could fail because of running out of gas, but really
631                // should not be due to other reasons.
632                // This means manual intervention is needed. So we do not push them back to
633                // the execution queue because retries are mostly likely going to fail anyway.
634                // After human examination, the node should be restarted and fetch them from WAL.
635
636                metrics.err_sui_transaction_execution.inc();
637                error!(
638                    ?tx_digest,
639                    "Manual intervention is needed. Sui transaction executed and failed with error: {error:?}"
640                );
641            }
642        }
643    }
644
645    /// Panics if the gas object is not owned by the address.
646    async fn get_gas_data_assert_ownership(
647        sui_address: SuiAddress,
648        gas_object_id: ObjectID,
649        sui_client: &SuiClient<C>,
650    ) -> (GasCoin, ObjectRef) {
651        let (gas_coin, gas_obj_ref, owner) = sui_client
652            .get_gas_data_panic_if_not_gas(gas_object_id)
653            .await;
654
655        // TODO: when we add multiple gas support in the future we could discard
656        // transferred gas object instead.
657        assert_eq!(
658            owner,
659            Owner::AddressOwner(sui_address),
660            "Gas object {:?} is no longer owned by address {}",
661            gas_object_id,
662            sui_address
663        );
664        (gas_coin, gas_obj_ref)
665    }
666}
667
668pub async fn submit_to_executor(
669    tx: &mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
670    action: BridgeAction,
671) -> Result<(), BridgeError> {
672    tx.send(BridgeActionExecutionWrapper(action, 0))
673        .await
674        .map_err(|e| BridgeError::Generic(e.to_string()))
675}
676
677#[cfg(test)]
678mod tests {
679    use crate::events::init_all_struct_tags;
680    use crate::test_utils::DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
681    use crate::types::BRIDGE_PAUSED;
682    use fastcrypto::traits::KeyPair;
683    use prometheus::Registry;
684    use std::collections::{BTreeMap, HashMap};
685    use std::str::FromStr;
686    use sui_json_rpc_types::SuiEvent;
687    use sui_types::TypeTag;
688    use sui_types::crypto::get_key_pair;
689    use sui_types::gas_coin::GasCoin;
690    use sui_types::{base_types::random_object_ref, transaction::TransactionData};
691
692    use crate::{
693        crypto::{
694            BridgeAuthorityKeyPair, BridgeAuthorityPublicKeyBytes,
695            BridgeAuthorityRecoverableSignature,
696        },
697        server::mock_handler::BridgeRequestMockHandler,
698        sui_mock_client::SuiMockClient,
699        test_utils::{
700            get_test_authorities_and_run_mock_bridge_server, get_test_eth_to_sui_bridge_action,
701            get_test_sui_to_eth_bridge_action, sign_action_with_key,
702        },
703        types::{BridgeCommittee, BridgeCommitteeValiditySignInfo, CertifiedBridgeAction},
704    };
705
706    use super::*;
707
708    #[tokio::test]
709    async fn test_onchain_execution_loop() {
710        let (
711            signing_tx,
712            _execution_tx,
713            sui_client_mock,
714            mut tx_subscription,
715            store,
716            secrets,
717            dummy_sui_key,
718            mock0,
719            mock1,
720            mock2,
721            mock3,
722            _handles,
723            gas_object_ref,
724            sui_address,
725            sui_token_type_tags,
726            _bridge_pause_tx,
727        ) = setup().await;
728        let (action_certificate, _, _) = get_bridge_authority_approved_action(
729            vec![&mock0, &mock1, &mock2, &mock3],
730            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
731            None,
732            true,
733        );
734        let action = action_certificate.data().clone();
735        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
736        let tx_data = build_sui_transaction(
737            sui_address,
738            &gas_object_ref,
739            action_certificate,
740            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
741            &id_token_map,
742            1000,
743        )
744        .unwrap();
745
746        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
747
748        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
749        sui_client_mock.add_gas_object_info(
750            gas_coin.clone(),
751            gas_object_ref,
752            Owner::AddressOwner(sui_address),
753        );
754
755        // Mock the transaction to be successfully executed
756        let mut event = SuiEvent::random_for_testing();
757        event.type_ = TokenTransferClaimed.get().unwrap().clone();
758        let events = vec![event];
759        mock_transaction_response(
760            &sui_client_mock,
761            tx_digest,
762            SuiExecutionStatus::Success,
763            Some(events),
764            true,
765        );
766
767        store
768            .insert_pending_actions(std::slice::from_ref(&action))
769            .unwrap();
770        assert_eq!(
771            store.get_all_pending_actions()[&action.digest()],
772            action.clone()
773        );
774
775        // Kick it
776        submit_to_executor(&signing_tx, action.clone())
777            .await
778            .unwrap();
779
780        // Expect to see the transaction to be requested and successfully executed hence removed from WAL
781        tx_subscription.recv().await.unwrap();
782        assert!(store.get_all_pending_actions().is_empty());
783
784        /////////////////////////////////////////////////////////////////////////////////////////////////
785        ////////////////////////////////////// Test execution failure ///////////////////////////////////
786        /////////////////////////////////////////////////////////////////////////////////////////////////
787
788        let (action_certificate, _, _) = get_bridge_authority_approved_action(
789            vec![&mock0, &mock1, &mock2, &mock3],
790            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
791            None,
792            true,
793        );
794
795        let action = action_certificate.data().clone();
796
797        let tx_data = build_sui_transaction(
798            sui_address,
799            &gas_object_ref,
800            action_certificate,
801            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
802            &id_token_map,
803            1000,
804        )
805        .unwrap();
806        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
807
808        // Mock the transaction to fail
809        mock_transaction_response(
810            &sui_client_mock,
811            tx_digest,
812            SuiExecutionStatus::Failure {
813                error: "failure is mother of success".to_string(),
814            },
815            None,
816            true,
817        );
818
819        store
820            .insert_pending_actions(std::slice::from_ref(&action))
821            .unwrap();
822        assert_eq!(
823            store.get_all_pending_actions()[&action.digest()],
824            action.clone()
825        );
826
827        // Kick it
828        submit_to_executor(&signing_tx, action.clone())
829            .await
830            .unwrap();
831
        // Expect to see the transaction requested, but its execution fails
833        tx_subscription.recv().await.unwrap();
834        // The action is not removed from WAL because the transaction failed
835        assert_eq!(
836            store.get_all_pending_actions()[&action.digest()],
837            action.clone()
838        );
839
840        /////////////////////////////////////////////////////////////////////////////////////////////////
841        //////////////////////////// Test transaction failed at signing stage ///////////////////////////
842        /////////////////////////////////////////////////////////////////////////////////////////////////
843
844        let (action_certificate, _, _) = get_bridge_authority_approved_action(
845            vec![&mock0, &mock1, &mock2, &mock3],
846            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
847            None,
848            true,
849        );
850
851        let action = action_certificate.data().clone();
852
853        let tx_data = build_sui_transaction(
854            sui_address,
855            &gas_object_ref,
856            action_certificate,
857            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
858            &id_token_map,
859            1000,
860        )
861        .unwrap();
862        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
863        mock_transaction_error(
864            &sui_client_mock,
865            tx_digest,
866            BridgeError::Generic("some random error".to_string()),
867            true,
868        );
869
870        store
871            .insert_pending_actions(std::slice::from_ref(&action))
872            .unwrap();
873        assert_eq!(
874            store.get_all_pending_actions()[&action.digest()],
875            action.clone()
876        );
877
878        // Kick it
879        submit_to_executor(&signing_tx, action.clone())
880            .await
881            .unwrap();
882
883        // Failure will trigger retry, we wait for 2 requests before checking WAL log
884        let tx_digest = tx_subscription.recv().await.unwrap();
885        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
886
887        // The retry is still going on, action still in WAL
888        assert!(
889            store
890                .get_all_pending_actions()
891                .contains_key(&action.digest())
892        );
893
894        // Now let it succeed
895        let mut event = SuiEvent::random_for_testing();
896        event.type_ = TokenTransferClaimed.get().unwrap().clone();
897        let events = vec![event];
898        mock_transaction_response(
899            &sui_client_mock,
900            tx_digest,
901            SuiExecutionStatus::Success,
902            Some(events),
903            true,
904        );
905
906        // Give it 1 second to retry and succeed
907        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
908        // The action is successful and should be removed from WAL now
909        assert!(
910            !store
911                .get_all_pending_actions()
912                .contains_key(&action.digest())
913        );
914    }
915
916    #[tokio::test]
917    async fn test_signature_aggregation_loop() {
918        let (
919            signing_tx,
920            _execution_tx,
921            sui_client_mock,
922            mut tx_subscription,
923            store,
924            secrets,
925            dummy_sui_key,
926            mock0,
927            mock1,
928            mock2,
929            mock3,
930            _handles,
931            gas_object_ref,
932            sui_address,
933            sui_token_type_tags,
934            _bridge_pause_tx,
935        ) = setup().await;
936        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
937        let (action_certificate, sui_tx_digest, sui_tx_event_index) =
938            get_bridge_authority_approved_action(
939                vec![&mock0, &mock1, &mock2, &mock3],
940                vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
941                None,
942                true,
943            );
944        let action = action_certificate.data().clone();
945        mock_bridge_authority_signing_errors(
946            vec![&mock0, &mock1, &mock2],
947            sui_tx_digest,
948            sui_tx_event_index,
949        );
950        let mut sigs = mock_bridge_authority_sigs(
951            vec![&mock3],
952            &action,
953            vec![&secrets[3]],
954            sui_tx_digest,
955            sui_tx_event_index,
956        );
957
958        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
959        sui_client_mock.add_gas_object_info(
960            gas_coin,
961            gas_object_ref,
962            Owner::AddressOwner(sui_address),
963        );
964        store
965            .insert_pending_actions(std::slice::from_ref(&action))
966            .unwrap();
967        assert_eq!(
968            store.get_all_pending_actions()[&action.digest()],
969            action.clone()
970        );
971
972        // Kick it
973        submit_to_executor(&signing_tx, action.clone())
974            .await
975            .unwrap();
976
977        // Wait until the transaction is retried at least once (instead of deing dropped)
978        loop {
979            let requested_times =
980                mock0.get_sui_token_events_requested(sui_tx_digest, sui_tx_event_index);
981            if requested_times >= 2 {
982                break;
983            }
984            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
985        }
986        // Nothing is sent to execute yet
987        assert_eq!(
988            tx_subscription.try_recv().unwrap_err(),
989            tokio::sync::broadcast::error::TryRecvError::Empty
990        );
991        // Still in WAL
992        assert_eq!(
993            store.get_all_pending_actions()[&action.digest()],
994            action.clone()
995        );
996
997        // Let authorities sign the action too. Now we are above the threshold
998        let sig_from_2 = mock_bridge_authority_sigs(
999            vec![&mock2],
1000            &action,
1001            vec![&secrets[2]],
1002            sui_tx_digest,
1003            sui_tx_event_index,
1004        );
1005        sigs.extend(sig_from_2);
1006        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1007            action.clone(),
1008            BridgeCommitteeValiditySignInfo { signatures: sigs },
1009        );
1010        let action_certificate = VerifiedCertifiedBridgeAction::new_from_verified(certified_action);
1011        let tx_data = build_sui_transaction(
1012            sui_address,
1013            &gas_object_ref,
1014            action_certificate,
1015            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
1016            &id_token_map,
1017            1000,
1018        )
1019        .unwrap();
1020        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1021
1022        let mut event = SuiEvent::random_for_testing();
1023        event.type_ = TokenTransferClaimed.get().unwrap().clone();
1024        let events = vec![event];
1025        mock_transaction_response(
1026            &sui_client_mock,
1027            tx_digest,
1028            SuiExecutionStatus::Success,
1029            Some(events),
1030            true,
1031        );
1032
1033        // Expect to see the transaction to be requested and succeed
1034        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1035        // The action is removed from WAL
1036        assert!(
1037            !store
1038                .get_all_pending_actions()
1039                .contains_key(&action.digest())
1040        );
1041    }
1042
1043    #[tokio::test]
1044    async fn test_skip_request_signature_if_already_processed_on_chain() {
1045        let (
1046            signing_tx,
1047            _execution_tx,
1048            sui_client_mock,
1049            mut tx_subscription,
1050            store,
1051            _secrets,
1052            _dummy_sui_key,
1053            mock0,
1054            mock1,
1055            mock2,
1056            mock3,
1057            _handles,
1058            _gas_object_ref,
1059            _sui_address,
1060            _sui_token_type_tags,
1061            _bridge_pause_tx,
1062        ) = setup().await;
1063
1064        let sui_tx_digest = TransactionDigest::random();
1065        let sui_tx_event_index = 0;
1066        let action = get_test_sui_to_eth_bridge_action(
1067            Some(sui_tx_digest),
1068            Some(sui_tx_event_index),
1069            None,
1070            None,
1071            None,
1072            None,
1073            None,
1074        );
1075        mock_bridge_authority_signing_errors(
1076            vec![&mock0, &mock1, &mock2, &mock3],
1077            sui_tx_digest,
1078            sui_tx_event_index,
1079        );
1080        store
1081            .insert_pending_actions(std::slice::from_ref(&action))
1082            .unwrap();
1083        assert_eq!(
1084            store.get_all_pending_actions()[&action.digest()],
1085            action.clone()
1086        );
1087
1088        // Kick it
1089        submit_to_executor(&signing_tx, action.clone())
1090            .await
1091            .unwrap();
1092        let action_digest = action.digest();
1093
1094        // Wait for 1 second. It should still in the process of retrying requesting sigs becaues we mock errors above.
1095        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1096        tx_subscription.try_recv().unwrap_err();
1097        // And the action is still in WAL
1098        assert!(store.get_all_pending_actions().contains_key(&action_digest));
1099
1100        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1101
1102        // The next retry will see the action is already processed on chain and remove it from WAL
1103        let now = std::time::Instant::now();
1104        while store.get_all_pending_actions().contains_key(&action_digest) {
1105            if now.elapsed().as_secs() > 10 {
1106                panic!("Timeout waiting for action to be removed from WAL");
1107            }
1108            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1109        }
1110        tx_subscription.try_recv().unwrap_err();
1111    }
1112
1113    #[tokio::test]
1114    async fn test_skip_tx_submission_if_already_processed_on_chain() {
1115        let (
1116            _signing_tx,
1117            execution_tx,
1118            sui_client_mock,
1119            mut tx_subscription,
1120            store,
1121            secrets,
1122            dummy_sui_key,
1123            mock0,
1124            mock1,
1125            mock2,
1126            mock3,
1127            _handles,
1128            gas_object_ref,
1129            sui_address,
1130            sui_token_type_tags,
1131            _bridge_pause_tx,
1132        ) = setup().await;
1133        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
1134        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1135            vec![&mock0, &mock1, &mock2, &mock3],
1136            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1137            None,
1138            true,
1139        );
1140
1141        let action = action_certificate.data().clone();
1142        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1143        let tx_data = build_sui_transaction(
1144            sui_address,
1145            &gas_object_ref,
1146            action_certificate.clone(),
1147            arg,
1148            &id_token_map,
1149            1000,
1150        )
1151        .unwrap();
1152        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1153        mock_transaction_error(
1154            &sui_client_mock,
1155            tx_digest,
1156            BridgeError::Generic("some random error".to_string()),
1157            true,
1158        );
1159
1160        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
1161        sui_client_mock.add_gas_object_info(
1162            gas_coin.clone(),
1163            gas_object_ref,
1164            Owner::AddressOwner(sui_address),
1165        );
1166
1167        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1168
1169        store
1170            .insert_pending_actions(std::slice::from_ref(&action))
1171            .unwrap();
1172        assert_eq!(
1173            store.get_all_pending_actions()[&action.digest()],
1174            action.clone()
1175        );
1176
1177        // Kick it (send to the execution queue, skipping the signing queue)
1178        execution_tx
1179            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1180            .await
1181            .unwrap();
1182
1183        // Some requests come in and will fail.
1184        tx_subscription.recv().await.unwrap();
1185
1186        // Set the action to be already approved on chain
1187        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1188
1189        // The next retry will see the action is already processed on chain and remove it from WAL
1190        let now = std::time::Instant::now();
1191        let action_digest = action.digest();
1192        while store.get_all_pending_actions().contains_key(&action_digest) {
1193            if now.elapsed().as_secs() > 10 {
1194                panic!("Timeout waiting for action to be removed from WAL");
1195            }
1196            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1197        }
1198    }
1199
1200    #[tokio::test]
1201    async fn test_skip_tx_submission_if_bridge_is_paused() {
1202        let (
1203            _signing_tx,
1204            execution_tx,
1205            sui_client_mock,
1206            mut tx_subscription,
1207            store,
1208            secrets,
1209            dummy_sui_key,
1210            mock0,
1211            mock1,
1212            mock2,
1213            mock3,
1214            _handles,
1215            gas_object_ref,
1216            sui_address,
1217            sui_token_type_tags,
1218            bridge_pause_tx,
1219        ) = setup().await;
1220        let id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1221        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1222            vec![&mock0, &mock1, &mock2, &mock3],
1223            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1224            None,
1225            true,
1226        );
1227
1228        let action = action_certificate.data().clone();
1229        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1230        let tx_data = build_sui_transaction(
1231            sui_address,
1232            &gas_object_ref,
1233            action_certificate.clone(),
1234            arg,
1235            &id_token_map,
1236            1000,
1237        )
1238        .unwrap();
1239        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1240        mock_transaction_error(
1241            &sui_client_mock,
1242            tx_digest,
1243            BridgeError::Generic("some random error".to_string()),
1244            true,
1245        );
1246
1247        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
1248        sui_client_mock.add_gas_object_info(
1249            gas_coin.clone(),
1250            gas_object_ref,
1251            Owner::AddressOwner(sui_address),
1252        );
1253        let action_digest = action.digest();
1254        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1255
1256        // assert bridge is unpaused now
1257        assert!(!*bridge_pause_tx.borrow());
1258
1259        store
1260            .insert_pending_actions(std::slice::from_ref(&action))
1261            .unwrap();
1262        assert_eq!(
1263            store.get_all_pending_actions()[&action.digest()],
1264            action.clone()
1265        );
1266
1267        // Kick it (send to the execution queue, skipping the signing queue)
1268        execution_tx
1269            .send(CertifiedBridgeActionExecutionWrapper(
1270                action_certificate.clone(),
1271                0,
1272            ))
1273            .await
1274            .unwrap();
1275
1276        // Some requests come in
1277        tx_subscription.recv().await.unwrap();
1278
1279        // Pause the bridge
1280        bridge_pause_tx.send(BRIDGE_PAUSED).unwrap();
1281
1282        // Kick it again
1283        execution_tx
1284            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1285            .await
1286            .unwrap();
1287
1288        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1289        // Nothing is sent to execute
1290        assert_eq!(
1291            tx_subscription.try_recv().unwrap_err(),
1292            tokio::sync::broadcast::error::TryRecvError::Empty
1293        );
1294        // Still in WAL
1295        assert_eq!(
1296            store.get_all_pending_actions()[&action_digest],
1297            action.clone()
1298        );
1299    }
1300
1301    #[tokio::test]
1302    async fn test_action_executor_handle_new_token() {
1303        let new_token_id = 255u8; // token id that does not exist
1304        let new_type_tag = TypeTag::from_str("0xbeef::beef::BEEF").unwrap();
1305        let (
1306            _signing_tx,
1307            execution_tx,
1308            sui_client_mock,
1309            mut tx_subscription,
1310            _store,
1311            secrets,
1312            dummy_sui_key,
1313            mock0,
1314            mock1,
1315            mock2,
1316            mock3,
1317            _handles,
1318            gas_object_ref,
1319            sui_address,
1320            sui_token_type_tags,
1321            _bridge_pause_tx,
1322        ) = setup().await;
1323        let mut id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1324        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1325            vec![&mock0, &mock1, &mock2, &mock3],
1326            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1327            Some(new_token_id),
1328            false, // we need an eth -> sui action that entails the new token type tag in transaction building
1329        );
1330
1331        let action = action_certificate.data().clone();
1332        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1333        let tx_data = build_sui_transaction(
1334            sui_address,
1335            &gas_object_ref,
1336            action_certificate.clone(),
1337            arg,
1338            &maplit::hashmap! {
1339                new_token_id => new_type_tag.clone()
1340            },
1341            1000,
1342        )
1343        .unwrap();
1344        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1345        mock_transaction_error(
1346            &sui_client_mock,
1347            tx_digest,
1348            BridgeError::Generic("some random error".to_string()),
1349            true,
1350        );
1351
1352        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
1353        sui_client_mock.add_gas_object_info(
1354            gas_coin.clone(),
1355            gas_object_ref,
1356            Owner::AddressOwner(sui_address),
1357        );
1358        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1359
1360        // Kick it (send to the execution queue, skipping the signing queue)
1361        execution_tx
1362            .send(CertifiedBridgeActionExecutionWrapper(
1363                action_certificate.clone(),
1364                0,
1365            ))
1366            .await
1367            .unwrap();
1368
1369        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1370        // Nothing is sent to execute, because the token id does not exist yet
1371        assert_eq!(
1372            tx_subscription.try_recv().unwrap_err(),
1373            tokio::sync::broadcast::error::TryRecvError::Empty
1374        );
1375
1376        // Now insert the new token id
1377        id_token_map.insert(new_token_id, new_type_tag);
1378        sui_token_type_tags.store(Arc::new(id_token_map));
1379
1380        // Kick it again
1381        execution_tx
1382            .send(CertifiedBridgeActionExecutionWrapper(
1383                action_certificate.clone(),
1384                0,
1385            ))
1386            .await
1387            .unwrap();
1388
1389        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1390        // The action is sent to execution
1391        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1392    }
1393
1394    fn mock_bridge_authority_sigs(
1395        mocks: Vec<&BridgeRequestMockHandler>,
1396        action: &BridgeAction,
1397        secrets: Vec<&BridgeAuthorityKeyPair>,
1398        sui_tx_digest: TransactionDigest,
1399        sui_tx_event_index: u16,
1400    ) -> BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthorityRecoverableSignature> {
1401        assert_eq!(mocks.len(), secrets.len());
1402        let mut signed_actions = BTreeMap::new();
1403        for (mock, secret) in mocks.iter().zip(secrets.iter()) {
1404            let signed_action = sign_action_with_key(action, secret);
1405            mock.add_sui_event_response(
1406                sui_tx_digest,
1407                sui_tx_event_index,
1408                Ok(signed_action.clone()),
1409                None,
1410            );
1411            signed_actions.insert(secret.public().into(), signed_action.into_sig().signature);
1412        }
1413        signed_actions
1414    }
1415
1416    fn mock_bridge_authority_signing_errors(
1417        mocks: Vec<&BridgeRequestMockHandler>,
1418        sui_tx_digest: TransactionDigest,
1419        sui_tx_event_index: u16,
1420    ) {
1421        for mock in mocks {
1422            mock.add_sui_event_response(
1423                sui_tx_digest,
1424                sui_tx_event_index,
1425                Err(BridgeError::RestAPIError("small issue".into())),
1426                None,
1427            );
1428        }
1429    }
1430
1431    /// Create a BridgeAction and mock authorities to return signatures
1432    fn get_bridge_authority_approved_action(
1433        mocks: Vec<&BridgeRequestMockHandler>,
1434        secrets: Vec<&BridgeAuthorityKeyPair>,
1435        token_id: Option<u8>,
1436        sui_to_eth: bool,
1437    ) -> (VerifiedCertifiedBridgeAction, TransactionDigest, u16) {
1438        let sui_tx_digest = TransactionDigest::random();
1439        let sui_tx_event_index = 1;
1440        let action = if sui_to_eth {
1441            get_test_sui_to_eth_bridge_action(
1442                Some(sui_tx_digest),
1443                Some(sui_tx_event_index),
1444                None,
1445                None,
1446                None,
1447                None,
1448                token_id,
1449            )
1450        } else {
1451            get_test_eth_to_sui_bridge_action(None, None, None, token_id)
1452        };
1453
1454        let sigs =
1455            mock_bridge_authority_sigs(mocks, &action, secrets, sui_tx_digest, sui_tx_event_index);
1456        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1457            action,
1458            BridgeCommitteeValiditySignInfo { signatures: sigs },
1459        );
1460        (
1461            VerifiedCertifiedBridgeAction::new_from_verified(certified_action),
1462            sui_tx_digest,
1463            sui_tx_event_index,
1464        )
1465    }
1466
1467    fn get_tx_digest(tx_data: TransactionData, dummy_sui_key: &SuiKeyPair) -> TransactionDigest {
1468        let sig = Signature::new_secure(
1469            &IntentMessage::new(Intent::sui_transaction(), &tx_data),
1470            dummy_sui_key,
1471        );
1472        let signed_tx = Transaction::from_data(tx_data, vec![sig]);
1473        *signed_tx.digest()
1474    }
1475
1476    /// Why is `wildcard` needed? This is because authority signatures
1477    /// are part of transaction data. Depending on whose signatures
1478    /// are included in what order, this may change the tx digest.
1479    fn mock_transaction_response(
1480        sui_client_mock: &SuiMockClient,
1481        tx_digest: TransactionDigest,
1482        status: SuiExecutionStatus,
1483        events: Option<Vec<SuiEvent>>,
1484        wildcard: bool,
1485    ) {
1486        let response = ExecuteTransactionResult {
1487            status,
1488            events: events.unwrap_or_default(),
1489        };
1490        if wildcard {
1491            sui_client_mock.set_wildcard_transaction_response(Ok(response));
1492        } else {
1493            sui_client_mock.add_transaction_response(tx_digest, Ok(response));
1494        }
1495    }
1496
1497    fn mock_transaction_error(
1498        sui_client_mock: &SuiMockClient,
1499        tx_digest: TransactionDigest,
1500        error: BridgeError,
1501        wildcard: bool,
1502    ) {
1503        if wildcard {
1504            sui_client_mock.set_wildcard_transaction_response(Err(error));
1505        } else {
1506            sui_client_mock.add_transaction_response(tx_digest, Err(error));
1507        }
1508    }
1509
1510    #[allow(clippy::type_complexity)]
1511    async fn setup() -> (
1512        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
1513        mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
1514        SuiMockClient,
1515        tokio::sync::broadcast::Receiver<TransactionDigest>,
1516        Arc<BridgeOrchestratorTables>,
1517        Vec<BridgeAuthorityKeyPair>,
1518        SuiKeyPair,
1519        BridgeRequestMockHandler,
1520        BridgeRequestMockHandler,
1521        BridgeRequestMockHandler,
1522        BridgeRequestMockHandler,
1523        Vec<tokio::task::JoinHandle<()>>,
1524        ObjectRef,
1525        SuiAddress,
1526        Arc<ArcSwap<HashMap<u8, TypeTag>>>,
1527        tokio::sync::watch::Sender<IsBridgePaused>,
1528    ) {
1529        telemetry_subscribers::init_for_testing();
1530        let registry = Registry::new();
1531        mysten_metrics::init_metrics(&registry);
1532        init_all_struct_tags();
1533
1534        let (sui_address, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1535        let sui_key = SuiKeyPair::from(kp);
1536        let gas_object_ref = random_object_ref();
1537        let temp_dir = tempfile::tempdir().unwrap();
1538        let store = BridgeOrchestratorTables::new(temp_dir.path());
1539        let sui_client_mock = SuiMockClient::default();
1540        let tx_subscription = sui_client_mock.subscribe_to_requested_transactions();
1541        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1542
1543        // The dummy key is used to sign transaction so we can get TransactionDigest easily.
1544        // User signature is not part of the transaction so it does not matter which key it is.
1545        let (_, dummy_kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1546        let dummy_sui_key = SuiKeyPair::from(dummy_kp);
1547
1548        let mock0 = BridgeRequestMockHandler::new();
1549        let mock1 = BridgeRequestMockHandler::new();
1550        let mock2 = BridgeRequestMockHandler::new();
1551        let mock3 = BridgeRequestMockHandler::new();
1552
1553        let (mut handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
1554            vec![2500, 2500, 2500, 2500],
1555            vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
1556        );
1557
1558        let committee = BridgeCommittee::new(authorities).unwrap();
1559
1560        let agg = Arc::new(ArcSwap::new(Arc::new(
1561            BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1562        )));
1563        let metrics = Arc::new(BridgeMetrics::new(&registry));
1564        let sui_token_type_tags = sui_client.get_token_id_map().await.unwrap();
1565        let sui_token_type_tags = Arc::new(ArcSwap::new(Arc::new(sui_token_type_tags)));
1566        let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1567        let executor = BridgeActionExecutor::new(
1568            sui_client.clone(),
1569            agg.clone(),
1570            store.clone(),
1571            sui_key,
1572            sui_address,
1573            gas_object_ref.0,
1574            sui_token_type_tags.clone(),
1575            bridge_pause_rx,
1576            metrics,
1577        )
1578        .await;
1579
1580        let (executor_handle, signing_tx, execution_tx) = executor.run_inner();
1581        handles.extend(executor_handle);
1582
1583        (
1584            signing_tx,
1585            execution_tx,
1586            sui_client_mock,
1587            tx_subscription,
1588            store,
1589            secrets,
1590            dummy_sui_key,
1591            mock0,
1592            mock1,
1593            mock2,
1594            mock3,
1595            handles,
1596            gas_object_ref,
1597            sui_address,
1598            sui_token_type_tags,
1599            bridge_pause_tx,
1600        )
1601    }
1602}