sui_bridge/
action_executor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! BridgeActionExecutor receives BridgeActions (from BridgeOrchestrator),
5//! collects bridge authority signatures and submits the signatures on chain.
6
7use crate::retry_with_max_elapsed_time;
8use crate::types::IsBridgePaused;
9use arc_swap::ArcSwap;
10use mysten_metrics::spawn_logged_monitored_task;
11use shared_crypto::intent::{Intent, IntentMessage};
12use sui_json_rpc_types::SuiExecutionStatus;
13use sui_types::TypeTag;
14use sui_types::transaction::ObjectArg;
15use sui_types::{
16    base_types::{ObjectID, ObjectRef, SuiAddress},
17    crypto::{Signature, SuiKeyPair},
18    digests::TransactionDigest,
19    gas_coin::GasCoin,
20    object::Owner,
21    transaction::Transaction,
22};
23
24use crate::events::{
25    TokenTransferAlreadyApproved, TokenTransferAlreadyClaimed, TokenTransferApproved,
26    TokenTransferClaimed,
27};
28use crate::metrics::BridgeMetrics;
29use crate::{
30    client::bridge_authority_aggregator::BridgeAuthorityAggregator,
31    error::BridgeError,
32    storage::BridgeOrchestratorTables,
33    sui_client::{ExecuteTransactionResult, SuiClient, SuiClientInner},
34    sui_transaction_builder::build_sui_transaction,
35    types::{BridgeAction, BridgeActionStatus, VerifiedCertifiedBridgeAction},
36};
37use std::collections::HashMap;
38use std::sync::Arc;
39use tokio::sync::Semaphore;
40use tokio::time::Duration;
41use tracing::{Instrument, error, info, instrument, warn};
42
/// Capacity passed to both metered channels created in `run_inner`
/// (the signing queue and the execution queue).
43pub const CHANNEL_SIZE: usize = 1000;
/// Number of permits in the semaphore that every `request_signatures` task
/// must acquire before it starts working on an action.
44pub const SIGNING_CONCURRENCY: usize = 10;
45
46// delay schedule: at most 16 times including the initial attempt
47// 0.1s, 0.2s, 0.4s, 0.8s, 1.6s, 3.2s, 6.4s, 12.8s, 25.6s, 51.2s, 102.4s, 204.8s, 409.6s, 819.2s, 1638.4s
// NOTE(review): both retry sites give up only when `attempt_times >= MAX_*_ATTEMPTS`, and
// `attempt_times` starts at 0. As written that allows attempts numbered 0..=16 (17 in total),
// the last one after a `delay(15)` of 3276.8s, which is one step more than the schedule listed
// above. Confirm whether the comment or the `>=` checks reflect the intended budget.
/// Threshold compared against `attempt_times` in `request_signatures` before an action is
/// given up on and logged for manual intervention.
48pub const MAX_SIGNING_ATTEMPTS: u64 = 16;
/// Threshold compared against `attempt_times` in `handle_execution_task` before a certificate
/// is given up on and logged for manual intervention.
49pub const MAX_EXECUTION_ATTEMPTS: u64 = 16;
50
51async fn delay(attempt_times: u64) {
52    let delay_ms = 100 * 2_u64.pow(attempt_times as u32);
53    tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
54}
55
/// Signing-queue item: a bridge action (`.0`) paired with the number of signing
/// attempts already made for it (`.1`). `submit_to_executor` enqueues new actions with 0
/// and `request_signatures` re-enqueues failed ones with the count incremented.
56#[derive(Debug)]
57pub struct BridgeActionExecutionWrapper(pub BridgeAction, pub u64);
58
/// Execution-queue item: a verified certified action (`.0`) paired with the number of
/// execution attempts already made for it (`.1`). `request_signatures` enqueues fresh
/// certificates with 0 and `handle_execution_task` re-enqueues failed ones incremented.
59#[derive(Debug)]
60pub struct CertifiedBridgeActionExecutionWrapper(pub VerifiedCertifiedBridgeAction, pub u64);
61
/// Entry point for starting an action executor.
62pub trait BridgeActionExecutorTrait {
    /// Consumes the executor and returns a list of task join handles together with a
    /// sender for `BridgeActionExecutionWrapper` items.
    ///
    /// In the `BridgeActionExecutor` implementation in this file, the handles belong to
    /// the two spawned loops and the sender feeds the signing queue.
63    fn run(
64        self,
65    ) -> (
66        Vec<tokio::task::JoinHandle<()>>,
67        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
68    );
69}
70
/// Collects committee signatures for bridge actions and submits the resulting
/// certificates to Sui. All state is handed to the two loops spawned by `run_inner`.
71pub struct BridgeActionExecutor<C> {
    /// Client used for on-chain status queries, gas data and transaction execution.
72    sui_client: Arc<SuiClient<C>>,
    /// Swappable aggregator used to request committee signatures.
73    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
    /// Key that signs the transactions built in `handle_execution_task`.
74    key: SuiKeyPair,
    /// Sender address of the built transactions; must own the gas object.
75    sui_address: SuiAddress,
    /// Gas coin object used to pay for every transaction.
76    gas_object_id: ObjectID,
    /// Store from which pending actions are removed once they are processed.
77    store: Arc<BridgeOrchestratorTables>,
    /// Bridge object argument resolved once in `new` and passed to the transaction builder.
78    bridge_object_arg: ObjectArg,
    /// Swappable token-id to type-tag map; refreshed from chain on `UnknownTokenId`.
79    sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
    /// Watch channel read by the execution loop to skip work while the bridge is paused.
80    bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
    /// Counters and gauges updated throughout signing and execution.
81    metrics: Arc<BridgeMetrics>,
82}
83
84impl<C> BridgeActionExecutorTrait for BridgeActionExecutor<C>
85where
86    C: SuiClientInner + 'static,
87{
88    fn run(
89        self,
90    ) -> (
91        Vec<tokio::task::JoinHandle<()>>,
92        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
93    ) {
94        let (tasks, sender, _) = self.run_inner();
95        (tasks, sender)
96    }
97}
98
99impl<C> BridgeActionExecutor<C>
100where
101    C: SuiClientInner + 'static,
102{
103    pub async fn new(
104        sui_client: Arc<SuiClient<C>>,
105        bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
106        store: Arc<BridgeOrchestratorTables>,
107        key: SuiKeyPair,
108        sui_address: SuiAddress,
109        gas_object_id: ObjectID,
110        sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
111        bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
112        metrics: Arc<BridgeMetrics>,
113    ) -> Self {
114        let bridge_object_arg = sui_client
115            .get_mutable_bridge_object_arg_must_succeed()
116            .await;
117        Self {
118            sui_client,
119            bridge_auth_agg,
120            store,
121            key,
122            gas_object_id,
123            sui_address,
124            bridge_object_arg,
125            sui_token_type_tags,
126            bridge_pause_rx,
127            metrics,
128        }
129    }
130
131    fn run_inner(
132        self,
133    ) -> (
134        Vec<tokio::task::JoinHandle<()>>,
135        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
136        mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
137    ) {
138        let key = self.key;
139
140        let (sender, receiver) = mysten_metrics::metered_channel::channel(
141            CHANNEL_SIZE,
142            &mysten_metrics::get_metrics()
143                .unwrap()
144                .channel_inflight
145                .with_label_values(&["executor_signing_queue"]),
146        );
147
148        let (execution_tx, execution_rx) = mysten_metrics::metered_channel::channel(
149            CHANNEL_SIZE,
150            &mysten_metrics::get_metrics()
151                .unwrap()
152                .channel_inflight
153                .with_label_values(&["executor_execution_queue"]),
154        );
155        let execution_tx_clone = execution_tx.clone();
156        let sender_clone = sender.clone();
157        let store_clone = self.store.clone();
158        let client_clone = self.sui_client.clone();
159        let mut tasks = vec![];
160        let metrics = self.metrics.clone();
161        tasks.push(spawn_logged_monitored_task!(
162            Self::run_signature_aggregation_loop(
163                client_clone,
164                self.bridge_auth_agg,
165                store_clone,
166                sender_clone,
167                receiver,
168                execution_tx_clone,
169                metrics,
170            )
171        ));
172
173        let metrics = self.metrics.clone();
174        let execution_tx_clone = execution_tx.clone();
175        tasks.push(spawn_logged_monitored_task!(
176            Self::run_onchain_execution_loop(
177                self.sui_client.clone(),
178                key,
179                self.sui_address,
180                self.gas_object_id,
181                self.store.clone(),
182                execution_tx_clone,
183                execution_rx,
184                self.bridge_object_arg,
185                self.sui_token_type_tags,
186                self.bridge_pause_rx,
187                metrics,
188            )
189        ));
190        (tasks, sender, execution_tx)
191    }
192
193    async fn run_signature_aggregation_loop(
194        sui_client: Arc<SuiClient<C>>,
195        auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
196        store: Arc<BridgeOrchestratorTables>,
197        signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
198        mut signing_queue_receiver: mysten_metrics::metered_channel::Receiver<
199            BridgeActionExecutionWrapper,
200        >,
201        execution_queue_sender: mysten_metrics::metered_channel::Sender<
202            CertifiedBridgeActionExecutionWrapper,
203        >,
204        metrics: Arc<BridgeMetrics>,
205    ) {
206        info!("Starting run_signature_aggregation_loop");
207        let semaphore = Arc::new(Semaphore::new(SIGNING_CONCURRENCY));
208        while let Some(action) = signing_queue_receiver.recv().await {
209            Self::handle_signing_task(
210                &semaphore,
211                &auth_agg,
212                &signing_queue_sender,
213                &execution_queue_sender,
214                &sui_client,
215                &store,
216                action,
217                &metrics,
218            )
219            .await;
220        }
221    }
222
223    async fn should_proceed_signing(sui_client: &Arc<SuiClient<C>>) -> bool {
224        let Ok(Ok(is_paused)) =
225            retry_with_max_elapsed_time!(sui_client.is_bridge_paused(), Duration::from_secs(600))
226        else {
227            error!("Failed to get bridge status after retry");
228            return false;
229        };
230        !is_paused
231    }
232
233    #[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))]
234    async fn handle_signing_task(
235        semaphore: &Arc<Semaphore>,
236        auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
237        signing_queue_sender: &mysten_metrics::metered_channel::Sender<
238            BridgeActionExecutionWrapper,
239        >,
240        execution_queue_sender: &mysten_metrics::metered_channel::Sender<
241            CertifiedBridgeActionExecutionWrapper,
242        >,
243        sui_client: &Arc<SuiClient<C>>,
244        store: &Arc<BridgeOrchestratorTables>,
245        action: BridgeActionExecutionWrapper,
246        metrics: &Arc<BridgeMetrics>,
247    ) {
248        metrics.action_executor_signing_queue_received_actions.inc();
249        let action_key = action.0.key();
250        info!("Received action for signing: {:?}", action.0);
251
252        // TODO: this is a temporary fix to avoid signing when the bridge is paused.
253        // but the way is implemented is not ideal:
254        // 1. it should check the direction
255        // 2. should use a better mechanism to check the bridge status instead of polling for each action
256        let should_proceed = Self::should_proceed_signing(sui_client).await;
257        if !should_proceed {
258            metrics.action_executor_signing_queue_skipped_actions.inc();
259            warn!("skipping signing task: {:?}", action_key);
260            return;
261        }
262
263        let auth_agg_clone = auth_agg.clone();
264        let signing_queue_sender_clone = signing_queue_sender.clone();
265        let execution_queue_sender_clone = execution_queue_sender.clone();
266        let sui_client_clone = sui_client.clone();
267        let store_clone = store.clone();
268        let metrics_clone = metrics.clone();
269        let semaphore_clone = semaphore.clone();
270        spawn_logged_monitored_task!(
271            Self::request_signatures(
272                semaphore_clone,
273                sui_client_clone,
274                auth_agg_clone,
275                action,
276                store_clone,
277                signing_queue_sender_clone,
278                execution_queue_sender_clone,
279                metrics_clone,
280            )
281            .instrument(tracing::debug_span!("request_signatures", action_key=?action_key)),
282            "request_signatures"
283        );
284    }
285
286    // Checks if the action is already processed on chain.
287    // If yes, skip this action and remove it from the pending log.
288    // Returns true if the action is already processed.
289    async fn handle_already_processed_token_transfer_action_maybe(
290        sui_client: &Arc<SuiClient<C>>,
291        action: &BridgeAction,
292        store: &Arc<BridgeOrchestratorTables>,
293        metrics: &Arc<BridgeMetrics>,
294    ) -> bool {
295        let status = sui_client
296            .get_token_transfer_action_onchain_status_until_success(
297                action.chain_id() as u8,
298                action.seq_number(),
299            )
300            .await;
301        match status {
302            BridgeActionStatus::Approved | BridgeActionStatus::Claimed => {
303                info!(
304                    "Action already approved or claimed, removing action from pending logs: {:?}",
305                    action
306                );
307                metrics.action_executor_already_processed_actions.inc();
308                store
309                    .remove_pending_actions(&[action.digest()])
310                    .unwrap_or_else(|e| {
311                        panic!("Write to DB should not fail: {:?}", e);
312                    });
313                true
314            }
315            // Although theoretically a legit SuiToEthBridgeAction should not have
316            // status `NotFound`
317            BridgeActionStatus::Pending | BridgeActionStatus::NotFound => false,
318        }
319    }
320
321    // TODO: introduce a way to properly stagger the handling
322    // for various validators.
323    async fn request_signatures(
324        semaphore: Arc<Semaphore>,
325        sui_client: Arc<SuiClient<C>>,
326        auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
327        action: BridgeActionExecutionWrapper,
328        store: Arc<BridgeOrchestratorTables>,
329        signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
330        execution_queue_sender: mysten_metrics::metered_channel::Sender<
331            CertifiedBridgeActionExecutionWrapper,
332        >,
333        metrics: Arc<BridgeMetrics>,
334    ) {
335        let _permit = semaphore
336            .acquire()
337            .await
338            .expect("semaphore should not be closed");
339        info!("requesting signatures");
340        let BridgeActionExecutionWrapper(action, attempt_times) = action;
341
342        // Only token transfer action should reach here
343        match &action {
344            BridgeAction::SuiToEthBridgeAction(_)
345            | BridgeAction::SuiToEthTokenTransfer(_)
346            | BridgeAction::SuiToEthTokenTransferV2(_)
347            | BridgeAction::EthToSuiBridgeAction(_)
348            | BridgeAction::EthToSuiTokenTransferV2(_) => (),
349            _ => unreachable!("Non token transfer action should not reach here"),
350        };
351
352        // If the action is already processed, skip it.
353        if Self::handle_already_processed_token_transfer_action_maybe(
354            &sui_client,
355            &action,
356            &store,
357            &metrics,
358        )
359        .await
360        {
361            return;
362        }
363        match auth_agg
364            .load()
365            .request_committee_signatures(action.clone())
366            .await
367        {
368            Ok(certificate) => {
369                info!("Sending certificate to execution");
370                execution_queue_sender
371                    .send(CertifiedBridgeActionExecutionWrapper(certificate, 0))
372                    .await
373                    .unwrap_or_else(|e| {
374                        panic!("Sending to execution queue should not fail: {:?}", e);
375                    });
376            }
377            Err(e) => {
378                warn!("Failed to collect sigs for bridge action: {:?}", e);
379                metrics.err_signature_aggregation.inc();
380
381                // TODO: spawn a task for this
382                if attempt_times >= MAX_SIGNING_ATTEMPTS {
383                    metrics.err_signature_aggregation_too_many_failures.inc();
384                    error!(
385                        "Manual intervention is required. Failed to collect sigs for bridge action after {MAX_SIGNING_ATTEMPTS} attempts: {:?}",
386                        e
387                    );
388                    return;
389                }
390                delay(attempt_times).await;
391                signing_queue_sender
392                    .send(BridgeActionExecutionWrapper(action, attempt_times + 1))
393                    .await
394                    .unwrap_or_else(|e| {
395                        panic!("Sending to signing queue should not fail: {:?}", e);
396                    });
397            }
398        }
399    }
400
401    // Before calling this function, `key` and `sui_address` need to be
402    // verified to match.
403    async fn run_onchain_execution_loop(
404        sui_client: Arc<SuiClient<C>>,
405        sui_key: SuiKeyPair,
406        sui_address: SuiAddress,
407        gas_object_id: ObjectID,
408        store: Arc<BridgeOrchestratorTables>,
409        execution_queue_sender: mysten_metrics::metered_channel::Sender<
410            CertifiedBridgeActionExecutionWrapper,
411        >,
412        mut execution_queue_receiver: mysten_metrics::metered_channel::Receiver<
413            CertifiedBridgeActionExecutionWrapper,
414        >,
415        bridge_object_arg: ObjectArg,
416        sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
417        bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
418        metrics: Arc<BridgeMetrics>,
419    ) {
420        info!("Starting run_onchain_execution_loop");
421        while let Some(certificate_wrapper) = execution_queue_receiver.recv().await {
422            // When bridge is paused, skip execution.
423            // Skipped actions will be picked up upon node restarting
424            // if bridge is unpaused.
425            if *bridge_pause_rx.borrow() {
426                warn!("Bridge is paused, skipping execution");
427                metrics
428                    .action_executor_execution_queue_skipped_actions_due_to_pausing
429                    .inc();
430                continue;
431            }
432            Self::handle_execution_task(
433                certificate_wrapper,
434                &sui_client,
435                &sui_key,
436                &sui_address,
437                gas_object_id,
438                &store,
439                &execution_queue_sender,
440                &bridge_object_arg,
441                &sui_token_type_tags,
442                &metrics,
443            )
444            .await;
445        }
446        panic!("Execution queue closed unexpectedly");
447    }
448
449    #[instrument(level = "error", skip_all, fields(action_key=?certificate_wrapper.0.data().key(), attempt_times=?certificate_wrapper.1))]
450    async fn handle_execution_task(
451        certificate_wrapper: CertifiedBridgeActionExecutionWrapper,
452        sui_client: &Arc<SuiClient<C>>,
453        sui_key: &SuiKeyPair,
454        sui_address: &SuiAddress,
455        gas_object_id: ObjectID,
456        store: &Arc<BridgeOrchestratorTables>,
457        execution_queue_sender: &mysten_metrics::metered_channel::Sender<
458            CertifiedBridgeActionExecutionWrapper,
459        >,
460        bridge_object_arg: &ObjectArg,
461        sui_token_type_tags: &ArcSwap<HashMap<u8, TypeTag>>,
462        metrics: &Arc<BridgeMetrics>,
463    ) {
464        metrics
465            .action_executor_execution_queue_received_actions
466            .inc();
467        let CertifiedBridgeActionExecutionWrapper(certificate, attempt_times) = certificate_wrapper;
468        let action = certificate.data();
469        let action_key = action.key();
470
471        info!("Received certified action for execution: {:?}", action);
472
473        // TODO check gas coin balance here. If gas balance too low, do not proceed.
474        let (gas_coin, gas_object_ref) =
475            Self::get_gas_data_assert_ownership(*sui_address, gas_object_id, sui_client).await;
476        metrics.gas_coin_balance.set(gas_coin.value() as i64);
477
478        let ceriticate_clone = certificate.clone();
479
480        // Check once: if the action is already processed, skip it.
481        if Self::handle_already_processed_token_transfer_action_maybe(
482            sui_client, action, store, metrics,
483        )
484        .await
485        {
486            info!("Action already processed, skipping");
487            return;
488        }
489
490        info!("Building Sui transaction");
491        let rgp = sui_client.get_reference_gas_price_until_success().await;
492        let tx_data = match build_sui_transaction(
493            *sui_address,
494            &gas_object_ref,
495            ceriticate_clone.clone(),
496            *bridge_object_arg,
497            sui_token_type_tags.load().as_ref(),
498            rgp,
499        ) {
500            Ok(tx_data) => tx_data,
501            Err(BridgeError::UnknownTokenId(token_id)) => {
502                // Token not found in local cache - it might be newly registered.
503                // Refresh token map from chain and retry.
504                info!(
505                    "Unknown token_id {}, refreshing token map from chain and retrying",
506                    token_id
507                );
508                match sui_client.get_token_id_map().await {
509                    Ok(new_token_map) => {
510                        sui_token_type_tags.store(Arc::new(new_token_map));
511                        // Retry building transaction with refreshed token map
512                        match build_sui_transaction(
513                            *sui_address,
514                            &gas_object_ref,
515                            ceriticate_clone,
516                            *bridge_object_arg,
517                            sui_token_type_tags.load().as_ref(),
518                            rgp,
519                        ) {
520                            Ok(tx_data) => tx_data,
521                            Err(err) => {
522                                metrics.err_build_sui_transaction.inc();
523                                error!(
524                                    "Manual intervention is required. Failed to build transaction after token map refresh for action {:?}: {:?}",
525                                    action, err
526                                );
527                                return;
528                            }
529                        }
530                    }
531                    Err(e) => {
532                        metrics.err_build_sui_transaction.inc();
533                        error!(
534                            "Manual intervention is required. Failed to refresh token map: {:?}",
535                            e
536                        );
537                        return;
538                    }
539                }
540            }
541            Err(err) => {
542                metrics.err_build_sui_transaction.inc();
543                error!(
544                    "Manual intervention is required. Failed to build transaction for action {:?}: {:?}",
545                    action, err
546                );
547                // This should not happen, but in case it does, we do not want to
548                // panic, instead we log here for manual intervention.
549                return;
550            }
551        };
552        let sig = Signature::new_secure(
553            &IntentMessage::new(Intent::sui_transaction(), &tx_data),
554            sui_key,
555        );
556        let signed_tx = Transaction::from_data(tx_data, vec![sig]);
557        let tx_digest = *signed_tx.digest();
558
559        // Check twice: If the action is already processed, skip it.
560        if Self::handle_already_processed_token_transfer_action_maybe(
561            sui_client, action, store, metrics,
562        )
563        .await
564        {
565            info!("Action already processed, skipping");
566            return;
567        }
568
569        info!(?tx_digest, ?gas_object_ref, "Sending transaction to Sui");
570        match sui_client
571            .execute_transaction_block_with_effects(signed_tx)
572            .await
573        {
574            Ok(resp) => {
575                Self::handle_execution_effects(tx_digest, resp, store, action, metrics).await
576            }
577
578            // If the transaction did not go through, retry up to a certain times.
579            Err(err) => {
580                error!(
581                    ?action_key,
582                    ?tx_digest,
583                    "Sui transaction failed at signing: {err:?}"
584                );
585                metrics.err_sui_transaction_submission.inc();
586                let metrics_clone = metrics.clone();
587                // Do this in a separate task so we won't deadlock here
588                let sender_clone = execution_queue_sender.clone();
589                spawn_logged_monitored_task!(async move {
590                    // If it fails for too many times, log and ask for manual intervention.
591                    if attempt_times >= MAX_EXECUTION_ATTEMPTS {
592                        metrics_clone
593                            .err_sui_transaction_submission_too_many_failures
594                            .inc();
595                        error!("Manual intervention is required. Failed to collect execute transaction for bridge action after {MAX_EXECUTION_ATTEMPTS} attempts: {:?}", err);
596                        return;
597                    }
598                    delay(attempt_times).await;
599                    sender_clone
600                        .send(CertifiedBridgeActionExecutionWrapper(
601                            certificate,
602                            attempt_times + 1,
603                        ))
604                        .await
605                        .unwrap_or_else(|e| {
606                            panic!("Sending to execution queue should not fail: {:?}", e);
607                        });
608                    info!("Re-enqueued certificate for execution");
609                }.instrument(tracing::debug_span!("reenqueue_execution_task", action_key=?action_key)));
610            }
611        }
612    }
613
614    // TODO: do we need a mechanism to periodically read pending actions from DB?
615    async fn handle_execution_effects(
616        tx_digest: TransactionDigest,
617        response: ExecuteTransactionResult,
618        store: &Arc<BridgeOrchestratorTables>,
619        action: &BridgeAction,
620        metrics: &Arc<BridgeMetrics>,
621    ) {
622        match &response.status {
623            SuiExecutionStatus::Success => {
624                let relevant_events = response
625                    .events
626                    .iter()
627                    .filter(|e| {
628                        e.type_ == *TokenTransferAlreadyClaimed.get().unwrap()
629                            || e.type_ == *TokenTransferClaimed.get().unwrap()
630                            || e.type_ == *TokenTransferApproved.get().unwrap()
631                            || e.type_ == *TokenTransferAlreadyApproved.get().unwrap()
632                    })
633                    .collect::<Vec<_>>();
634                assert!(
635                    !relevant_events.is_empty(),
636                    "Expected TokenTransferAlreadyClaimed, TokenTransferClaimed, TokenTransferApproved \
637                    or TokenTransferAlreadyApproved event but got: {:?}",
638                    response.events
639                );
640                info!(?tx_digest, "Sui transaction executed successfully");
641                // track successful approval and claim events
642                relevant_events.iter().for_each(|e| {
643                    if e.type_ == *TokenTransferClaimed.get().unwrap() {
644                        match action {
645                            BridgeAction::EthToSuiBridgeAction(_)
646                            | BridgeAction::EthToSuiTokenTransferV2(_) => {
647                                metrics.eth_sui_token_transfer_claimed.inc();
648                            }
649                            BridgeAction::SuiToEthBridgeAction(_)
650                            | BridgeAction::SuiToEthTokenTransfer(_)
651                            | BridgeAction::SuiToEthTokenTransferV2(_) => {
652                                metrics.sui_eth_token_transfer_claimed.inc();
653                            }
654                            _ => error!("Unexpected action type for claimed event: {:?}", action),
655                        }
656                    } else if e.type_ == *TokenTransferApproved.get().unwrap() {
657                        match action {
658                            BridgeAction::EthToSuiBridgeAction(_)
659                            | BridgeAction::EthToSuiTokenTransferV2(_) => {
660                                metrics.eth_sui_token_transfer_approved.inc();
661                            }
662                            BridgeAction::SuiToEthBridgeAction(_)
663                            | BridgeAction::SuiToEthTokenTransfer(_)
664                            | BridgeAction::SuiToEthTokenTransferV2(_) => {
665                                metrics.sui_eth_token_transfer_approved.inc();
666                            }
667                            _ => error!("Unexpected action type for approved event: {:?}", action),
668                        }
669                    }
670                });
671                store
672                    .remove_pending_actions(&[action.digest()])
673                    .unwrap_or_else(|e| {
674                        panic!("Write to DB should not fail: {:?}", e);
675                    })
676            }
677            SuiExecutionStatus::Failure { error } => {
678                // In practice the transaction could fail because of running out of gas, but really
679                // should not be due to other reasons.
680                // This means manual intervention is needed. So we do not push them back to
681                // the execution queue because retries are mostly likely going to fail anyway.
682                // After human examination, the node should be restarted and fetch them from WAL.
683
684                metrics.err_sui_transaction_execution.inc();
685                error!(
686                    ?tx_digest,
687                    "Manual intervention is needed. Sui transaction executed and failed with error: {error:?}"
688                );
689            }
690        }
691    }
692
693    /// Panics if the gas object is not owned by the address.
694    async fn get_gas_data_assert_ownership(
695        sui_address: SuiAddress,
696        gas_object_id: ObjectID,
697        sui_client: &SuiClient<C>,
698    ) -> (GasCoin, ObjectRef) {
699        let (gas_coin, gas_obj_ref, owner) = sui_client
700            .get_gas_data_panic_if_not_gas(gas_object_id)
701            .await;
702
703        // TODO: when we add multiple gas support in the future we could discard
704        // transferred gas object instead.
705        assert_eq!(
706            owner,
707            Owner::AddressOwner(sui_address),
708            "Gas object {:?} is no longer owned by address {}",
709            gas_object_id,
710            sui_address
711        );
712        (gas_coin, gas_obj_ref)
713    }
714}
715
716pub async fn submit_to_executor(
717    tx: &mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
718    action: BridgeAction,
719) -> Result<(), BridgeError> {
720    tx.send(BridgeActionExecutionWrapper(action, 0))
721        .await
722        .map_err(|e| BridgeError::Generic(e.to_string()))
723}
724
725#[cfg(test)]
726mod tests {
727    use crate::events::init_all_struct_tags;
728    use crate::test_utils::DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
729    use crate::types::BRIDGE_PAUSED;
730    use fastcrypto::traits::KeyPair;
731    use mysten_common::ZipDebugEqIteratorExt;
732    use prometheus::Registry;
733    use std::collections::{BTreeMap, HashMap};
734    use std::str::FromStr;
735    use sui_json_rpc_types::SuiEvent;
736    use sui_types::TypeTag;
737    use sui_types::crypto::get_key_pair;
738    use sui_types::gas_coin::GasCoin;
739    use sui_types::{base_types::random_object_ref, transaction::TransactionData};
740
741    use crate::{
742        crypto::{
743            BridgeAuthorityKeyPair, BridgeAuthorityPublicKeyBytes,
744            BridgeAuthorityRecoverableSignature,
745        },
746        server::mock_handler::BridgeRequestMockHandler,
747        sui_mock_client::SuiMockClient,
748        test_utils::{
749            get_test_authorities_and_run_mock_bridge_server, get_test_eth_to_sui_bridge_action,
750            get_test_sui_to_eth_bridge_action, sign_action_with_key,
751        },
752        types::{BridgeCommittee, BridgeCommitteeValiditySignInfo, CertifiedBridgeAction},
753    };
754
755    use super::*;
756
757    #[tokio::test]
758    async fn test_onchain_execution_loop() {
759        let (
760            signing_tx,
761            _execution_tx,
762            sui_client_mock,
763            mut tx_subscription,
764            store,
765            secrets,
766            dummy_sui_key,
767            mock0,
768            mock1,
769            mock2,
770            mock3,
771            _handles,
772            gas_object_ref,
773            sui_address,
774            sui_token_type_tags,
775            _bridge_pause_tx,
776        ) = setup().await;
777        let (action_certificate, _, _) = get_bridge_authority_approved_action(
778            vec![&mock0, &mock1, &mock2, &mock3],
779            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
780            None,
781            true,
782        );
783        let action = action_certificate.data().clone();
784        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
785        let tx_data = build_sui_transaction(
786            sui_address,
787            &gas_object_ref,
788            action_certificate,
789            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
790            &id_token_map,
791            1000,
792        )
793        .unwrap();
794
795        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
796
797        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
798        sui_client_mock.add_gas_object_info(
799            gas_coin.clone(),
800            gas_object_ref,
801            Owner::AddressOwner(sui_address),
802        );
803
804        // Mock the transaction to be successfully executed
805        let mut event = SuiEvent::random_for_testing();
806        event.type_ = TokenTransferClaimed.get().unwrap().clone();
807        let events = vec![event];
808        mock_transaction_response(
809            &sui_client_mock,
810            tx_digest,
811            SuiExecutionStatus::Success,
812            Some(events),
813            true,
814        );
815
816        store
817            .insert_pending_actions(std::slice::from_ref(&action))
818            .unwrap();
819        assert_eq!(
820            store.get_all_pending_actions()[&action.digest()],
821            action.clone()
822        );
823
824        // Kick it
825        submit_to_executor(&signing_tx, action.clone())
826            .await
827            .unwrap();
828
829        // Expect to see the transaction to be requested and successfully executed hence removed from WAL
830        tx_subscription.recv().await.unwrap();
831        assert!(store.get_all_pending_actions().is_empty());
832
833        /////////////////////////////////////////////////////////////////////////////////////////////////
834        ////////////////////////////////////// Test execution failure ///////////////////////////////////
835        /////////////////////////////////////////////////////////////////////////////////////////////////
836
837        let (action_certificate, _, _) = get_bridge_authority_approved_action(
838            vec![&mock0, &mock1, &mock2, &mock3],
839            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
840            None,
841            true,
842        );
843
844        let action = action_certificate.data().clone();
845
846        let tx_data = build_sui_transaction(
847            sui_address,
848            &gas_object_ref,
849            action_certificate,
850            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
851            &id_token_map,
852            1000,
853        )
854        .unwrap();
855        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
856
857        // Mock the transaction to fail
858        mock_transaction_response(
859            &sui_client_mock,
860            tx_digest,
861            SuiExecutionStatus::Failure {
862                error: "failure is mother of success".to_string(),
863            },
864            None,
865            true,
866        );
867
868        store
869            .insert_pending_actions(std::slice::from_ref(&action))
870            .unwrap();
871        assert_eq!(
872            store.get_all_pending_actions()[&action.digest()],
873            action.clone()
874        );
875
876        // Kick it
877        submit_to_executor(&signing_tx, action.clone())
878            .await
879            .unwrap();
880
881        // Expect to see the transaction to be requested and but failed
882        tx_subscription.recv().await.unwrap();
883        // The action is not removed from WAL because the transaction failed
884        assert_eq!(
885            store.get_all_pending_actions()[&action.digest()],
886            action.clone()
887        );
888
889        /////////////////////////////////////////////////////////////////////////////////////////////////
890        //////////////////////////// Test transaction failed at signing stage ///////////////////////////
891        /////////////////////////////////////////////////////////////////////////////////////////////////
892
893        let (action_certificate, _, _) = get_bridge_authority_approved_action(
894            vec![&mock0, &mock1, &mock2, &mock3],
895            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
896            None,
897            true,
898        );
899
900        let action = action_certificate.data().clone();
901
902        let tx_data = build_sui_transaction(
903            sui_address,
904            &gas_object_ref,
905            action_certificate,
906            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
907            &id_token_map,
908            1000,
909        )
910        .unwrap();
911        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
912        mock_transaction_error(
913            &sui_client_mock,
914            tx_digest,
915            BridgeError::Generic("some random error".to_string()),
916            true,
917        );
918
919        store
920            .insert_pending_actions(std::slice::from_ref(&action))
921            .unwrap();
922        assert_eq!(
923            store.get_all_pending_actions()[&action.digest()],
924            action.clone()
925        );
926
927        // Kick it
928        submit_to_executor(&signing_tx, action.clone())
929            .await
930            .unwrap();
931
932        // Failure will trigger retry, we wait for 2 requests before checking WAL log
933        let tx_digest = tx_subscription.recv().await.unwrap();
934        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
935
936        // The retry is still going on, action still in WAL
937        assert!(
938            store
939                .get_all_pending_actions()
940                .contains_key(&action.digest())
941        );
942
943        // Now let it succeed
944        let mut event = SuiEvent::random_for_testing();
945        event.type_ = TokenTransferClaimed.get().unwrap().clone();
946        let events = vec![event];
947        mock_transaction_response(
948            &sui_client_mock,
949            tx_digest,
950            SuiExecutionStatus::Success,
951            Some(events),
952            true,
953        );
954
955        // Give it 1 second to retry and succeed
956        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
957        // The action is successful and should be removed from WAL now
958        assert!(
959            !store
960                .get_all_pending_actions()
961                .contains_key(&action.digest())
962        );
963    }
964
965    #[tokio::test]
966    async fn test_signature_aggregation_loop() {
967        let (
968            signing_tx,
969            _execution_tx,
970            sui_client_mock,
971            mut tx_subscription,
972            store,
973            secrets,
974            dummy_sui_key,
975            mock0,
976            mock1,
977            mock2,
978            mock3,
979            _handles,
980            gas_object_ref,
981            sui_address,
982            sui_token_type_tags,
983            _bridge_pause_tx,
984        ) = setup().await;
985        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
986        let (action_certificate, sui_tx_digest, sui_tx_event_index) =
987            get_bridge_authority_approved_action(
988                vec![&mock0, &mock1, &mock2, &mock3],
989                vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
990                None,
991                true,
992            );
993        let action = action_certificate.data().clone();
994        mock_bridge_authority_signing_errors(
995            vec![&mock0, &mock1, &mock2],
996            sui_tx_digest,
997            sui_tx_event_index,
998        );
999        let mut sigs = mock_bridge_authority_sigs(
1000            vec![&mock3],
1001            &action,
1002            vec![&secrets[3]],
1003            sui_tx_digest,
1004            sui_tx_event_index,
1005        );
1006
1007        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
1008        sui_client_mock.add_gas_object_info(
1009            gas_coin,
1010            gas_object_ref,
1011            Owner::AddressOwner(sui_address),
1012        );
1013        store
1014            .insert_pending_actions(std::slice::from_ref(&action))
1015            .unwrap();
1016        assert_eq!(
1017            store.get_all_pending_actions()[&action.digest()],
1018            action.clone()
1019        );
1020
1021        // Kick it
1022        submit_to_executor(&signing_tx, action.clone())
1023            .await
1024            .unwrap();
1025
1026        // Wait until the transaction is retried at least once (instead of deing dropped)
1027        loop {
1028            let requested_times =
1029                mock0.get_sui_token_events_requested(sui_tx_digest, sui_tx_event_index);
1030            if requested_times >= 2 {
1031                break;
1032            }
1033            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1034        }
1035        // Nothing is sent to execute yet
1036        assert_eq!(
1037            tx_subscription.try_recv().unwrap_err(),
1038            tokio::sync::broadcast::error::TryRecvError::Empty
1039        );
1040        // Still in WAL
1041        assert_eq!(
1042            store.get_all_pending_actions()[&action.digest()],
1043            action.clone()
1044        );
1045
1046        // Let authorities sign the action too. Now we are above the threshold
1047        let sig_from_2 = mock_bridge_authority_sigs(
1048            vec![&mock2],
1049            &action,
1050            vec![&secrets[2]],
1051            sui_tx_digest,
1052            sui_tx_event_index,
1053        );
1054        sigs.extend(sig_from_2);
1055        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1056            action.clone(),
1057            BridgeCommitteeValiditySignInfo { signatures: sigs },
1058        );
1059        let action_certificate = VerifiedCertifiedBridgeAction::new_from_verified(certified_action);
1060        let tx_data = build_sui_transaction(
1061            sui_address,
1062            &gas_object_ref,
1063            action_certificate,
1064            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
1065            &id_token_map,
1066            1000,
1067        )
1068        .unwrap();
1069        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1070
1071        let mut event = SuiEvent::random_for_testing();
1072        event.type_ = TokenTransferClaimed.get().unwrap().clone();
1073        let events = vec![event];
1074        mock_transaction_response(
1075            &sui_client_mock,
1076            tx_digest,
1077            SuiExecutionStatus::Success,
1078            Some(events),
1079            true,
1080        );
1081
1082        // Expect to see the transaction to be requested and succeed
1083        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1084        // The action is removed from WAL
1085        assert!(
1086            !store
1087                .get_all_pending_actions()
1088                .contains_key(&action.digest())
1089        );
1090    }
1091
1092    #[tokio::test]
1093    async fn test_skip_request_signature_if_already_processed_on_chain() {
1094        let (
1095            signing_tx,
1096            _execution_tx,
1097            sui_client_mock,
1098            mut tx_subscription,
1099            store,
1100            _secrets,
1101            _dummy_sui_key,
1102            mock0,
1103            mock1,
1104            mock2,
1105            mock3,
1106            _handles,
1107            _gas_object_ref,
1108            _sui_address,
1109            _sui_token_type_tags,
1110            _bridge_pause_tx,
1111        ) = setup().await;
1112
1113        let sui_tx_digest = TransactionDigest::random();
1114        let sui_tx_event_index = 0;
1115        let action = get_test_sui_to_eth_bridge_action(
1116            Some(sui_tx_digest),
1117            Some(sui_tx_event_index),
1118            None,
1119            None,
1120            None,
1121            None,
1122            None,
1123        );
1124        mock_bridge_authority_signing_errors(
1125            vec![&mock0, &mock1, &mock2, &mock3],
1126            sui_tx_digest,
1127            sui_tx_event_index,
1128        );
1129        store
1130            .insert_pending_actions(std::slice::from_ref(&action))
1131            .unwrap();
1132        assert_eq!(
1133            store.get_all_pending_actions()[&action.digest()],
1134            action.clone()
1135        );
1136
1137        // Kick it
1138        submit_to_executor(&signing_tx, action.clone())
1139            .await
1140            .unwrap();
1141        let action_digest = action.digest();
1142
1143        // Wait for 1 second. It should still in the process of retrying requesting sigs becaues we mock errors above.
1144        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1145        tx_subscription.try_recv().unwrap_err();
1146        // And the action is still in WAL
1147        assert!(store.get_all_pending_actions().contains_key(&action_digest));
1148
1149        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1150
1151        // The next retry will see the action is already processed on chain and remove it from WAL
1152        let now = std::time::Instant::now();
1153        while store.get_all_pending_actions().contains_key(&action_digest) {
1154            if now.elapsed().as_secs() > 10 {
1155                panic!("Timeout waiting for action to be removed from WAL");
1156            }
1157            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1158        }
1159        tx_subscription.try_recv().unwrap_err();
1160    }
1161
1162    #[tokio::test]
1163    async fn test_skip_tx_submission_if_already_processed_on_chain() {
1164        let (
1165            _signing_tx,
1166            execution_tx,
1167            sui_client_mock,
1168            mut tx_subscription,
1169            store,
1170            secrets,
1171            dummy_sui_key,
1172            mock0,
1173            mock1,
1174            mock2,
1175            mock3,
1176            _handles,
1177            gas_object_ref,
1178            sui_address,
1179            sui_token_type_tags,
1180            _bridge_pause_tx,
1181        ) = setup().await;
1182        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
1183        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1184            vec![&mock0, &mock1, &mock2, &mock3],
1185            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1186            None,
1187            true,
1188        );
1189
1190        let action = action_certificate.data().clone();
1191        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1192        let tx_data = build_sui_transaction(
1193            sui_address,
1194            &gas_object_ref,
1195            action_certificate.clone(),
1196            arg,
1197            &id_token_map,
1198            1000,
1199        )
1200        .unwrap();
1201        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1202        mock_transaction_error(
1203            &sui_client_mock,
1204            tx_digest,
1205            BridgeError::Generic("some random error".to_string()),
1206            true,
1207        );
1208
1209        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
1210        sui_client_mock.add_gas_object_info(
1211            gas_coin.clone(),
1212            gas_object_ref,
1213            Owner::AddressOwner(sui_address),
1214        );
1215
1216        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1217
1218        store
1219            .insert_pending_actions(std::slice::from_ref(&action))
1220            .unwrap();
1221        assert_eq!(
1222            store.get_all_pending_actions()[&action.digest()],
1223            action.clone()
1224        );
1225
1226        // Kick it (send to the execution queue, skipping the signing queue)
1227        execution_tx
1228            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1229            .await
1230            .unwrap();
1231
1232        // Some requests come in and will fail.
1233        tx_subscription.recv().await.unwrap();
1234
1235        // Set the action to be already approved on chain
1236        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1237
1238        // The next retry will see the action is already processed on chain and remove it from WAL
1239        let now = std::time::Instant::now();
1240        let action_digest = action.digest();
1241        while store.get_all_pending_actions().contains_key(&action_digest) {
1242            if now.elapsed().as_secs() > 10 {
1243                panic!("Timeout waiting for action to be removed from WAL");
1244            }
1245            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1246        }
1247    }
1248
1249    #[tokio::test]
1250    async fn test_skip_tx_submission_if_bridge_is_paused() {
1251        let (
1252            _signing_tx,
1253            execution_tx,
1254            sui_client_mock,
1255            mut tx_subscription,
1256            store,
1257            secrets,
1258            dummy_sui_key,
1259            mock0,
1260            mock1,
1261            mock2,
1262            mock3,
1263            _handles,
1264            gas_object_ref,
1265            sui_address,
1266            sui_token_type_tags,
1267            bridge_pause_tx,
1268        ) = setup().await;
1269        let id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1270        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1271            vec![&mock0, &mock1, &mock2, &mock3],
1272            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1273            None,
1274            true,
1275        );
1276
1277        let action = action_certificate.data().clone();
1278        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1279        let tx_data = build_sui_transaction(
1280            sui_address,
1281            &gas_object_ref,
1282            action_certificate.clone(),
1283            arg,
1284            &id_token_map,
1285            1000,
1286        )
1287        .unwrap();
1288        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1289        mock_transaction_error(
1290            &sui_client_mock,
1291            tx_digest,
1292            BridgeError::Generic("some random error".to_string()),
1293            true,
1294        );
1295
1296        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
1297        sui_client_mock.add_gas_object_info(
1298            gas_coin.clone(),
1299            gas_object_ref,
1300            Owner::AddressOwner(sui_address),
1301        );
1302        let action_digest = action.digest();
1303        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1304
1305        // assert bridge is unpaused now
1306        assert!(!*bridge_pause_tx.borrow());
1307
1308        store
1309            .insert_pending_actions(std::slice::from_ref(&action))
1310            .unwrap();
1311        assert_eq!(
1312            store.get_all_pending_actions()[&action.digest()],
1313            action.clone()
1314        );
1315
1316        // Kick it (send to the execution queue, skipping the signing queue)
1317        execution_tx
1318            .send(CertifiedBridgeActionExecutionWrapper(
1319                action_certificate.clone(),
1320                0,
1321            ))
1322            .await
1323            .unwrap();
1324
1325        // Some requests come in
1326        tx_subscription.recv().await.unwrap();
1327
1328        // Pause the bridge
1329        bridge_pause_tx.send(BRIDGE_PAUSED).unwrap();
1330
1331        // Kick it again
1332        execution_tx
1333            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1334            .await
1335            .unwrap();
1336
1337        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1338        // Nothing is sent to execute
1339        assert_eq!(
1340            tx_subscription.try_recv().unwrap_err(),
1341            tokio::sync::broadcast::error::TryRecvError::Empty
1342        );
1343        // Still in WAL
1344        assert_eq!(
1345            store.get_all_pending_actions()[&action_digest],
1346            action.clone()
1347        );
1348    }
1349
1350    #[tokio::test]
1351    async fn test_action_executor_handle_new_token() {
1352        let new_token_id = 255u8; // token id that does not exist
1353        let new_type_tag = TypeTag::from_str("0xbeef::beef::BEEF").unwrap();
1354        let (
1355            _signing_tx,
1356            execution_tx,
1357            sui_client_mock,
1358            mut tx_subscription,
1359            _store,
1360            secrets,
1361            dummy_sui_key,
1362            mock0,
1363            mock1,
1364            mock2,
1365            mock3,
1366            _handles,
1367            gas_object_ref,
1368            sui_address,
1369            sui_token_type_tags,
1370            _bridge_pause_tx,
1371        ) = setup().await;
1372        let mut id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1373        let (action_certificate, _, _) = get_bridge_authority_approved_action(
1374            vec![&mock0, &mock1, &mock2, &mock3],
1375            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1376            Some(new_token_id),
1377            false, // we need an eth -> sui action that entails the new token type tag in transaction building
1378        );
1379
1380        let action = action_certificate.data().clone();
1381        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1382        let tx_data = build_sui_transaction(
1383            sui_address,
1384            &gas_object_ref,
1385            action_certificate.clone(),
1386            arg,
1387            &maplit::hashmap! {
1388                new_token_id => new_type_tag.clone()
1389            },
1390            1000,
1391        )
1392        .unwrap();
1393        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1394        mock_transaction_error(
1395            &sui_client_mock,
1396            tx_digest,
1397            BridgeError::Generic("some random error".to_string()),
1398            true,
1399        );
1400
1401        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
1402        sui_client_mock.add_gas_object_info(
1403            gas_coin.clone(),
1404            gas_object_ref,
1405            Owner::AddressOwner(sui_address),
1406        );
1407        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1408
1409        // Kick it (send to the execution queue, skipping the signing queue)
1410        execution_tx
1411            .send(CertifiedBridgeActionExecutionWrapper(
1412                action_certificate.clone(),
1413                0,
1414            ))
1415            .await
1416            .unwrap();
1417
1418        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1419        // Nothing is sent to execute, because the token id does not exist yet
1420        assert_eq!(
1421            tx_subscription.try_recv().unwrap_err(),
1422            tokio::sync::broadcast::error::TryRecvError::Empty
1423        );
1424
1425        // Now insert the new token id
1426        id_token_map.insert(new_token_id, new_type_tag);
1427        sui_token_type_tags.store(Arc::new(id_token_map));
1428
1429        // Kick it again
1430        execution_tx
1431            .send(CertifiedBridgeActionExecutionWrapper(
1432                action_certificate.clone(),
1433                0,
1434            ))
1435            .await
1436            .unwrap();
1437
1438        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1439        // The action is sent to execution
1440        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1441    }
1442
1443    fn mock_bridge_authority_sigs(
1444        mocks: Vec<&BridgeRequestMockHandler>,
1445        action: &BridgeAction,
1446        secrets: Vec<&BridgeAuthorityKeyPair>,
1447        sui_tx_digest: TransactionDigest,
1448        sui_tx_event_index: u16,
1449    ) -> BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthorityRecoverableSignature> {
1450        assert_eq!(mocks.len(), secrets.len());
1451        let mut signed_actions = BTreeMap::new();
1452        for (mock, secret) in mocks.iter().zip_debug_eq(secrets.iter()) {
1453            let signed_action = sign_action_with_key(action, secret);
1454            mock.add_sui_event_response(
1455                sui_tx_digest,
1456                sui_tx_event_index,
1457                Ok(signed_action.clone()),
1458                None,
1459            );
1460            signed_actions.insert(secret.public().into(), signed_action.into_sig().signature);
1461        }
1462        signed_actions
1463    }
1464
1465    fn mock_bridge_authority_signing_errors(
1466        mocks: Vec<&BridgeRequestMockHandler>,
1467        sui_tx_digest: TransactionDigest,
1468        sui_tx_event_index: u16,
1469    ) {
1470        for mock in mocks {
1471            mock.add_sui_event_response(
1472                sui_tx_digest,
1473                sui_tx_event_index,
1474                Err(BridgeError::RestAPIError("small issue".into())),
1475                None,
1476            );
1477        }
1478    }
1479
1480    /// Create a BridgeAction and mock authorities to return signatures
1481    fn get_bridge_authority_approved_action(
1482        mocks: Vec<&BridgeRequestMockHandler>,
1483        secrets: Vec<&BridgeAuthorityKeyPair>,
1484        token_id: Option<u8>,
1485        sui_to_eth: bool,
1486    ) -> (VerifiedCertifiedBridgeAction, TransactionDigest, u16) {
1487        let sui_tx_digest = TransactionDigest::random();
1488        let sui_tx_event_index = 1;
1489        let action = if sui_to_eth {
1490            get_test_sui_to_eth_bridge_action(
1491                Some(sui_tx_digest),
1492                Some(sui_tx_event_index),
1493                None,
1494                None,
1495                None,
1496                None,
1497                token_id,
1498            )
1499        } else {
1500            get_test_eth_to_sui_bridge_action(None, None, None, token_id)
1501        };
1502
1503        let sigs =
1504            mock_bridge_authority_sigs(mocks, &action, secrets, sui_tx_digest, sui_tx_event_index);
1505        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1506            action,
1507            BridgeCommitteeValiditySignInfo { signatures: sigs },
1508        );
1509        (
1510            VerifiedCertifiedBridgeAction::new_from_verified(certified_action),
1511            sui_tx_digest,
1512            sui_tx_event_index,
1513        )
1514    }
1515
1516    fn get_tx_digest(tx_data: TransactionData, dummy_sui_key: &SuiKeyPair) -> TransactionDigest {
1517        let sig = Signature::new_secure(
1518            &IntentMessage::new(Intent::sui_transaction(), &tx_data),
1519            dummy_sui_key,
1520        );
1521        let signed_tx = Transaction::from_data(tx_data, vec![sig]);
1522        *signed_tx.digest()
1523    }
1524
1525    /// Why is `wildcard` needed? This is because authority signatures
1526    /// are part of transaction data. Depending on whose signatures
1527    /// are included in what order, this may change the tx digest.
1528    fn mock_transaction_response(
1529        sui_client_mock: &SuiMockClient,
1530        tx_digest: TransactionDigest,
1531        status: SuiExecutionStatus,
1532        events: Option<Vec<SuiEvent>>,
1533        wildcard: bool,
1534    ) {
1535        let response = ExecuteTransactionResult {
1536            status,
1537            events: events.unwrap_or_default(),
1538        };
1539        if wildcard {
1540            sui_client_mock.set_wildcard_transaction_response(Ok(response));
1541        } else {
1542            sui_client_mock.add_transaction_response(tx_digest, Ok(response));
1543        }
1544    }
1545
1546    fn mock_transaction_error(
1547        sui_client_mock: &SuiMockClient,
1548        tx_digest: TransactionDigest,
1549        error: BridgeError,
1550        wildcard: bool,
1551    ) {
1552        if wildcard {
1553            sui_client_mock.set_wildcard_transaction_response(Err(error));
1554        } else {
1555            sui_client_mock.add_transaction_response(tx_digest, Err(error));
1556        }
1557    }
1558
1559    #[allow(clippy::type_complexity)]
1560    async fn setup() -> (
1561        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
1562        mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
1563        SuiMockClient,
1564        tokio::sync::broadcast::Receiver<TransactionDigest>,
1565        Arc<BridgeOrchestratorTables>,
1566        Vec<BridgeAuthorityKeyPair>,
1567        SuiKeyPair,
1568        BridgeRequestMockHandler,
1569        BridgeRequestMockHandler,
1570        BridgeRequestMockHandler,
1571        BridgeRequestMockHandler,
1572        Vec<tokio::task::JoinHandle<()>>,
1573        ObjectRef,
1574        SuiAddress,
1575        Arc<ArcSwap<HashMap<u8, TypeTag>>>,
1576        tokio::sync::watch::Sender<IsBridgePaused>,
1577    ) {
1578        telemetry_subscribers::init_for_testing();
1579        let registry = Registry::new();
1580        mysten_metrics::init_metrics(&registry);
1581        init_all_struct_tags();
1582
1583        let (sui_address, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1584        let sui_key = SuiKeyPair::from(kp);
1585        let gas_object_ref = random_object_ref();
1586        let temp_dir = tempfile::tempdir().unwrap();
1587        let store = BridgeOrchestratorTables::new(temp_dir.path());
1588        let sui_client_mock = SuiMockClient::default();
1589        let tx_subscription = sui_client_mock.subscribe_to_requested_transactions();
1590        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1591
1592        // The dummy key is used to sign transaction so we can get TransactionDigest easily.
1593        // User signature is not part of the transaction so it does not matter which key it is.
1594        let (_, dummy_kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1595        let dummy_sui_key = SuiKeyPair::from(dummy_kp);
1596
1597        let mock0 = BridgeRequestMockHandler::new();
1598        let mock1 = BridgeRequestMockHandler::new();
1599        let mock2 = BridgeRequestMockHandler::new();
1600        let mock3 = BridgeRequestMockHandler::new();
1601
1602        let (mut handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
1603            vec![2500, 2500, 2500, 2500],
1604            vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
1605        );
1606
1607        let committee = BridgeCommittee::new(authorities).unwrap();
1608
1609        let agg = Arc::new(ArcSwap::new(Arc::new(
1610            BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1611        )));
1612        let metrics = Arc::new(BridgeMetrics::new(&registry));
1613        let sui_token_type_tags = sui_client.get_token_id_map().await.unwrap();
1614        let sui_token_type_tags = Arc::new(ArcSwap::new(Arc::new(sui_token_type_tags)));
1615        let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1616        let executor = BridgeActionExecutor::new(
1617            sui_client.clone(),
1618            agg.clone(),
1619            store.clone(),
1620            sui_key,
1621            sui_address,
1622            gas_object_ref.0,
1623            sui_token_type_tags.clone(),
1624            bridge_pause_rx,
1625            metrics,
1626        )
1627        .await;
1628
1629        let (executor_handle, signing_tx, execution_tx) = executor.run_inner();
1630        handles.extend(executor_handle);
1631
1632        (
1633            signing_tx,
1634            execution_tx,
1635            sui_client_mock,
1636            tx_subscription,
1637            store,
1638            secrets,
1639            dummy_sui_key,
1640            mock0,
1641            mock1,
1642            mock2,
1643            mock3,
1644            handles,
1645            gas_object_ref,
1646            sui_address,
1647            sui_token_type_tags,
1648            bridge_pause_tx,
1649        )
1650    }
1651}