1use crate::retry_with_max_elapsed_time;
8use crate::types::IsBridgePaused;
9use arc_swap::ArcSwap;
10use mysten_metrics::spawn_logged_monitored_task;
11use shared_crypto::intent::{Intent, IntentMessage};
12use sui_json_rpc_types::SuiExecutionStatus;
13use sui_types::TypeTag;
14use sui_types::transaction::ObjectArg;
15use sui_types::{
16 base_types::{ObjectID, ObjectRef, SuiAddress},
17 crypto::{Signature, SuiKeyPair},
18 digests::TransactionDigest,
19 gas_coin::GasCoin,
20 object::Owner,
21 transaction::Transaction,
22};
23
24use crate::events::{
25 TokenTransferAlreadyApproved, TokenTransferAlreadyClaimed, TokenTransferApproved,
26 TokenTransferClaimed,
27};
28use crate::metrics::BridgeMetrics;
29use crate::{
30 client::bridge_authority_aggregator::BridgeAuthorityAggregator,
31 error::BridgeError,
32 storage::BridgeOrchestratorTables,
33 sui_client::{ExecuteTransactionResult, SuiClient, SuiClientInner},
34 sui_transaction_builder::build_sui_transaction,
35 types::{BridgeAction, BridgeActionStatus, VerifiedCertifiedBridgeAction},
36};
37use std::collections::HashMap;
38use std::sync::Arc;
39use tokio::sync::Semaphore;
40use tokio::time::Duration;
41use tracing::{Instrument, error, info, instrument, warn};
42
43pub const CHANNEL_SIZE: usize = 1000;
44pub const SIGNING_CONCURRENCY: usize = 10;
45
46pub const MAX_SIGNING_ATTEMPTS: u64 = 16;
49pub const MAX_EXECUTION_ATTEMPTS: u64 = 16;
50
51async fn delay(attempt_times: u64) {
52 let delay_ms = 100 * 2_u64.pow(attempt_times as u32);
53 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
54}
55
56#[derive(Debug)]
57pub struct BridgeActionExecutionWrapper(pub BridgeAction, pub u64);
58
59#[derive(Debug)]
60pub struct CertifiedBridgeActionExecutionWrapper(pub VerifiedCertifiedBridgeAction, pub u64);
61
62pub trait BridgeActionExecutorTrait {
63 fn run(
64 self,
65 ) -> (
66 Vec<tokio::task::JoinHandle<()>>,
67 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
68 );
69}
70
71pub struct BridgeActionExecutor<C> {
72 sui_client: Arc<SuiClient<C>>,
73 bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
74 key: SuiKeyPair,
75 sui_address: SuiAddress,
76 gas_object_id: ObjectID,
77 store: Arc<BridgeOrchestratorTables>,
78 bridge_object_arg: ObjectArg,
79 sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
80 bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
81 metrics: Arc<BridgeMetrics>,
82}
83
84impl<C> BridgeActionExecutorTrait for BridgeActionExecutor<C>
85where
86 C: SuiClientInner + 'static,
87{
88 fn run(
89 self,
90 ) -> (
91 Vec<tokio::task::JoinHandle<()>>,
92 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
93 ) {
94 let (tasks, sender, _) = self.run_inner();
95 (tasks, sender)
96 }
97}
98
99impl<C> BridgeActionExecutor<C>
100where
101 C: SuiClientInner + 'static,
102{
103 pub async fn new(
104 sui_client: Arc<SuiClient<C>>,
105 bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
106 store: Arc<BridgeOrchestratorTables>,
107 key: SuiKeyPair,
108 sui_address: SuiAddress,
109 gas_object_id: ObjectID,
110 sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
111 bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
112 metrics: Arc<BridgeMetrics>,
113 ) -> Self {
114 let bridge_object_arg = sui_client
115 .get_mutable_bridge_object_arg_must_succeed()
116 .await;
117 Self {
118 sui_client,
119 bridge_auth_agg,
120 store,
121 key,
122 gas_object_id,
123 sui_address,
124 bridge_object_arg,
125 sui_token_type_tags,
126 bridge_pause_rx,
127 metrics,
128 }
129 }
130
131 fn run_inner(
132 self,
133 ) -> (
134 Vec<tokio::task::JoinHandle<()>>,
135 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
136 mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
137 ) {
138 let key = self.key;
139
140 let (sender, receiver) = mysten_metrics::metered_channel::channel(
141 CHANNEL_SIZE,
142 &mysten_metrics::get_metrics()
143 .unwrap()
144 .channel_inflight
145 .with_label_values(&["executor_signing_queue"]),
146 );
147
148 let (execution_tx, execution_rx) = mysten_metrics::metered_channel::channel(
149 CHANNEL_SIZE,
150 &mysten_metrics::get_metrics()
151 .unwrap()
152 .channel_inflight
153 .with_label_values(&["executor_execution_queue"]),
154 );
155 let execution_tx_clone = execution_tx.clone();
156 let sender_clone = sender.clone();
157 let store_clone = self.store.clone();
158 let client_clone = self.sui_client.clone();
159 let mut tasks = vec![];
160 let metrics = self.metrics.clone();
161 tasks.push(spawn_logged_monitored_task!(
162 Self::run_signature_aggregation_loop(
163 client_clone,
164 self.bridge_auth_agg,
165 store_clone,
166 sender_clone,
167 receiver,
168 execution_tx_clone,
169 metrics,
170 )
171 ));
172
173 let metrics = self.metrics.clone();
174 let execution_tx_clone = execution_tx.clone();
175 tasks.push(spawn_logged_monitored_task!(
176 Self::run_onchain_execution_loop(
177 self.sui_client.clone(),
178 key,
179 self.sui_address,
180 self.gas_object_id,
181 self.store.clone(),
182 execution_tx_clone,
183 execution_rx,
184 self.bridge_object_arg,
185 self.sui_token_type_tags,
186 self.bridge_pause_rx,
187 metrics,
188 )
189 ));
190 (tasks, sender, execution_tx)
191 }
192
193 async fn run_signature_aggregation_loop(
194 sui_client: Arc<SuiClient<C>>,
195 auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
196 store: Arc<BridgeOrchestratorTables>,
197 signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
198 mut signing_queue_receiver: mysten_metrics::metered_channel::Receiver<
199 BridgeActionExecutionWrapper,
200 >,
201 execution_queue_sender: mysten_metrics::metered_channel::Sender<
202 CertifiedBridgeActionExecutionWrapper,
203 >,
204 metrics: Arc<BridgeMetrics>,
205 ) {
206 info!("Starting run_signature_aggregation_loop");
207 let semaphore = Arc::new(Semaphore::new(SIGNING_CONCURRENCY));
208 while let Some(action) = signing_queue_receiver.recv().await {
209 Self::handle_signing_task(
210 &semaphore,
211 &auth_agg,
212 &signing_queue_sender,
213 &execution_queue_sender,
214 &sui_client,
215 &store,
216 action,
217 &metrics,
218 )
219 .await;
220 }
221 }
222
223 async fn should_proceed_signing(sui_client: &Arc<SuiClient<C>>) -> bool {
224 let Ok(Ok(is_paused)) =
225 retry_with_max_elapsed_time!(sui_client.is_bridge_paused(), Duration::from_secs(600))
226 else {
227 error!("Failed to get bridge status after retry");
228 return false;
229 };
230 !is_paused
231 }
232
233 #[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))]
234 async fn handle_signing_task(
235 semaphore: &Arc<Semaphore>,
236 auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
237 signing_queue_sender: &mysten_metrics::metered_channel::Sender<
238 BridgeActionExecutionWrapper,
239 >,
240 execution_queue_sender: &mysten_metrics::metered_channel::Sender<
241 CertifiedBridgeActionExecutionWrapper,
242 >,
243 sui_client: &Arc<SuiClient<C>>,
244 store: &Arc<BridgeOrchestratorTables>,
245 action: BridgeActionExecutionWrapper,
246 metrics: &Arc<BridgeMetrics>,
247 ) {
248 metrics.action_executor_signing_queue_received_actions.inc();
249 let action_key = action.0.key();
250 info!("Received action for signing: {:?}", action.0);
251
252 let should_proceed = Self::should_proceed_signing(sui_client).await;
257 if !should_proceed {
258 metrics.action_executor_signing_queue_skipped_actions.inc();
259 warn!("skipping signing task: {:?}", action_key);
260 return;
261 }
262
263 let auth_agg_clone = auth_agg.clone();
264 let signing_queue_sender_clone = signing_queue_sender.clone();
265 let execution_queue_sender_clone = execution_queue_sender.clone();
266 let sui_client_clone = sui_client.clone();
267 let store_clone = store.clone();
268 let metrics_clone = metrics.clone();
269 let semaphore_clone = semaphore.clone();
270 spawn_logged_monitored_task!(
271 Self::request_signatures(
272 semaphore_clone,
273 sui_client_clone,
274 auth_agg_clone,
275 action,
276 store_clone,
277 signing_queue_sender_clone,
278 execution_queue_sender_clone,
279 metrics_clone,
280 )
281 .instrument(tracing::debug_span!("request_signatures", action_key=?action_key)),
282 "request_signatures"
283 );
284 }
285
286 async fn handle_already_processed_token_transfer_action_maybe(
290 sui_client: &Arc<SuiClient<C>>,
291 action: &BridgeAction,
292 store: &Arc<BridgeOrchestratorTables>,
293 metrics: &Arc<BridgeMetrics>,
294 ) -> bool {
295 let status = sui_client
296 .get_token_transfer_action_onchain_status_until_success(
297 action.chain_id() as u8,
298 action.seq_number(),
299 )
300 .await;
301 match status {
302 BridgeActionStatus::Approved | BridgeActionStatus::Claimed => {
303 info!(
304 "Action already approved or claimed, removing action from pending logs: {:?}",
305 action
306 );
307 metrics.action_executor_already_processed_actions.inc();
308 store
309 .remove_pending_actions(&[action.digest()])
310 .unwrap_or_else(|e| {
311 panic!("Write to DB should not fail: {:?}", e);
312 });
313 true
314 }
315 BridgeActionStatus::Pending | BridgeActionStatus::NotFound => false,
318 }
319 }
320
321 async fn request_signatures(
324 semaphore: Arc<Semaphore>,
325 sui_client: Arc<SuiClient<C>>,
326 auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
327 action: BridgeActionExecutionWrapper,
328 store: Arc<BridgeOrchestratorTables>,
329 signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
330 execution_queue_sender: mysten_metrics::metered_channel::Sender<
331 CertifiedBridgeActionExecutionWrapper,
332 >,
333 metrics: Arc<BridgeMetrics>,
334 ) {
335 let _permit = semaphore
336 .acquire()
337 .await
338 .expect("semaphore should not be closed");
339 info!("requesting signatures");
340 let BridgeActionExecutionWrapper(action, attempt_times) = action;
341
342 match &action {
344 BridgeAction::SuiToEthBridgeAction(_)
345 | BridgeAction::SuiToEthTokenTransfer(_)
346 | BridgeAction::SuiToEthTokenTransferV2(_)
347 | BridgeAction::EthToSuiBridgeAction(_)
348 | BridgeAction::EthToSuiTokenTransferV2(_) => (),
349 _ => unreachable!("Non token transfer action should not reach here"),
350 };
351
352 if Self::handle_already_processed_token_transfer_action_maybe(
354 &sui_client,
355 &action,
356 &store,
357 &metrics,
358 )
359 .await
360 {
361 return;
362 }
363 match auth_agg
364 .load()
365 .request_committee_signatures(action.clone())
366 .await
367 {
368 Ok(certificate) => {
369 info!("Sending certificate to execution");
370 execution_queue_sender
371 .send(CertifiedBridgeActionExecutionWrapper(certificate, 0))
372 .await
373 .unwrap_or_else(|e| {
374 panic!("Sending to execution queue should not fail: {:?}", e);
375 });
376 }
377 Err(e) => {
378 warn!("Failed to collect sigs for bridge action: {:?}", e);
379 metrics.err_signature_aggregation.inc();
380
381 if attempt_times >= MAX_SIGNING_ATTEMPTS {
383 metrics.err_signature_aggregation_too_many_failures.inc();
384 error!(
385 "Manual intervention is required. Failed to collect sigs for bridge action after {MAX_SIGNING_ATTEMPTS} attempts: {:?}",
386 e
387 );
388 return;
389 }
390 delay(attempt_times).await;
391 signing_queue_sender
392 .send(BridgeActionExecutionWrapper(action, attempt_times + 1))
393 .await
394 .unwrap_or_else(|e| {
395 panic!("Sending to signing queue should not fail: {:?}", e);
396 });
397 }
398 }
399 }
400
401 async fn run_onchain_execution_loop(
404 sui_client: Arc<SuiClient<C>>,
405 sui_key: SuiKeyPair,
406 sui_address: SuiAddress,
407 gas_object_id: ObjectID,
408 store: Arc<BridgeOrchestratorTables>,
409 execution_queue_sender: mysten_metrics::metered_channel::Sender<
410 CertifiedBridgeActionExecutionWrapper,
411 >,
412 mut execution_queue_receiver: mysten_metrics::metered_channel::Receiver<
413 CertifiedBridgeActionExecutionWrapper,
414 >,
415 bridge_object_arg: ObjectArg,
416 sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
417 bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
418 metrics: Arc<BridgeMetrics>,
419 ) {
420 info!("Starting run_onchain_execution_loop");
421 while let Some(certificate_wrapper) = execution_queue_receiver.recv().await {
422 if *bridge_pause_rx.borrow() {
426 warn!("Bridge is paused, skipping execution");
427 metrics
428 .action_executor_execution_queue_skipped_actions_due_to_pausing
429 .inc();
430 continue;
431 }
432 Self::handle_execution_task(
433 certificate_wrapper,
434 &sui_client,
435 &sui_key,
436 &sui_address,
437 gas_object_id,
438 &store,
439 &execution_queue_sender,
440 &bridge_object_arg,
441 &sui_token_type_tags,
442 &metrics,
443 )
444 .await;
445 }
446 panic!("Execution queue closed unexpectedly");
447 }
448
449 #[instrument(level = "error", skip_all, fields(action_key=?certificate_wrapper.0.data().key(), attempt_times=?certificate_wrapper.1))]
450 async fn handle_execution_task(
451 certificate_wrapper: CertifiedBridgeActionExecutionWrapper,
452 sui_client: &Arc<SuiClient<C>>,
453 sui_key: &SuiKeyPair,
454 sui_address: &SuiAddress,
455 gas_object_id: ObjectID,
456 store: &Arc<BridgeOrchestratorTables>,
457 execution_queue_sender: &mysten_metrics::metered_channel::Sender<
458 CertifiedBridgeActionExecutionWrapper,
459 >,
460 bridge_object_arg: &ObjectArg,
461 sui_token_type_tags: &ArcSwap<HashMap<u8, TypeTag>>,
462 metrics: &Arc<BridgeMetrics>,
463 ) {
464 metrics
465 .action_executor_execution_queue_received_actions
466 .inc();
467 let CertifiedBridgeActionExecutionWrapper(certificate, attempt_times) = certificate_wrapper;
468 let action = certificate.data();
469 let action_key = action.key();
470
471 info!("Received certified action for execution: {:?}", action);
472
473 let (gas_coin, gas_object_ref) =
475 Self::get_gas_data_assert_ownership(*sui_address, gas_object_id, sui_client).await;
476 metrics.gas_coin_balance.set(gas_coin.value() as i64);
477
478 let ceriticate_clone = certificate.clone();
479
480 if Self::handle_already_processed_token_transfer_action_maybe(
482 sui_client, action, store, metrics,
483 )
484 .await
485 {
486 info!("Action already processed, skipping");
487 return;
488 }
489
490 info!("Building Sui transaction");
491 let rgp = sui_client.get_reference_gas_price_until_success().await;
492 let tx_data = match build_sui_transaction(
493 *sui_address,
494 &gas_object_ref,
495 ceriticate_clone.clone(),
496 *bridge_object_arg,
497 sui_token_type_tags.load().as_ref(),
498 rgp,
499 ) {
500 Ok(tx_data) => tx_data,
501 Err(BridgeError::UnknownTokenId(token_id)) => {
502 info!(
505 "Unknown token_id {}, refreshing token map from chain and retrying",
506 token_id
507 );
508 match sui_client.get_token_id_map().await {
509 Ok(new_token_map) => {
510 sui_token_type_tags.store(Arc::new(new_token_map));
511 match build_sui_transaction(
513 *sui_address,
514 &gas_object_ref,
515 ceriticate_clone,
516 *bridge_object_arg,
517 sui_token_type_tags.load().as_ref(),
518 rgp,
519 ) {
520 Ok(tx_data) => tx_data,
521 Err(err) => {
522 metrics.err_build_sui_transaction.inc();
523 error!(
524 "Manual intervention is required. Failed to build transaction after token map refresh for action {:?}: {:?}",
525 action, err
526 );
527 return;
528 }
529 }
530 }
531 Err(e) => {
532 metrics.err_build_sui_transaction.inc();
533 error!(
534 "Manual intervention is required. Failed to refresh token map: {:?}",
535 e
536 );
537 return;
538 }
539 }
540 }
541 Err(err) => {
542 metrics.err_build_sui_transaction.inc();
543 error!(
544 "Manual intervention is required. Failed to build transaction for action {:?}: {:?}",
545 action, err
546 );
547 return;
550 }
551 };
552 let sig = Signature::new_secure(
553 &IntentMessage::new(Intent::sui_transaction(), &tx_data),
554 sui_key,
555 );
556 let signed_tx = Transaction::from_data(tx_data, vec![sig]);
557 let tx_digest = *signed_tx.digest();
558
559 if Self::handle_already_processed_token_transfer_action_maybe(
561 sui_client, action, store, metrics,
562 )
563 .await
564 {
565 info!("Action already processed, skipping");
566 return;
567 }
568
569 info!(?tx_digest, ?gas_object_ref, "Sending transaction to Sui");
570 match sui_client
571 .execute_transaction_block_with_effects(signed_tx)
572 .await
573 {
574 Ok(resp) => {
575 Self::handle_execution_effects(tx_digest, resp, store, action, metrics).await
576 }
577
578 Err(err) => {
580 error!(
581 ?action_key,
582 ?tx_digest,
583 "Sui transaction failed at signing: {err:?}"
584 );
585 metrics.err_sui_transaction_submission.inc();
586 let metrics_clone = metrics.clone();
587 let sender_clone = execution_queue_sender.clone();
589 spawn_logged_monitored_task!(async move {
590 if attempt_times >= MAX_EXECUTION_ATTEMPTS {
592 metrics_clone
593 .err_sui_transaction_submission_too_many_failures
594 .inc();
595 error!("Manual intervention is required. Failed to collect execute transaction for bridge action after {MAX_EXECUTION_ATTEMPTS} attempts: {:?}", err);
596 return;
597 }
598 delay(attempt_times).await;
599 sender_clone
600 .send(CertifiedBridgeActionExecutionWrapper(
601 certificate,
602 attempt_times + 1,
603 ))
604 .await
605 .unwrap_or_else(|e| {
606 panic!("Sending to execution queue should not fail: {:?}", e);
607 });
608 info!("Re-enqueued certificate for execution");
609 }.instrument(tracing::debug_span!("reenqueue_execution_task", action_key=?action_key)));
610 }
611 }
612 }
613
614 async fn handle_execution_effects(
616 tx_digest: TransactionDigest,
617 response: ExecuteTransactionResult,
618 store: &Arc<BridgeOrchestratorTables>,
619 action: &BridgeAction,
620 metrics: &Arc<BridgeMetrics>,
621 ) {
622 match &response.status {
623 SuiExecutionStatus::Success => {
624 let relevant_events = response
625 .events
626 .iter()
627 .filter(|e| {
628 e.type_ == *TokenTransferAlreadyClaimed.get().unwrap()
629 || e.type_ == *TokenTransferClaimed.get().unwrap()
630 || e.type_ == *TokenTransferApproved.get().unwrap()
631 || e.type_ == *TokenTransferAlreadyApproved.get().unwrap()
632 })
633 .collect::<Vec<_>>();
634 assert!(
635 !relevant_events.is_empty(),
636 "Expected TokenTransferAlreadyClaimed, TokenTransferClaimed, TokenTransferApproved \
637 or TokenTransferAlreadyApproved event but got: {:?}",
638 response.events
639 );
640 info!(?tx_digest, "Sui transaction executed successfully");
641 relevant_events.iter().for_each(|e| {
643 if e.type_ == *TokenTransferClaimed.get().unwrap() {
644 match action {
645 BridgeAction::EthToSuiBridgeAction(_)
646 | BridgeAction::EthToSuiTokenTransferV2(_) => {
647 metrics.eth_sui_token_transfer_claimed.inc();
648 }
649 BridgeAction::SuiToEthBridgeAction(_)
650 | BridgeAction::SuiToEthTokenTransfer(_)
651 | BridgeAction::SuiToEthTokenTransferV2(_) => {
652 metrics.sui_eth_token_transfer_claimed.inc();
653 }
654 _ => error!("Unexpected action type for claimed event: {:?}", action),
655 }
656 } else if e.type_ == *TokenTransferApproved.get().unwrap() {
657 match action {
658 BridgeAction::EthToSuiBridgeAction(_)
659 | BridgeAction::EthToSuiTokenTransferV2(_) => {
660 metrics.eth_sui_token_transfer_approved.inc();
661 }
662 BridgeAction::SuiToEthBridgeAction(_)
663 | BridgeAction::SuiToEthTokenTransfer(_)
664 | BridgeAction::SuiToEthTokenTransferV2(_) => {
665 metrics.sui_eth_token_transfer_approved.inc();
666 }
667 _ => error!("Unexpected action type for approved event: {:?}", action),
668 }
669 }
670 });
671 store
672 .remove_pending_actions(&[action.digest()])
673 .unwrap_or_else(|e| {
674 panic!("Write to DB should not fail: {:?}", e);
675 })
676 }
677 SuiExecutionStatus::Failure { error } => {
678 metrics.err_sui_transaction_execution.inc();
685 error!(
686 ?tx_digest,
687 "Manual intervention is needed. Sui transaction executed and failed with error: {error:?}"
688 );
689 }
690 }
691 }
692
693 async fn get_gas_data_assert_ownership(
695 sui_address: SuiAddress,
696 gas_object_id: ObjectID,
697 sui_client: &SuiClient<C>,
698 ) -> (GasCoin, ObjectRef) {
699 let (gas_coin, gas_obj_ref, owner) = sui_client
700 .get_gas_data_panic_if_not_gas(gas_object_id)
701 .await;
702
703 assert_eq!(
706 owner,
707 Owner::AddressOwner(sui_address),
708 "Gas object {:?} is no longer owned by address {}",
709 gas_object_id,
710 sui_address
711 );
712 (gas_coin, gas_obj_ref)
713 }
714}
715
716pub async fn submit_to_executor(
717 tx: &mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
718 action: BridgeAction,
719) -> Result<(), BridgeError> {
720 tx.send(BridgeActionExecutionWrapper(action, 0))
721 .await
722 .map_err(|e| BridgeError::Generic(e.to_string()))
723}
724
725#[cfg(test)]
726mod tests {
727 use crate::events::init_all_struct_tags;
728 use crate::test_utils::DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
729 use crate::types::BRIDGE_PAUSED;
730 use fastcrypto::traits::KeyPair;
731 use prometheus::Registry;
732 use std::collections::{BTreeMap, HashMap};
733 use std::str::FromStr;
734 use sui_json_rpc_types::SuiEvent;
735 use sui_types::TypeTag;
736 use sui_types::crypto::get_key_pair;
737 use sui_types::gas_coin::GasCoin;
738 use sui_types::{base_types::random_object_ref, transaction::TransactionData};
739
740 use crate::{
741 crypto::{
742 BridgeAuthorityKeyPair, BridgeAuthorityPublicKeyBytes,
743 BridgeAuthorityRecoverableSignature,
744 },
745 server::mock_handler::BridgeRequestMockHandler,
746 sui_mock_client::SuiMockClient,
747 test_utils::{
748 get_test_authorities_and_run_mock_bridge_server, get_test_eth_to_sui_bridge_action,
749 get_test_sui_to_eth_bridge_action, sign_action_with_key,
750 },
751 types::{BridgeCommittee, BridgeCommitteeValiditySignInfo, CertifiedBridgeAction},
752 };
753
754 use super::*;
755
756 #[tokio::test]
757 async fn test_onchain_execution_loop() {
758 let (
759 signing_tx,
760 _execution_tx,
761 sui_client_mock,
762 mut tx_subscription,
763 store,
764 secrets,
765 dummy_sui_key,
766 mock0,
767 mock1,
768 mock2,
769 mock3,
770 _handles,
771 gas_object_ref,
772 sui_address,
773 sui_token_type_tags,
774 _bridge_pause_tx,
775 ) = setup().await;
776 let (action_certificate, _, _) = get_bridge_authority_approved_action(
777 vec![&mock0, &mock1, &mock2, &mock3],
778 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
779 None,
780 true,
781 );
782 let action = action_certificate.data().clone();
783 let id_token_map = (*sui_token_type_tags.load().clone()).clone();
784 let tx_data = build_sui_transaction(
785 sui_address,
786 &gas_object_ref,
787 action_certificate,
788 DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
789 &id_token_map,
790 1000,
791 )
792 .unwrap();
793
794 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
795
796 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
798 gas_coin.clone(),
799 gas_object_ref,
800 Owner::AddressOwner(sui_address),
801 );
802
803 let mut event = SuiEvent::random_for_testing();
805 event.type_ = TokenTransferClaimed.get().unwrap().clone();
806 let events = vec![event];
807 mock_transaction_response(
808 &sui_client_mock,
809 tx_digest,
810 SuiExecutionStatus::Success,
811 Some(events),
812 true,
813 );
814
815 store
816 .insert_pending_actions(std::slice::from_ref(&action))
817 .unwrap();
818 assert_eq!(
819 store.get_all_pending_actions()[&action.digest()],
820 action.clone()
821 );
822
823 submit_to_executor(&signing_tx, action.clone())
825 .await
826 .unwrap();
827
828 tx_subscription.recv().await.unwrap();
830 assert!(store.get_all_pending_actions().is_empty());
831
832 let (action_certificate, _, _) = get_bridge_authority_approved_action(
837 vec![&mock0, &mock1, &mock2, &mock3],
838 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
839 None,
840 true,
841 );
842
843 let action = action_certificate.data().clone();
844
845 let tx_data = build_sui_transaction(
846 sui_address,
847 &gas_object_ref,
848 action_certificate,
849 DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
850 &id_token_map,
851 1000,
852 )
853 .unwrap();
854 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
855
856 mock_transaction_response(
858 &sui_client_mock,
859 tx_digest,
860 SuiExecutionStatus::Failure {
861 error: "failure is mother of success".to_string(),
862 },
863 None,
864 true,
865 );
866
867 store
868 .insert_pending_actions(std::slice::from_ref(&action))
869 .unwrap();
870 assert_eq!(
871 store.get_all_pending_actions()[&action.digest()],
872 action.clone()
873 );
874
875 submit_to_executor(&signing_tx, action.clone())
877 .await
878 .unwrap();
879
880 tx_subscription.recv().await.unwrap();
882 assert_eq!(
884 store.get_all_pending_actions()[&action.digest()],
885 action.clone()
886 );
887
888 let (action_certificate, _, _) = get_bridge_authority_approved_action(
893 vec![&mock0, &mock1, &mock2, &mock3],
894 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
895 None,
896 true,
897 );
898
899 let action = action_certificate.data().clone();
900
901 let tx_data = build_sui_transaction(
902 sui_address,
903 &gas_object_ref,
904 action_certificate,
905 DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
906 &id_token_map,
907 1000,
908 )
909 .unwrap();
910 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
911 mock_transaction_error(
912 &sui_client_mock,
913 tx_digest,
914 BridgeError::Generic("some random error".to_string()),
915 true,
916 );
917
918 store
919 .insert_pending_actions(std::slice::from_ref(&action))
920 .unwrap();
921 assert_eq!(
922 store.get_all_pending_actions()[&action.digest()],
923 action.clone()
924 );
925
926 submit_to_executor(&signing_tx, action.clone())
928 .await
929 .unwrap();
930
931 let tx_digest = tx_subscription.recv().await.unwrap();
933 assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
934
935 assert!(
937 store
938 .get_all_pending_actions()
939 .contains_key(&action.digest())
940 );
941
942 let mut event = SuiEvent::random_for_testing();
944 event.type_ = TokenTransferClaimed.get().unwrap().clone();
945 let events = vec![event];
946 mock_transaction_response(
947 &sui_client_mock,
948 tx_digest,
949 SuiExecutionStatus::Success,
950 Some(events),
951 true,
952 );
953
954 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
956 assert!(
958 !store
959 .get_all_pending_actions()
960 .contains_key(&action.digest())
961 );
962 }
963
964 #[tokio::test]
965 async fn test_signature_aggregation_loop() {
966 let (
967 signing_tx,
968 _execution_tx,
969 sui_client_mock,
970 mut tx_subscription,
971 store,
972 secrets,
973 dummy_sui_key,
974 mock0,
975 mock1,
976 mock2,
977 mock3,
978 _handles,
979 gas_object_ref,
980 sui_address,
981 sui_token_type_tags,
982 _bridge_pause_tx,
983 ) = setup().await;
984 let id_token_map = (*sui_token_type_tags.load().clone()).clone();
985 let (action_certificate, sui_tx_digest, sui_tx_event_index) =
986 get_bridge_authority_approved_action(
987 vec![&mock0, &mock1, &mock2, &mock3],
988 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
989 None,
990 true,
991 );
992 let action = action_certificate.data().clone();
993 mock_bridge_authority_signing_errors(
994 vec![&mock0, &mock1, &mock2],
995 sui_tx_digest,
996 sui_tx_event_index,
997 );
998 let mut sigs = mock_bridge_authority_sigs(
999 vec![&mock3],
1000 &action,
1001 vec![&secrets[3]],
1002 sui_tx_digest,
1003 sui_tx_event_index,
1004 );
1005
1006 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1008 gas_coin,
1009 gas_object_ref,
1010 Owner::AddressOwner(sui_address),
1011 );
1012 store
1013 .insert_pending_actions(std::slice::from_ref(&action))
1014 .unwrap();
1015 assert_eq!(
1016 store.get_all_pending_actions()[&action.digest()],
1017 action.clone()
1018 );
1019
1020 submit_to_executor(&signing_tx, action.clone())
1022 .await
1023 .unwrap();
1024
1025 loop {
1027 let requested_times =
1028 mock0.get_sui_token_events_requested(sui_tx_digest, sui_tx_event_index);
1029 if requested_times >= 2 {
1030 break;
1031 }
1032 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1033 }
1034 assert_eq!(
1036 tx_subscription.try_recv().unwrap_err(),
1037 tokio::sync::broadcast::error::TryRecvError::Empty
1038 );
1039 assert_eq!(
1041 store.get_all_pending_actions()[&action.digest()],
1042 action.clone()
1043 );
1044
1045 let sig_from_2 = mock_bridge_authority_sigs(
1047 vec![&mock2],
1048 &action,
1049 vec![&secrets[2]],
1050 sui_tx_digest,
1051 sui_tx_event_index,
1052 );
1053 sigs.extend(sig_from_2);
1054 let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1055 action.clone(),
1056 BridgeCommitteeValiditySignInfo { signatures: sigs },
1057 );
1058 let action_certificate = VerifiedCertifiedBridgeAction::new_from_verified(certified_action);
1059 let tx_data = build_sui_transaction(
1060 sui_address,
1061 &gas_object_ref,
1062 action_certificate,
1063 DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
1064 &id_token_map,
1065 1000,
1066 )
1067 .unwrap();
1068 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1069
1070 let mut event = SuiEvent::random_for_testing();
1071 event.type_ = TokenTransferClaimed.get().unwrap().clone();
1072 let events = vec![event];
1073 mock_transaction_response(
1074 &sui_client_mock,
1075 tx_digest,
1076 SuiExecutionStatus::Success,
1077 Some(events),
1078 true,
1079 );
1080
1081 assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1083 assert!(
1085 !store
1086 .get_all_pending_actions()
1087 .contains_key(&action.digest())
1088 );
1089 }
1090
1091 #[tokio::test]
1092 async fn test_skip_request_signature_if_already_processed_on_chain() {
1093 let (
1094 signing_tx,
1095 _execution_tx,
1096 sui_client_mock,
1097 mut tx_subscription,
1098 store,
1099 _secrets,
1100 _dummy_sui_key,
1101 mock0,
1102 mock1,
1103 mock2,
1104 mock3,
1105 _handles,
1106 _gas_object_ref,
1107 _sui_address,
1108 _sui_token_type_tags,
1109 _bridge_pause_tx,
1110 ) = setup().await;
1111
1112 let sui_tx_digest = TransactionDigest::random();
1113 let sui_tx_event_index = 0;
1114 let action = get_test_sui_to_eth_bridge_action(
1115 Some(sui_tx_digest),
1116 Some(sui_tx_event_index),
1117 None,
1118 None,
1119 None,
1120 None,
1121 None,
1122 );
1123 mock_bridge_authority_signing_errors(
1124 vec![&mock0, &mock1, &mock2, &mock3],
1125 sui_tx_digest,
1126 sui_tx_event_index,
1127 );
1128 store
1129 .insert_pending_actions(std::slice::from_ref(&action))
1130 .unwrap();
1131 assert_eq!(
1132 store.get_all_pending_actions()[&action.digest()],
1133 action.clone()
1134 );
1135
1136 submit_to_executor(&signing_tx, action.clone())
1138 .await
1139 .unwrap();
1140 let action_digest = action.digest();
1141
1142 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1144 tx_subscription.try_recv().unwrap_err();
1145 assert!(store.get_all_pending_actions().contains_key(&action_digest));
1147
1148 sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1149
1150 let now = std::time::Instant::now();
1152 while store.get_all_pending_actions().contains_key(&action_digest) {
1153 if now.elapsed().as_secs() > 10 {
1154 panic!("Timeout waiting for action to be removed from WAL");
1155 }
1156 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1157 }
1158 tx_subscription.try_recv().unwrap_err();
1159 }
1160
1161 #[tokio::test]
1162 async fn test_skip_tx_submission_if_already_processed_on_chain() {
1163 let (
1164 _signing_tx,
1165 execution_tx,
1166 sui_client_mock,
1167 mut tx_subscription,
1168 store,
1169 secrets,
1170 dummy_sui_key,
1171 mock0,
1172 mock1,
1173 mock2,
1174 mock3,
1175 _handles,
1176 gas_object_ref,
1177 sui_address,
1178 sui_token_type_tags,
1179 _bridge_pause_tx,
1180 ) = setup().await;
1181 let id_token_map = (*sui_token_type_tags.load().clone()).clone();
1182 let (action_certificate, _, _) = get_bridge_authority_approved_action(
1183 vec![&mock0, &mock1, &mock2, &mock3],
1184 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1185 None,
1186 true,
1187 );
1188
1189 let action = action_certificate.data().clone();
1190 let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1191 let tx_data = build_sui_transaction(
1192 sui_address,
1193 &gas_object_ref,
1194 action_certificate.clone(),
1195 arg,
1196 &id_token_map,
1197 1000,
1198 )
1199 .unwrap();
1200 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1201 mock_transaction_error(
1202 &sui_client_mock,
1203 tx_digest,
1204 BridgeError::Generic("some random error".to_string()),
1205 true,
1206 );
1207
1208 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1210 gas_coin.clone(),
1211 gas_object_ref,
1212 Owner::AddressOwner(sui_address),
1213 );
1214
1215 sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1216
1217 store
1218 .insert_pending_actions(std::slice::from_ref(&action))
1219 .unwrap();
1220 assert_eq!(
1221 store.get_all_pending_actions()[&action.digest()],
1222 action.clone()
1223 );
1224
1225 execution_tx
1227 .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1228 .await
1229 .unwrap();
1230
1231 tx_subscription.recv().await.unwrap();
1233
1234 sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1236
1237 let now = std::time::Instant::now();
1239 let action_digest = action.digest();
1240 while store.get_all_pending_actions().contains_key(&action_digest) {
1241 if now.elapsed().as_secs() > 10 {
1242 panic!("Timeout waiting for action to be removed from WAL");
1243 }
1244 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1245 }
1246 }
1247
1248 #[tokio::test]
1249 async fn test_skip_tx_submission_if_bridge_is_paused() {
1250 let (
1251 _signing_tx,
1252 execution_tx,
1253 sui_client_mock,
1254 mut tx_subscription,
1255 store,
1256 secrets,
1257 dummy_sui_key,
1258 mock0,
1259 mock1,
1260 mock2,
1261 mock3,
1262 _handles,
1263 gas_object_ref,
1264 sui_address,
1265 sui_token_type_tags,
1266 bridge_pause_tx,
1267 ) = setup().await;
1268 let id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1269 let (action_certificate, _, _) = get_bridge_authority_approved_action(
1270 vec![&mock0, &mock1, &mock2, &mock3],
1271 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1272 None,
1273 true,
1274 );
1275
1276 let action = action_certificate.data().clone();
1277 let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1278 let tx_data = build_sui_transaction(
1279 sui_address,
1280 &gas_object_ref,
1281 action_certificate.clone(),
1282 arg,
1283 &id_token_map,
1284 1000,
1285 )
1286 .unwrap();
1287 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1288 mock_transaction_error(
1289 &sui_client_mock,
1290 tx_digest,
1291 BridgeError::Generic("some random error".to_string()),
1292 true,
1293 );
1294
1295 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1297 gas_coin.clone(),
1298 gas_object_ref,
1299 Owner::AddressOwner(sui_address),
1300 );
1301 let action_digest = action.digest();
1302 sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1303
1304 assert!(!*bridge_pause_tx.borrow());
1306
1307 store
1308 .insert_pending_actions(std::slice::from_ref(&action))
1309 .unwrap();
1310 assert_eq!(
1311 store.get_all_pending_actions()[&action.digest()],
1312 action.clone()
1313 );
1314
1315 execution_tx
1317 .send(CertifiedBridgeActionExecutionWrapper(
1318 action_certificate.clone(),
1319 0,
1320 ))
1321 .await
1322 .unwrap();
1323
1324 tx_subscription.recv().await.unwrap();
1326
1327 bridge_pause_tx.send(BRIDGE_PAUSED).unwrap();
1329
1330 execution_tx
1332 .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1333 .await
1334 .unwrap();
1335
1336 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1337 assert_eq!(
1339 tx_subscription.try_recv().unwrap_err(),
1340 tokio::sync::broadcast::error::TryRecvError::Empty
1341 );
1342 assert_eq!(
1344 store.get_all_pending_actions()[&action_digest],
1345 action.clone()
1346 );
1347 }
1348
1349 #[tokio::test]
1350 async fn test_action_executor_handle_new_token() {
1351 let new_token_id = 255u8; let new_type_tag = TypeTag::from_str("0xbeef::beef::BEEF").unwrap();
1353 let (
1354 _signing_tx,
1355 execution_tx,
1356 sui_client_mock,
1357 mut tx_subscription,
1358 _store,
1359 secrets,
1360 dummy_sui_key,
1361 mock0,
1362 mock1,
1363 mock2,
1364 mock3,
1365 _handles,
1366 gas_object_ref,
1367 sui_address,
1368 sui_token_type_tags,
1369 _bridge_pause_tx,
1370 ) = setup().await;
1371 let mut id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1372 let (action_certificate, _, _) = get_bridge_authority_approved_action(
1373 vec![&mock0, &mock1, &mock2, &mock3],
1374 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1375 Some(new_token_id),
1376 false, );
1378
1379 let action = action_certificate.data().clone();
1380 let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1381 let tx_data = build_sui_transaction(
1382 sui_address,
1383 &gas_object_ref,
1384 action_certificate.clone(),
1385 arg,
1386 &maplit::hashmap! {
1387 new_token_id => new_type_tag.clone()
1388 },
1389 1000,
1390 )
1391 .unwrap();
1392 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1393 mock_transaction_error(
1394 &sui_client_mock,
1395 tx_digest,
1396 BridgeError::Generic("some random error".to_string()),
1397 true,
1398 );
1399
1400 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1402 gas_coin.clone(),
1403 gas_object_ref,
1404 Owner::AddressOwner(sui_address),
1405 );
1406 sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1407
1408 execution_tx
1410 .send(CertifiedBridgeActionExecutionWrapper(
1411 action_certificate.clone(),
1412 0,
1413 ))
1414 .await
1415 .unwrap();
1416
1417 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1418 assert_eq!(
1420 tx_subscription.try_recv().unwrap_err(),
1421 tokio::sync::broadcast::error::TryRecvError::Empty
1422 );
1423
1424 id_token_map.insert(new_token_id, new_type_tag);
1426 sui_token_type_tags.store(Arc::new(id_token_map));
1427
1428 execution_tx
1430 .send(CertifiedBridgeActionExecutionWrapper(
1431 action_certificate.clone(),
1432 0,
1433 ))
1434 .await
1435 .unwrap();
1436
1437 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1438 assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1440 }
1441
1442 fn mock_bridge_authority_sigs(
1443 mocks: Vec<&BridgeRequestMockHandler>,
1444 action: &BridgeAction,
1445 secrets: Vec<&BridgeAuthorityKeyPair>,
1446 sui_tx_digest: TransactionDigest,
1447 sui_tx_event_index: u16,
1448 ) -> BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthorityRecoverableSignature> {
1449 assert_eq!(mocks.len(), secrets.len());
1450 let mut signed_actions = BTreeMap::new();
1451 for (mock, secret) in mocks.iter().zip(secrets.iter()) {
1452 let signed_action = sign_action_with_key(action, secret);
1453 mock.add_sui_event_response(
1454 sui_tx_digest,
1455 sui_tx_event_index,
1456 Ok(signed_action.clone()),
1457 None,
1458 );
1459 signed_actions.insert(secret.public().into(), signed_action.into_sig().signature);
1460 }
1461 signed_actions
1462 }
1463
1464 fn mock_bridge_authority_signing_errors(
1465 mocks: Vec<&BridgeRequestMockHandler>,
1466 sui_tx_digest: TransactionDigest,
1467 sui_tx_event_index: u16,
1468 ) {
1469 for mock in mocks {
1470 mock.add_sui_event_response(
1471 sui_tx_digest,
1472 sui_tx_event_index,
1473 Err(BridgeError::RestAPIError("small issue".into())),
1474 None,
1475 );
1476 }
1477 }
1478
1479 fn get_bridge_authority_approved_action(
1481 mocks: Vec<&BridgeRequestMockHandler>,
1482 secrets: Vec<&BridgeAuthorityKeyPair>,
1483 token_id: Option<u8>,
1484 sui_to_eth: bool,
1485 ) -> (VerifiedCertifiedBridgeAction, TransactionDigest, u16) {
1486 let sui_tx_digest = TransactionDigest::random();
1487 let sui_tx_event_index = 1;
1488 let action = if sui_to_eth {
1489 get_test_sui_to_eth_bridge_action(
1490 Some(sui_tx_digest),
1491 Some(sui_tx_event_index),
1492 None,
1493 None,
1494 None,
1495 None,
1496 token_id,
1497 )
1498 } else {
1499 get_test_eth_to_sui_bridge_action(None, None, None, token_id)
1500 };
1501
1502 let sigs =
1503 mock_bridge_authority_sigs(mocks, &action, secrets, sui_tx_digest, sui_tx_event_index);
1504 let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1505 action,
1506 BridgeCommitteeValiditySignInfo { signatures: sigs },
1507 );
1508 (
1509 VerifiedCertifiedBridgeAction::new_from_verified(certified_action),
1510 sui_tx_digest,
1511 sui_tx_event_index,
1512 )
1513 }
1514
1515 fn get_tx_digest(tx_data: TransactionData, dummy_sui_key: &SuiKeyPair) -> TransactionDigest {
1516 let sig = Signature::new_secure(
1517 &IntentMessage::new(Intent::sui_transaction(), &tx_data),
1518 dummy_sui_key,
1519 );
1520 let signed_tx = Transaction::from_data(tx_data, vec![sig]);
1521 *signed_tx.digest()
1522 }
1523
1524 fn mock_transaction_response(
1528 sui_client_mock: &SuiMockClient,
1529 tx_digest: TransactionDigest,
1530 status: SuiExecutionStatus,
1531 events: Option<Vec<SuiEvent>>,
1532 wildcard: bool,
1533 ) {
1534 let response = ExecuteTransactionResult {
1535 status,
1536 events: events.unwrap_or_default(),
1537 };
1538 if wildcard {
1539 sui_client_mock.set_wildcard_transaction_response(Ok(response));
1540 } else {
1541 sui_client_mock.add_transaction_response(tx_digest, Ok(response));
1542 }
1543 }
1544
1545 fn mock_transaction_error(
1546 sui_client_mock: &SuiMockClient,
1547 tx_digest: TransactionDigest,
1548 error: BridgeError,
1549 wildcard: bool,
1550 ) {
1551 if wildcard {
1552 sui_client_mock.set_wildcard_transaction_response(Err(error));
1553 } else {
1554 sui_client_mock.add_transaction_response(tx_digest, Err(error));
1555 }
1556 }
1557
1558 #[allow(clippy::type_complexity)]
1559 async fn setup() -> (
1560 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
1561 mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
1562 SuiMockClient,
1563 tokio::sync::broadcast::Receiver<TransactionDigest>,
1564 Arc<BridgeOrchestratorTables>,
1565 Vec<BridgeAuthorityKeyPair>,
1566 SuiKeyPair,
1567 BridgeRequestMockHandler,
1568 BridgeRequestMockHandler,
1569 BridgeRequestMockHandler,
1570 BridgeRequestMockHandler,
1571 Vec<tokio::task::JoinHandle<()>>,
1572 ObjectRef,
1573 SuiAddress,
1574 Arc<ArcSwap<HashMap<u8, TypeTag>>>,
1575 tokio::sync::watch::Sender<IsBridgePaused>,
1576 ) {
1577 telemetry_subscribers::init_for_testing();
1578 let registry = Registry::new();
1579 mysten_metrics::init_metrics(®istry);
1580 init_all_struct_tags();
1581
1582 let (sui_address, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1583 let sui_key = SuiKeyPair::from(kp);
1584 let gas_object_ref = random_object_ref();
1585 let temp_dir = tempfile::tempdir().unwrap();
1586 let store = BridgeOrchestratorTables::new(temp_dir.path());
1587 let sui_client_mock = SuiMockClient::default();
1588 let tx_subscription = sui_client_mock.subscribe_to_requested_transactions();
1589 let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1590
1591 let (_, dummy_kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1594 let dummy_sui_key = SuiKeyPair::from(dummy_kp);
1595
1596 let mock0 = BridgeRequestMockHandler::new();
1597 let mock1 = BridgeRequestMockHandler::new();
1598 let mock2 = BridgeRequestMockHandler::new();
1599 let mock3 = BridgeRequestMockHandler::new();
1600
1601 let (mut handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
1602 vec![2500, 2500, 2500, 2500],
1603 vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
1604 );
1605
1606 let committee = BridgeCommittee::new(authorities).unwrap();
1607
1608 let agg = Arc::new(ArcSwap::new(Arc::new(
1609 BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1610 )));
1611 let metrics = Arc::new(BridgeMetrics::new(®istry));
1612 let sui_token_type_tags = sui_client.get_token_id_map().await.unwrap();
1613 let sui_token_type_tags = Arc::new(ArcSwap::new(Arc::new(sui_token_type_tags)));
1614 let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1615 let executor = BridgeActionExecutor::new(
1616 sui_client.clone(),
1617 agg.clone(),
1618 store.clone(),
1619 sui_key,
1620 sui_address,
1621 gas_object_ref.0,
1622 sui_token_type_tags.clone(),
1623 bridge_pause_rx,
1624 metrics,
1625 )
1626 .await;
1627
1628 let (executor_handle, signing_tx, execution_tx) = executor.run_inner();
1629 handles.extend(executor_handle);
1630
1631 (
1632 signing_tx,
1633 execution_tx,
1634 sui_client_mock,
1635 tx_subscription,
1636 store,
1637 secrets,
1638 dummy_sui_key,
1639 mock0,
1640 mock1,
1641 mock2,
1642 mock3,
1643 handles,
1644 gas_object_ref,
1645 sui_address,
1646 sui_token_type_tags,
1647 bridge_pause_tx,
1648 )
1649 }
1650}