1use crate::retry_with_max_elapsed_time;
8use crate::types::IsBridgePaused;
9use arc_swap::ArcSwap;
10use mysten_metrics::spawn_logged_monitored_task;
11use shared_crypto::intent::{Intent, IntentMessage};
12use sui_json_rpc_types::SuiExecutionStatus;
13use sui_types::TypeTag;
14use sui_types::transaction::ObjectArg;
15use sui_types::{
16 base_types::{ObjectID, ObjectRef, SuiAddress},
17 crypto::{Signature, SuiKeyPair},
18 digests::TransactionDigest,
19 gas_coin::GasCoin,
20 object::Owner,
21 transaction::Transaction,
22};
23
24use crate::events::{
25 TokenTransferAlreadyApproved, TokenTransferAlreadyClaimed, TokenTransferApproved,
26 TokenTransferClaimed,
27};
28use crate::metrics::BridgeMetrics;
29use crate::{
30 client::bridge_authority_aggregator::BridgeAuthorityAggregator,
31 error::BridgeError,
32 storage::BridgeOrchestratorTables,
33 sui_client::{ExecuteTransactionResult, SuiClient, SuiClientInner},
34 sui_transaction_builder::build_sui_transaction,
35 types::{BridgeAction, BridgeActionStatus, VerifiedCertifiedBridgeAction},
36};
37use std::collections::HashMap;
38use std::sync::Arc;
39use tokio::sync::Semaphore;
40use tokio::time::Duration;
41use tracing::{Instrument, error, info, instrument, warn};
42
43pub const CHANNEL_SIZE: usize = 1000;
44pub const SIGNING_CONCURRENCY: usize = 10;
45
46pub const MAX_SIGNING_ATTEMPTS: u64 = 16;
49pub const MAX_EXECUTION_ATTEMPTS: u64 = 16;
50
51async fn delay(attempt_times: u64) {
52 let delay_ms = 100 * 2_u64.pow(attempt_times as u32);
53 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
54}
55
56#[derive(Debug)]
57pub struct BridgeActionExecutionWrapper(pub BridgeAction, pub u64);
58
59#[derive(Debug)]
60pub struct CertifiedBridgeActionExecutionWrapper(pub VerifiedCertifiedBridgeAction, pub u64);
61
62pub trait BridgeActionExecutorTrait {
63 fn run(
64 self,
65 ) -> (
66 Vec<tokio::task::JoinHandle<()>>,
67 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
68 );
69}
70
71pub struct BridgeActionExecutor<C> {
72 sui_client: Arc<SuiClient<C>>,
73 bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
74 key: SuiKeyPair,
75 sui_address: SuiAddress,
76 gas_object_id: ObjectID,
77 store: Arc<BridgeOrchestratorTables>,
78 bridge_object_arg: ObjectArg,
79 sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
80 bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
81 metrics: Arc<BridgeMetrics>,
82}
83
84impl<C> BridgeActionExecutorTrait for BridgeActionExecutor<C>
85where
86 C: SuiClientInner + 'static,
87{
88 fn run(
89 self,
90 ) -> (
91 Vec<tokio::task::JoinHandle<()>>,
92 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
93 ) {
94 let (tasks, sender, _) = self.run_inner();
95 (tasks, sender)
96 }
97}
98
99impl<C> BridgeActionExecutor<C>
100where
101 C: SuiClientInner + 'static,
102{
103 pub async fn new(
104 sui_client: Arc<SuiClient<C>>,
105 bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
106 store: Arc<BridgeOrchestratorTables>,
107 key: SuiKeyPair,
108 sui_address: SuiAddress,
109 gas_object_id: ObjectID,
110 sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
111 bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
112 metrics: Arc<BridgeMetrics>,
113 ) -> Self {
114 let bridge_object_arg = sui_client
115 .get_mutable_bridge_object_arg_must_succeed()
116 .await;
117 Self {
118 sui_client,
119 bridge_auth_agg,
120 store,
121 key,
122 gas_object_id,
123 sui_address,
124 bridge_object_arg,
125 sui_token_type_tags,
126 bridge_pause_rx,
127 metrics,
128 }
129 }
130
131 fn run_inner(
132 self,
133 ) -> (
134 Vec<tokio::task::JoinHandle<()>>,
135 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
136 mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
137 ) {
138 let key = self.key;
139
140 let (sender, receiver) = mysten_metrics::metered_channel::channel(
141 CHANNEL_SIZE,
142 &mysten_metrics::get_metrics()
143 .unwrap()
144 .channel_inflight
145 .with_label_values(&["executor_signing_queue"]),
146 );
147
148 let (execution_tx, execution_rx) = mysten_metrics::metered_channel::channel(
149 CHANNEL_SIZE,
150 &mysten_metrics::get_metrics()
151 .unwrap()
152 .channel_inflight
153 .with_label_values(&["executor_execution_queue"]),
154 );
155 let execution_tx_clone = execution_tx.clone();
156 let sender_clone = sender.clone();
157 let store_clone = self.store.clone();
158 let client_clone = self.sui_client.clone();
159 let mut tasks = vec![];
160 let metrics = self.metrics.clone();
161 tasks.push(spawn_logged_monitored_task!(
162 Self::run_signature_aggregation_loop(
163 client_clone,
164 self.bridge_auth_agg,
165 store_clone,
166 sender_clone,
167 receiver,
168 execution_tx_clone,
169 metrics,
170 )
171 ));
172
173 let metrics = self.metrics.clone();
174 let execution_tx_clone = execution_tx.clone();
175 tasks.push(spawn_logged_monitored_task!(
176 Self::run_onchain_execution_loop(
177 self.sui_client.clone(),
178 key,
179 self.sui_address,
180 self.gas_object_id,
181 self.store.clone(),
182 execution_tx_clone,
183 execution_rx,
184 self.bridge_object_arg,
185 self.sui_token_type_tags,
186 self.bridge_pause_rx,
187 metrics,
188 )
189 ));
190 (tasks, sender, execution_tx)
191 }
192
193 async fn run_signature_aggregation_loop(
194 sui_client: Arc<SuiClient<C>>,
195 auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
196 store: Arc<BridgeOrchestratorTables>,
197 signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
198 mut signing_queue_receiver: mysten_metrics::metered_channel::Receiver<
199 BridgeActionExecutionWrapper,
200 >,
201 execution_queue_sender: mysten_metrics::metered_channel::Sender<
202 CertifiedBridgeActionExecutionWrapper,
203 >,
204 metrics: Arc<BridgeMetrics>,
205 ) {
206 info!("Starting run_signature_aggregation_loop");
207 let semaphore = Arc::new(Semaphore::new(SIGNING_CONCURRENCY));
208 while let Some(action) = signing_queue_receiver.recv().await {
209 Self::handle_signing_task(
210 &semaphore,
211 &auth_agg,
212 &signing_queue_sender,
213 &execution_queue_sender,
214 &sui_client,
215 &store,
216 action,
217 &metrics,
218 )
219 .await;
220 }
221 }
222
223 async fn should_proceed_signing(sui_client: &Arc<SuiClient<C>>) -> bool {
224 let Ok(Ok(is_paused)) =
225 retry_with_max_elapsed_time!(sui_client.is_bridge_paused(), Duration::from_secs(600))
226 else {
227 error!("Failed to get bridge status after retry");
228 return false;
229 };
230 !is_paused
231 }
232
233 #[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))]
234 async fn handle_signing_task(
235 semaphore: &Arc<Semaphore>,
236 auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
237 signing_queue_sender: &mysten_metrics::metered_channel::Sender<
238 BridgeActionExecutionWrapper,
239 >,
240 execution_queue_sender: &mysten_metrics::metered_channel::Sender<
241 CertifiedBridgeActionExecutionWrapper,
242 >,
243 sui_client: &Arc<SuiClient<C>>,
244 store: &Arc<BridgeOrchestratorTables>,
245 action: BridgeActionExecutionWrapper,
246 metrics: &Arc<BridgeMetrics>,
247 ) {
248 metrics.action_executor_signing_queue_received_actions.inc();
249 let action_key = action.0.key();
250 info!("Received action for signing: {:?}", action.0);
251
252 let should_proceed = Self::should_proceed_signing(sui_client).await;
257 if !should_proceed {
258 metrics.action_executor_signing_queue_skipped_actions.inc();
259 warn!("skipping signing task: {:?}", action_key);
260 return;
261 }
262
263 let auth_agg_clone = auth_agg.clone();
264 let signing_queue_sender_clone = signing_queue_sender.clone();
265 let execution_queue_sender_clone = execution_queue_sender.clone();
266 let sui_client_clone = sui_client.clone();
267 let store_clone = store.clone();
268 let metrics_clone = metrics.clone();
269 let semaphore_clone = semaphore.clone();
270 spawn_logged_monitored_task!(
271 Self::request_signatures(
272 semaphore_clone,
273 sui_client_clone,
274 auth_agg_clone,
275 action,
276 store_clone,
277 signing_queue_sender_clone,
278 execution_queue_sender_clone,
279 metrics_clone,
280 )
281 .instrument(tracing::debug_span!("request_signatures", action_key=?action_key)),
282 "request_signatures"
283 );
284 }
285
286 async fn handle_already_processed_token_transfer_action_maybe(
290 sui_client: &Arc<SuiClient<C>>,
291 action: &BridgeAction,
292 store: &Arc<BridgeOrchestratorTables>,
293 metrics: &Arc<BridgeMetrics>,
294 ) -> bool {
295 let status = sui_client
296 .get_token_transfer_action_onchain_status_until_success(
297 action.chain_id() as u8,
298 action.seq_number(),
299 )
300 .await;
301 match status {
302 BridgeActionStatus::Approved | BridgeActionStatus::Claimed => {
303 info!(
304 "Action already approved or claimed, removing action from pending logs: {:?}",
305 action
306 );
307 metrics.action_executor_already_processed_actions.inc();
308 store
309 .remove_pending_actions(&[action.digest()])
310 .unwrap_or_else(|e| {
311 panic!("Write to DB should not fail: {:?}", e);
312 });
313 true
314 }
315 BridgeActionStatus::Pending | BridgeActionStatus::NotFound => false,
318 }
319 }
320
321 async fn request_signatures(
324 semaphore: Arc<Semaphore>,
325 sui_client: Arc<SuiClient<C>>,
326 auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
327 action: BridgeActionExecutionWrapper,
328 store: Arc<BridgeOrchestratorTables>,
329 signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
330 execution_queue_sender: mysten_metrics::metered_channel::Sender<
331 CertifiedBridgeActionExecutionWrapper,
332 >,
333 metrics: Arc<BridgeMetrics>,
334 ) {
335 let _permit = semaphore
336 .acquire()
337 .await
338 .expect("semaphore should not be closed");
339 info!("requesting signatures");
340 let BridgeActionExecutionWrapper(action, attempt_times) = action;
341
342 match &action {
344 BridgeAction::SuiToEthBridgeAction(_)
345 | BridgeAction::SuiToEthTokenTransfer(_)
346 | BridgeAction::SuiToEthTokenTransferV2(_)
347 | BridgeAction::EthToSuiBridgeAction(_)
348 | BridgeAction::EthToSuiTokenTransferV2(_) => (),
349 _ => unreachable!("Non token transfer action should not reach here"),
350 };
351
352 if Self::handle_already_processed_token_transfer_action_maybe(
354 &sui_client,
355 &action,
356 &store,
357 &metrics,
358 )
359 .await
360 {
361 return;
362 }
363 match auth_agg
364 .load()
365 .request_committee_signatures(action.clone())
366 .await
367 {
368 Ok(certificate) => {
369 info!("Sending certificate to execution");
370 execution_queue_sender
371 .send(CertifiedBridgeActionExecutionWrapper(certificate, 0))
372 .await
373 .unwrap_or_else(|e| {
374 panic!("Sending to execution queue should not fail: {:?}", e);
375 });
376 }
377 Err(e) => {
378 warn!("Failed to collect sigs for bridge action: {:?}", e);
379 metrics.err_signature_aggregation.inc();
380
381 if attempt_times >= MAX_SIGNING_ATTEMPTS {
383 metrics.err_signature_aggregation_too_many_failures.inc();
384 error!(
385 "Manual intervention is required. Failed to collect sigs for bridge action after {MAX_SIGNING_ATTEMPTS} attempts: {:?}",
386 e
387 );
388 return;
389 }
390 delay(attempt_times).await;
391 signing_queue_sender
392 .send(BridgeActionExecutionWrapper(action, attempt_times + 1))
393 .await
394 .unwrap_or_else(|e| {
395 panic!("Sending to signing queue should not fail: {:?}", e);
396 });
397 }
398 }
399 }
400
401 async fn run_onchain_execution_loop(
404 sui_client: Arc<SuiClient<C>>,
405 sui_key: SuiKeyPair,
406 sui_address: SuiAddress,
407 gas_object_id: ObjectID,
408 store: Arc<BridgeOrchestratorTables>,
409 execution_queue_sender: mysten_metrics::metered_channel::Sender<
410 CertifiedBridgeActionExecutionWrapper,
411 >,
412 mut execution_queue_receiver: mysten_metrics::metered_channel::Receiver<
413 CertifiedBridgeActionExecutionWrapper,
414 >,
415 bridge_object_arg: ObjectArg,
416 sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
417 bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
418 metrics: Arc<BridgeMetrics>,
419 ) {
420 info!("Starting run_onchain_execution_loop");
421 while let Some(certificate_wrapper) = execution_queue_receiver.recv().await {
422 if *bridge_pause_rx.borrow() {
426 warn!("Bridge is paused, skipping execution");
427 metrics
428 .action_executor_execution_queue_skipped_actions_due_to_pausing
429 .inc();
430 continue;
431 }
432 Self::handle_execution_task(
433 certificate_wrapper,
434 &sui_client,
435 &sui_key,
436 &sui_address,
437 gas_object_id,
438 &store,
439 &execution_queue_sender,
440 &bridge_object_arg,
441 &sui_token_type_tags,
442 &metrics,
443 )
444 .await;
445 }
446 panic!("Execution queue closed unexpectedly");
447 }
448
449 #[instrument(level = "error", skip_all, fields(action_key=?certificate_wrapper.0.data().key(), attempt_times=?certificate_wrapper.1))]
450 async fn handle_execution_task(
451 certificate_wrapper: CertifiedBridgeActionExecutionWrapper,
452 sui_client: &Arc<SuiClient<C>>,
453 sui_key: &SuiKeyPair,
454 sui_address: &SuiAddress,
455 gas_object_id: ObjectID,
456 store: &Arc<BridgeOrchestratorTables>,
457 execution_queue_sender: &mysten_metrics::metered_channel::Sender<
458 CertifiedBridgeActionExecutionWrapper,
459 >,
460 bridge_object_arg: &ObjectArg,
461 sui_token_type_tags: &ArcSwap<HashMap<u8, TypeTag>>,
462 metrics: &Arc<BridgeMetrics>,
463 ) {
464 metrics
465 .action_executor_execution_queue_received_actions
466 .inc();
467 let CertifiedBridgeActionExecutionWrapper(certificate, attempt_times) = certificate_wrapper;
468 let action = certificate.data();
469 let action_key = action.key();
470
471 info!("Received certified action for execution: {:?}", action);
472
473 let (gas_coin, gas_object_ref) =
475 Self::get_gas_data_assert_ownership(*sui_address, gas_object_id, sui_client).await;
476 metrics.gas_coin_balance.set(gas_coin.value() as i64);
477
478 let ceriticate_clone = certificate.clone();
479
480 if Self::handle_already_processed_token_transfer_action_maybe(
482 sui_client, action, store, metrics,
483 )
484 .await
485 {
486 info!("Action already processed, skipping");
487 return;
488 }
489
490 info!("Building Sui transaction");
491 let rgp = sui_client.get_reference_gas_price_until_success().await;
492 let tx_data = match build_sui_transaction(
493 *sui_address,
494 &gas_object_ref,
495 ceriticate_clone.clone(),
496 *bridge_object_arg,
497 sui_token_type_tags.load().as_ref(),
498 rgp,
499 ) {
500 Ok(tx_data) => tx_data,
501 Err(BridgeError::UnknownTokenId(token_id)) => {
502 info!(
505 "Unknown token_id {}, refreshing token map from chain and retrying",
506 token_id
507 );
508 match sui_client.get_token_id_map().await {
509 Ok(new_token_map) => {
510 sui_token_type_tags.store(Arc::new(new_token_map));
511 match build_sui_transaction(
513 *sui_address,
514 &gas_object_ref,
515 ceriticate_clone,
516 *bridge_object_arg,
517 sui_token_type_tags.load().as_ref(),
518 rgp,
519 ) {
520 Ok(tx_data) => tx_data,
521 Err(err) => {
522 metrics.err_build_sui_transaction.inc();
523 error!(
524 "Manual intervention is required. Failed to build transaction after token map refresh for action {:?}: {:?}",
525 action, err
526 );
527 return;
528 }
529 }
530 }
531 Err(e) => {
532 metrics.err_build_sui_transaction.inc();
533 error!(
534 "Manual intervention is required. Failed to refresh token map: {:?}",
535 e
536 );
537 return;
538 }
539 }
540 }
541 Err(err) => {
542 metrics.err_build_sui_transaction.inc();
543 error!(
544 "Manual intervention is required. Failed to build transaction for action {:?}: {:?}",
545 action, err
546 );
547 return;
550 }
551 };
552 let sig = Signature::new_secure(
553 &IntentMessage::new(Intent::sui_transaction(), &tx_data),
554 sui_key,
555 );
556 let signed_tx = Transaction::from_data(tx_data, vec![sig]);
557 let tx_digest = *signed_tx.digest();
558
559 if Self::handle_already_processed_token_transfer_action_maybe(
561 sui_client, action, store, metrics,
562 )
563 .await
564 {
565 info!("Action already processed, skipping");
566 return;
567 }
568
569 info!(?tx_digest, ?gas_object_ref, "Sending transaction to Sui");
570 match sui_client
571 .execute_transaction_block_with_effects(signed_tx)
572 .await
573 {
574 Ok(resp) => {
575 Self::handle_execution_effects(tx_digest, resp, store, action, metrics).await
576 }
577
578 Err(err) => {
580 error!(
581 ?action_key,
582 ?tx_digest,
583 "Sui transaction failed at signing: {err:?}"
584 );
585 metrics.err_sui_transaction_submission.inc();
586 let metrics_clone = metrics.clone();
587 let sender_clone = execution_queue_sender.clone();
589 spawn_logged_monitored_task!(async move {
590 if attempt_times >= MAX_EXECUTION_ATTEMPTS {
592 metrics_clone
593 .err_sui_transaction_submission_too_many_failures
594 .inc();
595 error!("Manual intervention is required. Failed to collect execute transaction for bridge action after {MAX_EXECUTION_ATTEMPTS} attempts: {:?}", err);
596 return;
597 }
598 delay(attempt_times).await;
599 sender_clone
600 .send(CertifiedBridgeActionExecutionWrapper(
601 certificate,
602 attempt_times + 1,
603 ))
604 .await
605 .unwrap_or_else(|e| {
606 panic!("Sending to execution queue should not fail: {:?}", e);
607 });
608 info!("Re-enqueued certificate for execution");
609 }.instrument(tracing::debug_span!("reenqueue_execution_task", action_key=?action_key)));
610 }
611 }
612 }
613
614 async fn handle_execution_effects(
616 tx_digest: TransactionDigest,
617 response: ExecuteTransactionResult,
618 store: &Arc<BridgeOrchestratorTables>,
619 action: &BridgeAction,
620 metrics: &Arc<BridgeMetrics>,
621 ) {
622 match &response.status {
623 SuiExecutionStatus::Success => {
624 let relevant_events = response
625 .events
626 .iter()
627 .filter(|e| {
628 e.type_ == *TokenTransferAlreadyClaimed.get().unwrap()
629 || e.type_ == *TokenTransferClaimed.get().unwrap()
630 || e.type_ == *TokenTransferApproved.get().unwrap()
631 || e.type_ == *TokenTransferAlreadyApproved.get().unwrap()
632 })
633 .collect::<Vec<_>>();
634 assert!(
635 !relevant_events.is_empty(),
636 "Expected TokenTransferAlreadyClaimed, TokenTransferClaimed, TokenTransferApproved \
637 or TokenTransferAlreadyApproved event but got: {:?}",
638 response.events
639 );
640 info!(?tx_digest, "Sui transaction executed successfully");
641 relevant_events.iter().for_each(|e| {
643 if e.type_ == *TokenTransferClaimed.get().unwrap() {
644 match action {
645 BridgeAction::EthToSuiBridgeAction(_)
646 | BridgeAction::EthToSuiTokenTransferV2(_) => {
647 metrics.eth_sui_token_transfer_claimed.inc();
648 }
649 BridgeAction::SuiToEthBridgeAction(_)
650 | BridgeAction::SuiToEthTokenTransfer(_)
651 | BridgeAction::SuiToEthTokenTransferV2(_) => {
652 metrics.sui_eth_token_transfer_claimed.inc();
653 }
654 _ => error!("Unexpected action type for claimed event: {:?}", action),
655 }
656 } else if e.type_ == *TokenTransferApproved.get().unwrap() {
657 match action {
658 BridgeAction::EthToSuiBridgeAction(_)
659 | BridgeAction::EthToSuiTokenTransferV2(_) => {
660 metrics.eth_sui_token_transfer_approved.inc();
661 }
662 BridgeAction::SuiToEthBridgeAction(_)
663 | BridgeAction::SuiToEthTokenTransfer(_)
664 | BridgeAction::SuiToEthTokenTransferV2(_) => {
665 metrics.sui_eth_token_transfer_approved.inc();
666 }
667 _ => error!("Unexpected action type for approved event: {:?}", action),
668 }
669 }
670 });
671 store
672 .remove_pending_actions(&[action.digest()])
673 .unwrap_or_else(|e| {
674 panic!("Write to DB should not fail: {:?}", e);
675 })
676 }
677 SuiExecutionStatus::Failure { error } => {
678 metrics.err_sui_transaction_execution.inc();
685 error!(
686 ?tx_digest,
687 "Manual intervention is needed. Sui transaction executed and failed with error: {error:?}"
688 );
689 }
690 }
691 }
692
693 async fn get_gas_data_assert_ownership(
695 sui_address: SuiAddress,
696 gas_object_id: ObjectID,
697 sui_client: &SuiClient<C>,
698 ) -> (GasCoin, ObjectRef) {
699 let (gas_coin, gas_obj_ref, owner) = sui_client
700 .get_gas_data_panic_if_not_gas(gas_object_id)
701 .await;
702
703 assert_eq!(
706 owner,
707 Owner::AddressOwner(sui_address),
708 "Gas object {:?} is no longer owned by address {}",
709 gas_object_id,
710 sui_address
711 );
712 (gas_coin, gas_obj_ref)
713 }
714}
715
716pub async fn submit_to_executor(
717 tx: &mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
718 action: BridgeAction,
719) -> Result<(), BridgeError> {
720 tx.send(BridgeActionExecutionWrapper(action, 0))
721 .await
722 .map_err(|e| BridgeError::Generic(e.to_string()))
723}
724
725#[cfg(test)]
726mod tests {
727 use crate::events::init_all_struct_tags;
728 use crate::test_utils::DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
729 use crate::types::BRIDGE_PAUSED;
730 use fastcrypto::traits::KeyPair;
731 use mysten_common::ZipDebugEqIteratorExt;
732 use prometheus::Registry;
733 use std::collections::{BTreeMap, HashMap};
734 use std::str::FromStr;
735 use sui_json_rpc_types::SuiEvent;
736 use sui_types::TypeTag;
737 use sui_types::crypto::get_key_pair;
738 use sui_types::gas_coin::GasCoin;
739 use sui_types::{base_types::random_object_ref, transaction::TransactionData};
740
741 use crate::{
742 crypto::{
743 BridgeAuthorityKeyPair, BridgeAuthorityPublicKeyBytes,
744 BridgeAuthorityRecoverableSignature,
745 },
746 server::mock_handler::BridgeRequestMockHandler,
747 sui_mock_client::SuiMockClient,
748 test_utils::{
749 get_test_authorities_and_run_mock_bridge_server, get_test_eth_to_sui_bridge_action,
750 get_test_sui_to_eth_bridge_action, sign_action_with_key,
751 },
752 types::{BridgeCommittee, BridgeCommitteeValiditySignInfo, CertifiedBridgeAction},
753 };
754
755 use super::*;
756
757 #[tokio::test]
758 async fn test_onchain_execution_loop() {
759 let (
760 signing_tx,
761 _execution_tx,
762 sui_client_mock,
763 mut tx_subscription,
764 store,
765 secrets,
766 dummy_sui_key,
767 mock0,
768 mock1,
769 mock2,
770 mock3,
771 _handles,
772 gas_object_ref,
773 sui_address,
774 sui_token_type_tags,
775 _bridge_pause_tx,
776 ) = setup().await;
777 let (action_certificate, _, _) = get_bridge_authority_approved_action(
778 vec![&mock0, &mock1, &mock2, &mock3],
779 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
780 None,
781 true,
782 );
783 let action = action_certificate.data().clone();
784 let id_token_map = (*sui_token_type_tags.load().clone()).clone();
785 let tx_data = build_sui_transaction(
786 sui_address,
787 &gas_object_ref,
788 action_certificate,
789 DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
790 &id_token_map,
791 1000,
792 )
793 .unwrap();
794
795 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
796
797 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
799 gas_coin.clone(),
800 gas_object_ref,
801 Owner::AddressOwner(sui_address),
802 );
803
804 let mut event = SuiEvent::random_for_testing();
806 event.type_ = TokenTransferClaimed.get().unwrap().clone();
807 let events = vec![event];
808 mock_transaction_response(
809 &sui_client_mock,
810 tx_digest,
811 SuiExecutionStatus::Success,
812 Some(events),
813 true,
814 );
815
816 store
817 .insert_pending_actions(std::slice::from_ref(&action))
818 .unwrap();
819 assert_eq!(
820 store.get_all_pending_actions()[&action.digest()],
821 action.clone()
822 );
823
824 submit_to_executor(&signing_tx, action.clone())
826 .await
827 .unwrap();
828
829 tx_subscription.recv().await.unwrap();
831 assert!(store.get_all_pending_actions().is_empty());
832
833 let (action_certificate, _, _) = get_bridge_authority_approved_action(
838 vec![&mock0, &mock1, &mock2, &mock3],
839 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
840 None,
841 true,
842 );
843
844 let action = action_certificate.data().clone();
845
846 let tx_data = build_sui_transaction(
847 sui_address,
848 &gas_object_ref,
849 action_certificate,
850 DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
851 &id_token_map,
852 1000,
853 )
854 .unwrap();
855 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
856
857 mock_transaction_response(
859 &sui_client_mock,
860 tx_digest,
861 SuiExecutionStatus::Failure {
862 error: "failure is mother of success".to_string(),
863 },
864 None,
865 true,
866 );
867
868 store
869 .insert_pending_actions(std::slice::from_ref(&action))
870 .unwrap();
871 assert_eq!(
872 store.get_all_pending_actions()[&action.digest()],
873 action.clone()
874 );
875
876 submit_to_executor(&signing_tx, action.clone())
878 .await
879 .unwrap();
880
881 tx_subscription.recv().await.unwrap();
883 assert_eq!(
885 store.get_all_pending_actions()[&action.digest()],
886 action.clone()
887 );
888
889 let (action_certificate, _, _) = get_bridge_authority_approved_action(
894 vec![&mock0, &mock1, &mock2, &mock3],
895 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
896 None,
897 true,
898 );
899
900 let action = action_certificate.data().clone();
901
902 let tx_data = build_sui_transaction(
903 sui_address,
904 &gas_object_ref,
905 action_certificate,
906 DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
907 &id_token_map,
908 1000,
909 )
910 .unwrap();
911 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
912 mock_transaction_error(
913 &sui_client_mock,
914 tx_digest,
915 BridgeError::Generic("some random error".to_string()),
916 true,
917 );
918
919 store
920 .insert_pending_actions(std::slice::from_ref(&action))
921 .unwrap();
922 assert_eq!(
923 store.get_all_pending_actions()[&action.digest()],
924 action.clone()
925 );
926
927 submit_to_executor(&signing_tx, action.clone())
929 .await
930 .unwrap();
931
932 let tx_digest = tx_subscription.recv().await.unwrap();
934 assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
935
936 assert!(
938 store
939 .get_all_pending_actions()
940 .contains_key(&action.digest())
941 );
942
943 let mut event = SuiEvent::random_for_testing();
945 event.type_ = TokenTransferClaimed.get().unwrap().clone();
946 let events = vec![event];
947 mock_transaction_response(
948 &sui_client_mock,
949 tx_digest,
950 SuiExecutionStatus::Success,
951 Some(events),
952 true,
953 );
954
955 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
957 assert!(
959 !store
960 .get_all_pending_actions()
961 .contains_key(&action.digest())
962 );
963 }
964
965 #[tokio::test]
966 async fn test_signature_aggregation_loop() {
967 let (
968 signing_tx,
969 _execution_tx,
970 sui_client_mock,
971 mut tx_subscription,
972 store,
973 secrets,
974 dummy_sui_key,
975 mock0,
976 mock1,
977 mock2,
978 mock3,
979 _handles,
980 gas_object_ref,
981 sui_address,
982 sui_token_type_tags,
983 _bridge_pause_tx,
984 ) = setup().await;
985 let id_token_map = (*sui_token_type_tags.load().clone()).clone();
986 let (action_certificate, sui_tx_digest, sui_tx_event_index) =
987 get_bridge_authority_approved_action(
988 vec![&mock0, &mock1, &mock2, &mock3],
989 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
990 None,
991 true,
992 );
993 let action = action_certificate.data().clone();
994 mock_bridge_authority_signing_errors(
995 vec![&mock0, &mock1, &mock2],
996 sui_tx_digest,
997 sui_tx_event_index,
998 );
999 let mut sigs = mock_bridge_authority_sigs(
1000 vec![&mock3],
1001 &action,
1002 vec![&secrets[3]],
1003 sui_tx_digest,
1004 sui_tx_event_index,
1005 );
1006
1007 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1009 gas_coin,
1010 gas_object_ref,
1011 Owner::AddressOwner(sui_address),
1012 );
1013 store
1014 .insert_pending_actions(std::slice::from_ref(&action))
1015 .unwrap();
1016 assert_eq!(
1017 store.get_all_pending_actions()[&action.digest()],
1018 action.clone()
1019 );
1020
1021 submit_to_executor(&signing_tx, action.clone())
1023 .await
1024 .unwrap();
1025
1026 loop {
1028 let requested_times =
1029 mock0.get_sui_token_events_requested(sui_tx_digest, sui_tx_event_index);
1030 if requested_times >= 2 {
1031 break;
1032 }
1033 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1034 }
1035 assert_eq!(
1037 tx_subscription.try_recv().unwrap_err(),
1038 tokio::sync::broadcast::error::TryRecvError::Empty
1039 );
1040 assert_eq!(
1042 store.get_all_pending_actions()[&action.digest()],
1043 action.clone()
1044 );
1045
1046 let sig_from_2 = mock_bridge_authority_sigs(
1048 vec![&mock2],
1049 &action,
1050 vec![&secrets[2]],
1051 sui_tx_digest,
1052 sui_tx_event_index,
1053 );
1054 sigs.extend(sig_from_2);
1055 let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1056 action.clone(),
1057 BridgeCommitteeValiditySignInfo { signatures: sigs },
1058 );
1059 let action_certificate = VerifiedCertifiedBridgeAction::new_from_verified(certified_action);
1060 let tx_data = build_sui_transaction(
1061 sui_address,
1062 &gas_object_ref,
1063 action_certificate,
1064 DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
1065 &id_token_map,
1066 1000,
1067 )
1068 .unwrap();
1069 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1070
1071 let mut event = SuiEvent::random_for_testing();
1072 event.type_ = TokenTransferClaimed.get().unwrap().clone();
1073 let events = vec![event];
1074 mock_transaction_response(
1075 &sui_client_mock,
1076 tx_digest,
1077 SuiExecutionStatus::Success,
1078 Some(events),
1079 true,
1080 );
1081
1082 assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1084 assert!(
1086 !store
1087 .get_all_pending_actions()
1088 .contains_key(&action.digest())
1089 );
1090 }
1091
1092 #[tokio::test]
1093 async fn test_skip_request_signature_if_already_processed_on_chain() {
1094 let (
1095 signing_tx,
1096 _execution_tx,
1097 sui_client_mock,
1098 mut tx_subscription,
1099 store,
1100 _secrets,
1101 _dummy_sui_key,
1102 mock0,
1103 mock1,
1104 mock2,
1105 mock3,
1106 _handles,
1107 _gas_object_ref,
1108 _sui_address,
1109 _sui_token_type_tags,
1110 _bridge_pause_tx,
1111 ) = setup().await;
1112
1113 let sui_tx_digest = TransactionDigest::random();
1114 let sui_tx_event_index = 0;
1115 let action = get_test_sui_to_eth_bridge_action(
1116 Some(sui_tx_digest),
1117 Some(sui_tx_event_index),
1118 None,
1119 None,
1120 None,
1121 None,
1122 None,
1123 );
1124 mock_bridge_authority_signing_errors(
1125 vec![&mock0, &mock1, &mock2, &mock3],
1126 sui_tx_digest,
1127 sui_tx_event_index,
1128 );
1129 store
1130 .insert_pending_actions(std::slice::from_ref(&action))
1131 .unwrap();
1132 assert_eq!(
1133 store.get_all_pending_actions()[&action.digest()],
1134 action.clone()
1135 );
1136
1137 submit_to_executor(&signing_tx, action.clone())
1139 .await
1140 .unwrap();
1141 let action_digest = action.digest();
1142
1143 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1145 tx_subscription.try_recv().unwrap_err();
1146 assert!(store.get_all_pending_actions().contains_key(&action_digest));
1148
1149 sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1150
1151 let now = std::time::Instant::now();
1153 while store.get_all_pending_actions().contains_key(&action_digest) {
1154 if now.elapsed().as_secs() > 10 {
1155 panic!("Timeout waiting for action to be removed from WAL");
1156 }
1157 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1158 }
1159 tx_subscription.try_recv().unwrap_err();
1160 }
1161
1162 #[tokio::test]
1163 async fn test_skip_tx_submission_if_already_processed_on_chain() {
1164 let (
1165 _signing_tx,
1166 execution_tx,
1167 sui_client_mock,
1168 mut tx_subscription,
1169 store,
1170 secrets,
1171 dummy_sui_key,
1172 mock0,
1173 mock1,
1174 mock2,
1175 mock3,
1176 _handles,
1177 gas_object_ref,
1178 sui_address,
1179 sui_token_type_tags,
1180 _bridge_pause_tx,
1181 ) = setup().await;
1182 let id_token_map = (*sui_token_type_tags.load().clone()).clone();
1183 let (action_certificate, _, _) = get_bridge_authority_approved_action(
1184 vec![&mock0, &mock1, &mock2, &mock3],
1185 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1186 None,
1187 true,
1188 );
1189
1190 let action = action_certificate.data().clone();
1191 let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1192 let tx_data = build_sui_transaction(
1193 sui_address,
1194 &gas_object_ref,
1195 action_certificate.clone(),
1196 arg,
1197 &id_token_map,
1198 1000,
1199 )
1200 .unwrap();
1201 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1202 mock_transaction_error(
1203 &sui_client_mock,
1204 tx_digest,
1205 BridgeError::Generic("some random error".to_string()),
1206 true,
1207 );
1208
1209 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1211 gas_coin.clone(),
1212 gas_object_ref,
1213 Owner::AddressOwner(sui_address),
1214 );
1215
1216 sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1217
1218 store
1219 .insert_pending_actions(std::slice::from_ref(&action))
1220 .unwrap();
1221 assert_eq!(
1222 store.get_all_pending_actions()[&action.digest()],
1223 action.clone()
1224 );
1225
1226 execution_tx
1228 .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1229 .await
1230 .unwrap();
1231
1232 tx_subscription.recv().await.unwrap();
1234
1235 sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1237
1238 let now = std::time::Instant::now();
1240 let action_digest = action.digest();
1241 while store.get_all_pending_actions().contains_key(&action_digest) {
1242 if now.elapsed().as_secs() > 10 {
1243 panic!("Timeout waiting for action to be removed from WAL");
1244 }
1245 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1246 }
1247 }
1248
1249 #[tokio::test]
1250 async fn test_skip_tx_submission_if_bridge_is_paused() {
1251 let (
1252 _signing_tx,
1253 execution_tx,
1254 sui_client_mock,
1255 mut tx_subscription,
1256 store,
1257 secrets,
1258 dummy_sui_key,
1259 mock0,
1260 mock1,
1261 mock2,
1262 mock3,
1263 _handles,
1264 gas_object_ref,
1265 sui_address,
1266 sui_token_type_tags,
1267 bridge_pause_tx,
1268 ) = setup().await;
1269 let id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1270 let (action_certificate, _, _) = get_bridge_authority_approved_action(
1271 vec![&mock0, &mock1, &mock2, &mock3],
1272 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1273 None,
1274 true,
1275 );
1276
1277 let action = action_certificate.data().clone();
1278 let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1279 let tx_data = build_sui_transaction(
1280 sui_address,
1281 &gas_object_ref,
1282 action_certificate.clone(),
1283 arg,
1284 &id_token_map,
1285 1000,
1286 )
1287 .unwrap();
1288 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1289 mock_transaction_error(
1290 &sui_client_mock,
1291 tx_digest,
1292 BridgeError::Generic("some random error".to_string()),
1293 true,
1294 );
1295
1296 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1298 gas_coin.clone(),
1299 gas_object_ref,
1300 Owner::AddressOwner(sui_address),
1301 );
1302 let action_digest = action.digest();
1303 sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1304
1305 assert!(!*bridge_pause_tx.borrow());
1307
1308 store
1309 .insert_pending_actions(std::slice::from_ref(&action))
1310 .unwrap();
1311 assert_eq!(
1312 store.get_all_pending_actions()[&action.digest()],
1313 action.clone()
1314 );
1315
1316 execution_tx
1318 .send(CertifiedBridgeActionExecutionWrapper(
1319 action_certificate.clone(),
1320 0,
1321 ))
1322 .await
1323 .unwrap();
1324
1325 tx_subscription.recv().await.unwrap();
1327
1328 bridge_pause_tx.send(BRIDGE_PAUSED).unwrap();
1330
1331 execution_tx
1333 .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1334 .await
1335 .unwrap();
1336
1337 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1338 assert_eq!(
1340 tx_subscription.try_recv().unwrap_err(),
1341 tokio::sync::broadcast::error::TryRecvError::Empty
1342 );
1343 assert_eq!(
1345 store.get_all_pending_actions()[&action_digest],
1346 action.clone()
1347 );
1348 }
1349
1350 #[tokio::test]
1351 async fn test_action_executor_handle_new_token() {
1352 let new_token_id = 255u8; let new_type_tag = TypeTag::from_str("0xbeef::beef::BEEF").unwrap();
1354 let (
1355 _signing_tx,
1356 execution_tx,
1357 sui_client_mock,
1358 mut tx_subscription,
1359 _store,
1360 secrets,
1361 dummy_sui_key,
1362 mock0,
1363 mock1,
1364 mock2,
1365 mock3,
1366 _handles,
1367 gas_object_ref,
1368 sui_address,
1369 sui_token_type_tags,
1370 _bridge_pause_tx,
1371 ) = setup().await;
1372 let mut id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1373 let (action_certificate, _, _) = get_bridge_authority_approved_action(
1374 vec![&mock0, &mock1, &mock2, &mock3],
1375 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1376 Some(new_token_id),
1377 false, );
1379
1380 let action = action_certificate.data().clone();
1381 let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1382 let tx_data = build_sui_transaction(
1383 sui_address,
1384 &gas_object_ref,
1385 action_certificate.clone(),
1386 arg,
1387 &maplit::hashmap! {
1388 new_token_id => new_type_tag.clone()
1389 },
1390 1000,
1391 )
1392 .unwrap();
1393 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1394 mock_transaction_error(
1395 &sui_client_mock,
1396 tx_digest,
1397 BridgeError::Generic("some random error".to_string()),
1398 true,
1399 );
1400
1401 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1403 gas_coin.clone(),
1404 gas_object_ref,
1405 Owner::AddressOwner(sui_address),
1406 );
1407 sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1408
1409 execution_tx
1411 .send(CertifiedBridgeActionExecutionWrapper(
1412 action_certificate.clone(),
1413 0,
1414 ))
1415 .await
1416 .unwrap();
1417
1418 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1419 assert_eq!(
1421 tx_subscription.try_recv().unwrap_err(),
1422 tokio::sync::broadcast::error::TryRecvError::Empty
1423 );
1424
1425 id_token_map.insert(new_token_id, new_type_tag);
1427 sui_token_type_tags.store(Arc::new(id_token_map));
1428
1429 execution_tx
1431 .send(CertifiedBridgeActionExecutionWrapper(
1432 action_certificate.clone(),
1433 0,
1434 ))
1435 .await
1436 .unwrap();
1437
1438 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1439 assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1441 }
1442
1443 fn mock_bridge_authority_sigs(
1444 mocks: Vec<&BridgeRequestMockHandler>,
1445 action: &BridgeAction,
1446 secrets: Vec<&BridgeAuthorityKeyPair>,
1447 sui_tx_digest: TransactionDigest,
1448 sui_tx_event_index: u16,
1449 ) -> BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthorityRecoverableSignature> {
1450 assert_eq!(mocks.len(), secrets.len());
1451 let mut signed_actions = BTreeMap::new();
1452 for (mock, secret) in mocks.iter().zip_debug_eq(secrets.iter()) {
1453 let signed_action = sign_action_with_key(action, secret);
1454 mock.add_sui_event_response(
1455 sui_tx_digest,
1456 sui_tx_event_index,
1457 Ok(signed_action.clone()),
1458 None,
1459 );
1460 signed_actions.insert(secret.public().into(), signed_action.into_sig().signature);
1461 }
1462 signed_actions
1463 }
1464
1465 fn mock_bridge_authority_signing_errors(
1466 mocks: Vec<&BridgeRequestMockHandler>,
1467 sui_tx_digest: TransactionDigest,
1468 sui_tx_event_index: u16,
1469 ) {
1470 for mock in mocks {
1471 mock.add_sui_event_response(
1472 sui_tx_digest,
1473 sui_tx_event_index,
1474 Err(BridgeError::RestAPIError("small issue".into())),
1475 None,
1476 );
1477 }
1478 }
1479
1480 fn get_bridge_authority_approved_action(
1482 mocks: Vec<&BridgeRequestMockHandler>,
1483 secrets: Vec<&BridgeAuthorityKeyPair>,
1484 token_id: Option<u8>,
1485 sui_to_eth: bool,
1486 ) -> (VerifiedCertifiedBridgeAction, TransactionDigest, u16) {
1487 let sui_tx_digest = TransactionDigest::random();
1488 let sui_tx_event_index = 1;
1489 let action = if sui_to_eth {
1490 get_test_sui_to_eth_bridge_action(
1491 Some(sui_tx_digest),
1492 Some(sui_tx_event_index),
1493 None,
1494 None,
1495 None,
1496 None,
1497 token_id,
1498 )
1499 } else {
1500 get_test_eth_to_sui_bridge_action(None, None, None, token_id)
1501 };
1502
1503 let sigs =
1504 mock_bridge_authority_sigs(mocks, &action, secrets, sui_tx_digest, sui_tx_event_index);
1505 let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1506 action,
1507 BridgeCommitteeValiditySignInfo { signatures: sigs },
1508 );
1509 (
1510 VerifiedCertifiedBridgeAction::new_from_verified(certified_action),
1511 sui_tx_digest,
1512 sui_tx_event_index,
1513 )
1514 }
1515
1516 fn get_tx_digest(tx_data: TransactionData, dummy_sui_key: &SuiKeyPair) -> TransactionDigest {
1517 let sig = Signature::new_secure(
1518 &IntentMessage::new(Intent::sui_transaction(), &tx_data),
1519 dummy_sui_key,
1520 );
1521 let signed_tx = Transaction::from_data(tx_data, vec![sig]);
1522 *signed_tx.digest()
1523 }
1524
1525 fn mock_transaction_response(
1529 sui_client_mock: &SuiMockClient,
1530 tx_digest: TransactionDigest,
1531 status: SuiExecutionStatus,
1532 events: Option<Vec<SuiEvent>>,
1533 wildcard: bool,
1534 ) {
1535 let response = ExecuteTransactionResult {
1536 status,
1537 events: events.unwrap_or_default(),
1538 };
1539 if wildcard {
1540 sui_client_mock.set_wildcard_transaction_response(Ok(response));
1541 } else {
1542 sui_client_mock.add_transaction_response(tx_digest, Ok(response));
1543 }
1544 }
1545
1546 fn mock_transaction_error(
1547 sui_client_mock: &SuiMockClient,
1548 tx_digest: TransactionDigest,
1549 error: BridgeError,
1550 wildcard: bool,
1551 ) {
1552 if wildcard {
1553 sui_client_mock.set_wildcard_transaction_response(Err(error));
1554 } else {
1555 sui_client_mock.add_transaction_response(tx_digest, Err(error));
1556 }
1557 }
1558
1559 #[allow(clippy::type_complexity)]
1560 async fn setup() -> (
1561 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
1562 mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
1563 SuiMockClient,
1564 tokio::sync::broadcast::Receiver<TransactionDigest>,
1565 Arc<BridgeOrchestratorTables>,
1566 Vec<BridgeAuthorityKeyPair>,
1567 SuiKeyPair,
1568 BridgeRequestMockHandler,
1569 BridgeRequestMockHandler,
1570 BridgeRequestMockHandler,
1571 BridgeRequestMockHandler,
1572 Vec<tokio::task::JoinHandle<()>>,
1573 ObjectRef,
1574 SuiAddress,
1575 Arc<ArcSwap<HashMap<u8, TypeTag>>>,
1576 tokio::sync::watch::Sender<IsBridgePaused>,
1577 ) {
1578 telemetry_subscribers::init_for_testing();
1579 let registry = Registry::new();
1580 mysten_metrics::init_metrics(®istry);
1581 init_all_struct_tags();
1582
1583 let (sui_address, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1584 let sui_key = SuiKeyPair::from(kp);
1585 let gas_object_ref = random_object_ref();
1586 let temp_dir = tempfile::tempdir().unwrap();
1587 let store = BridgeOrchestratorTables::new(temp_dir.path());
1588 let sui_client_mock = SuiMockClient::default();
1589 let tx_subscription = sui_client_mock.subscribe_to_requested_transactions();
1590 let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1591
1592 let (_, dummy_kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1595 let dummy_sui_key = SuiKeyPair::from(dummy_kp);
1596
1597 let mock0 = BridgeRequestMockHandler::new();
1598 let mock1 = BridgeRequestMockHandler::new();
1599 let mock2 = BridgeRequestMockHandler::new();
1600 let mock3 = BridgeRequestMockHandler::new();
1601
1602 let (mut handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
1603 vec![2500, 2500, 2500, 2500],
1604 vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
1605 );
1606
1607 let committee = BridgeCommittee::new(authorities).unwrap();
1608
1609 let agg = Arc::new(ArcSwap::new(Arc::new(
1610 BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1611 )));
1612 let metrics = Arc::new(BridgeMetrics::new(®istry));
1613 let sui_token_type_tags = sui_client.get_token_id_map().await.unwrap();
1614 let sui_token_type_tags = Arc::new(ArcSwap::new(Arc::new(sui_token_type_tags)));
1615 let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1616 let executor = BridgeActionExecutor::new(
1617 sui_client.clone(),
1618 agg.clone(),
1619 store.clone(),
1620 sui_key,
1621 sui_address,
1622 gas_object_ref.0,
1623 sui_token_type_tags.clone(),
1624 bridge_pause_rx,
1625 metrics,
1626 )
1627 .await;
1628
1629 let (executor_handle, signing_tx, execution_tx) = executor.run_inner();
1630 handles.extend(executor_handle);
1631
1632 (
1633 signing_tx,
1634 execution_tx,
1635 sui_client_mock,
1636 tx_subscription,
1637 store,
1638 secrets,
1639 dummy_sui_key,
1640 mock0,
1641 mock1,
1642 mock2,
1643 mock3,
1644 handles,
1645 gas_object_ref,
1646 sui_address,
1647 sui_token_type_tags,
1648 bridge_pause_tx,
1649 )
1650 }
1651}