1use crate::retry_with_max_elapsed_time;
8use crate::types::IsBridgePaused;
9use arc_swap::ArcSwap;
10use mysten_metrics::spawn_logged_monitored_task;
11use shared_crypto::intent::{Intent, IntentMessage};
12use sui_json_rpc_types::{
13 SuiExecutionStatus, SuiTransactionBlockEffectsAPI, SuiTransactionBlockResponse,
14};
15use sui_types::TypeTag;
16use sui_types::transaction::ObjectArg;
17use sui_types::{
18 base_types::{ObjectID, ObjectRef, SuiAddress},
19 crypto::{Signature, SuiKeyPair},
20 digests::TransactionDigest,
21 gas_coin::GasCoin,
22 object::Owner,
23 transaction::Transaction,
24};
25
26use crate::events::{
27 TokenTransferAlreadyApproved, TokenTransferAlreadyClaimed, TokenTransferApproved,
28 TokenTransferClaimed,
29};
30use crate::metrics::BridgeMetrics;
31use crate::{
32 client::bridge_authority_aggregator::BridgeAuthorityAggregator,
33 error::BridgeError,
34 storage::BridgeOrchestratorTables,
35 sui_client::{SuiClient, SuiClientInner},
36 sui_transaction_builder::build_sui_transaction,
37 types::{BridgeAction, BridgeActionStatus, VerifiedCertifiedBridgeAction},
38};
39use std::collections::HashMap;
40use std::sync::Arc;
41use tokio::sync::Semaphore;
42use tokio::time::Duration;
43use tracing::{Instrument, error, info, instrument, warn};
44
/// Capacity handed to each metered channel created in `run_inner`
/// (the signing queue and the execution queue).
pub const CHANNEL_SIZE: usize = 1000;
/// Number of permits in the semaphore that every `request_signatures` task
/// must acquire before it starts working.
pub const SIGNING_CONCURRENCY: usize = 10;

/// Attempt-counter value at which a failed signature aggregation is no longer
/// re-enqueued (the counter starts at 0 and grows by one per retry).
pub const MAX_SIGNING_ATTEMPTS: u64 = 16;
/// Attempt-counter value at which a failed transaction submission is no longer
/// re-enqueued (the counter starts at 0 and grows by one per retry).
pub const MAX_EXECUTION_ATTEMPTS: u64 = 16;
52
53async fn delay(attempt_times: u64) {
54 let delay_ms = 100 * 2_u64.pow(attempt_times as u32);
55 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
56}
57
/// Item of the signing queue: a bridge action (`.0`) paired with the number of
/// signing attempts already made for it (`.1`, 0 on first submission).
#[derive(Debug)]
pub struct BridgeActionExecutionWrapper(pub BridgeAction, pub u64);
60
/// Item of the execution queue: a verified certified action (`.0`) paired with
/// the number of execution attempts already made for it (`.1`, 0 when first
/// enqueued).
#[derive(Debug)]
pub struct CertifiedBridgeActionExecutionWrapper(pub VerifiedCertifiedBridgeAction, pub u64);
63
/// Abstraction over something that can be started as a bridge action executor.
pub trait BridgeActionExecutorTrait {
    /// Consumes the executor and starts it.
    ///
    /// Returns the join handles of the spawned tasks together with the sender
    /// through which callers submit `BridgeActionExecutionWrapper` items.
    fn run(
        self,
    ) -> (
        Vec<tokio::task::JoinHandle<()>>,
        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
    );
}
72
/// Drives bridge actions through signature aggregation and on-chain execution.
pub struct BridgeActionExecutor<C> {
    // Client used for on-chain status lookups, gas data and transaction execution.
    sui_client: Arc<SuiClient<C>>,
    // Swappable aggregator used to request committee signatures.
    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
    // Key pair used to sign the Sui transactions built for certified actions.
    key: SuiKeyPair,
    // Sender address of those transactions; must own the gas object.
    sui_address: SuiAddress,
    // Gas object used to pay for every executed transaction.
    gas_object_id: ObjectID,
    // Store from which processed actions are removed from the pending set.
    store: Arc<BridgeOrchestratorTables>,
    // Bridge object argument resolved once in `new` and reused per transaction.
    bridge_object_arg: ObjectArg,
    // Swappable map of `u8` key -> `TypeTag` handed to `build_sui_transaction`.
    sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
    // Watch channel consulted before each execution; when it reads true the action is skipped.
    bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
    // Counters and gauges updated throughout signing and execution.
    metrics: Arc<BridgeMetrics>,
}
85
86impl<C> BridgeActionExecutorTrait for BridgeActionExecutor<C>
87where
88 C: SuiClientInner + 'static,
89{
90 fn run(
91 self,
92 ) -> (
93 Vec<tokio::task::JoinHandle<()>>,
94 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
95 ) {
96 let (tasks, sender, _) = self.run_inner();
97 (tasks, sender)
98 }
99}
100
101impl<C> BridgeActionExecutor<C>
102where
103 C: SuiClientInner + 'static,
104{
105 pub async fn new(
106 sui_client: Arc<SuiClient<C>>,
107 bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
108 store: Arc<BridgeOrchestratorTables>,
109 key: SuiKeyPair,
110 sui_address: SuiAddress,
111 gas_object_id: ObjectID,
112 sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
113 bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
114 metrics: Arc<BridgeMetrics>,
115 ) -> Self {
116 let bridge_object_arg = sui_client
117 .get_mutable_bridge_object_arg_must_succeed()
118 .await;
119 Self {
120 sui_client,
121 bridge_auth_agg,
122 store,
123 key,
124 gas_object_id,
125 sui_address,
126 bridge_object_arg,
127 sui_token_type_tags,
128 bridge_pause_rx,
129 metrics,
130 }
131 }
132
133 fn run_inner(
134 self,
135 ) -> (
136 Vec<tokio::task::JoinHandle<()>>,
137 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
138 mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
139 ) {
140 let key = self.key;
141
142 let (sender, receiver) = mysten_metrics::metered_channel::channel(
143 CHANNEL_SIZE,
144 &mysten_metrics::get_metrics()
145 .unwrap()
146 .channel_inflight
147 .with_label_values(&["executor_signing_queue"]),
148 );
149
150 let (execution_tx, execution_rx) = mysten_metrics::metered_channel::channel(
151 CHANNEL_SIZE,
152 &mysten_metrics::get_metrics()
153 .unwrap()
154 .channel_inflight
155 .with_label_values(&["executor_execution_queue"]),
156 );
157 let execution_tx_clone = execution_tx.clone();
158 let sender_clone = sender.clone();
159 let store_clone = self.store.clone();
160 let client_clone = self.sui_client.clone();
161 let mut tasks = vec![];
162 let metrics = self.metrics.clone();
163 tasks.push(spawn_logged_monitored_task!(
164 Self::run_signature_aggregation_loop(
165 client_clone,
166 self.bridge_auth_agg,
167 store_clone,
168 sender_clone,
169 receiver,
170 execution_tx_clone,
171 metrics,
172 )
173 ));
174
175 let metrics = self.metrics.clone();
176 let execution_tx_clone = execution_tx.clone();
177 tasks.push(spawn_logged_monitored_task!(
178 Self::run_onchain_execution_loop(
179 self.sui_client.clone(),
180 key,
181 self.sui_address,
182 self.gas_object_id,
183 self.store.clone(),
184 execution_tx_clone,
185 execution_rx,
186 self.bridge_object_arg,
187 self.sui_token_type_tags,
188 self.bridge_pause_rx,
189 metrics,
190 )
191 ));
192 (tasks, sender, execution_tx)
193 }
194
195 async fn run_signature_aggregation_loop(
196 sui_client: Arc<SuiClient<C>>,
197 auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
198 store: Arc<BridgeOrchestratorTables>,
199 signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
200 mut signing_queue_receiver: mysten_metrics::metered_channel::Receiver<
201 BridgeActionExecutionWrapper,
202 >,
203 execution_queue_sender: mysten_metrics::metered_channel::Sender<
204 CertifiedBridgeActionExecutionWrapper,
205 >,
206 metrics: Arc<BridgeMetrics>,
207 ) {
208 info!("Starting run_signature_aggregation_loop");
209 let semaphore = Arc::new(Semaphore::new(SIGNING_CONCURRENCY));
210 while let Some(action) = signing_queue_receiver.recv().await {
211 Self::handle_signing_task(
212 &semaphore,
213 &auth_agg,
214 &signing_queue_sender,
215 &execution_queue_sender,
216 &sui_client,
217 &store,
218 action,
219 &metrics,
220 )
221 .await;
222 }
223 }
224
225 async fn should_proceed_signing(sui_client: &Arc<SuiClient<C>>) -> bool {
226 let Ok(Ok(is_paused)) =
227 retry_with_max_elapsed_time!(sui_client.is_bridge_paused(), Duration::from_secs(600))
228 else {
229 error!("Failed to get bridge status after retry");
230 return false;
231 };
232 !is_paused
233 }
234
235 #[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))]
236 async fn handle_signing_task(
237 semaphore: &Arc<Semaphore>,
238 auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
239 signing_queue_sender: &mysten_metrics::metered_channel::Sender<
240 BridgeActionExecutionWrapper,
241 >,
242 execution_queue_sender: &mysten_metrics::metered_channel::Sender<
243 CertifiedBridgeActionExecutionWrapper,
244 >,
245 sui_client: &Arc<SuiClient<C>>,
246 store: &Arc<BridgeOrchestratorTables>,
247 action: BridgeActionExecutionWrapper,
248 metrics: &Arc<BridgeMetrics>,
249 ) {
250 metrics.action_executor_signing_queue_received_actions.inc();
251 let action_key = action.0.key();
252 info!("Received action for signing: {:?}", action.0);
253
254 let should_proceed = Self::should_proceed_signing(sui_client).await;
259 if !should_proceed {
260 metrics.action_executor_signing_queue_skipped_actions.inc();
261 warn!("skipping signing task: {:?}", action_key);
262 return;
263 }
264
265 let auth_agg_clone = auth_agg.clone();
266 let signing_queue_sender_clone = signing_queue_sender.clone();
267 let execution_queue_sender_clone = execution_queue_sender.clone();
268 let sui_client_clone = sui_client.clone();
269 let store_clone = store.clone();
270 let metrics_clone = metrics.clone();
271 let semaphore_clone = semaphore.clone();
272 spawn_logged_monitored_task!(
273 Self::request_signatures(
274 semaphore_clone,
275 sui_client_clone,
276 auth_agg_clone,
277 action,
278 store_clone,
279 signing_queue_sender_clone,
280 execution_queue_sender_clone,
281 metrics_clone,
282 )
283 .instrument(tracing::debug_span!("request_signatures", action_key=?action_key)),
284 "request_signatures"
285 );
286 }
287
288 async fn handle_already_processed_token_transfer_action_maybe(
292 sui_client: &Arc<SuiClient<C>>,
293 action: &BridgeAction,
294 store: &Arc<BridgeOrchestratorTables>,
295 metrics: &Arc<BridgeMetrics>,
296 ) -> bool {
297 let status = sui_client
298 .get_token_transfer_action_onchain_status_until_success(
299 action.chain_id() as u8,
300 action.seq_number(),
301 )
302 .await;
303 match status {
304 BridgeActionStatus::Approved | BridgeActionStatus::Claimed => {
305 info!(
306 "Action already approved or claimed, removing action from pending logs: {:?}",
307 action
308 );
309 metrics.action_executor_already_processed_actions.inc();
310 store
311 .remove_pending_actions(&[action.digest()])
312 .unwrap_or_else(|e| {
313 panic!("Write to DB should not fail: {:?}", e);
314 });
315 true
316 }
317 BridgeActionStatus::Pending | BridgeActionStatus::NotFound => false,
320 }
321 }
322
323 async fn request_signatures(
326 semaphore: Arc<Semaphore>,
327 sui_client: Arc<SuiClient<C>>,
328 auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
329 action: BridgeActionExecutionWrapper,
330 store: Arc<BridgeOrchestratorTables>,
331 signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
332 execution_queue_sender: mysten_metrics::metered_channel::Sender<
333 CertifiedBridgeActionExecutionWrapper,
334 >,
335 metrics: Arc<BridgeMetrics>,
336 ) {
337 let _permit = semaphore
338 .acquire()
339 .await
340 .expect("semaphore should not be closed");
341 info!("requesting signatures");
342 let BridgeActionExecutionWrapper(action, attempt_times) = action;
343
344 match &action {
346 BridgeAction::SuiToEthBridgeAction(_)
347 | BridgeAction::SuiToEthTokenTransfer(_)
348 | BridgeAction::EthToSuiBridgeAction(_) => (),
349 _ => unreachable!("Non token transfer action should not reach here"),
350 };
351
352 if Self::handle_already_processed_token_transfer_action_maybe(
354 &sui_client,
355 &action,
356 &store,
357 &metrics,
358 )
359 .await
360 {
361 return;
362 }
363 match auth_agg
364 .load()
365 .request_committee_signatures(action.clone())
366 .await
367 {
368 Ok(certificate) => {
369 info!("Sending certificate to execution");
370 execution_queue_sender
371 .send(CertifiedBridgeActionExecutionWrapper(certificate, 0))
372 .await
373 .unwrap_or_else(|e| {
374 panic!("Sending to execution queue should not fail: {:?}", e);
375 });
376 }
377 Err(e) => {
378 warn!("Failed to collect sigs for bridge action: {:?}", e);
379 metrics.err_signature_aggregation.inc();
380
381 if attempt_times >= MAX_SIGNING_ATTEMPTS {
383 metrics.err_signature_aggregation_too_many_failures.inc();
384 error!(
385 "Manual intervention is required. Failed to collect sigs for bridge action after {MAX_SIGNING_ATTEMPTS} attempts: {:?}",
386 e
387 );
388 return;
389 }
390 delay(attempt_times).await;
391 signing_queue_sender
392 .send(BridgeActionExecutionWrapper(action, attempt_times + 1))
393 .await
394 .unwrap_or_else(|e| {
395 panic!("Sending to signing queue should not fail: {:?}", e);
396 });
397 }
398 }
399 }
400
401 async fn run_onchain_execution_loop(
404 sui_client: Arc<SuiClient<C>>,
405 sui_key: SuiKeyPair,
406 sui_address: SuiAddress,
407 gas_object_id: ObjectID,
408 store: Arc<BridgeOrchestratorTables>,
409 execution_queue_sender: mysten_metrics::metered_channel::Sender<
410 CertifiedBridgeActionExecutionWrapper,
411 >,
412 mut execution_queue_receiver: mysten_metrics::metered_channel::Receiver<
413 CertifiedBridgeActionExecutionWrapper,
414 >,
415 bridge_object_arg: ObjectArg,
416 sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
417 bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
418 metrics: Arc<BridgeMetrics>,
419 ) {
420 info!("Starting run_onchain_execution_loop");
421 while let Some(certificate_wrapper) = execution_queue_receiver.recv().await {
422 if *bridge_pause_rx.borrow() {
426 warn!("Bridge is paused, skipping execution");
427 metrics
428 .action_executor_execution_queue_skipped_actions_due_to_pausing
429 .inc();
430 continue;
431 }
432 Self::handle_execution_task(
433 certificate_wrapper,
434 &sui_client,
435 &sui_key,
436 &sui_address,
437 gas_object_id,
438 &store,
439 &execution_queue_sender,
440 &bridge_object_arg,
441 &sui_token_type_tags,
442 &metrics,
443 )
444 .await;
445 }
446 panic!("Execution queue closed unexpectedly");
447 }
448
449 #[instrument(level = "error", skip_all, fields(action_key=?certificate_wrapper.0.data().key(), attempt_times=?certificate_wrapper.1))]
450 async fn handle_execution_task(
451 certificate_wrapper: CertifiedBridgeActionExecutionWrapper,
452 sui_client: &Arc<SuiClient<C>>,
453 sui_key: &SuiKeyPair,
454 sui_address: &SuiAddress,
455 gas_object_id: ObjectID,
456 store: &Arc<BridgeOrchestratorTables>,
457 execution_queue_sender: &mysten_metrics::metered_channel::Sender<
458 CertifiedBridgeActionExecutionWrapper,
459 >,
460 bridge_object_arg: &ObjectArg,
461 sui_token_type_tags: &ArcSwap<HashMap<u8, TypeTag>>,
462 metrics: &Arc<BridgeMetrics>,
463 ) {
464 metrics
465 .action_executor_execution_queue_received_actions
466 .inc();
467 let CertifiedBridgeActionExecutionWrapper(certificate, attempt_times) = certificate_wrapper;
468 let action = certificate.data();
469 let action_key = action.key();
470
471 info!("Received certified action for execution: {:?}", action);
472
473 let (gas_coin, gas_object_ref) =
475 Self::get_gas_data_assert_ownership(*sui_address, gas_object_id, sui_client).await;
476 metrics.gas_coin_balance.set(gas_coin.value() as i64);
477
478 let ceriticate_clone = certificate.clone();
479
480 if Self::handle_already_processed_token_transfer_action_maybe(
482 sui_client, action, store, metrics,
483 )
484 .await
485 {
486 info!("Action already processed, skipping");
487 return;
488 }
489
490 info!("Building Sui transaction");
491 let rgp = sui_client.get_reference_gas_price_until_success().await;
492 let tx_data = match build_sui_transaction(
493 *sui_address,
494 &gas_object_ref,
495 ceriticate_clone,
496 *bridge_object_arg,
497 sui_token_type_tags.load().as_ref(),
498 rgp,
499 ) {
500 Ok(tx_data) => tx_data,
501 Err(err) => {
502 metrics.err_build_sui_transaction.inc();
503 error!(
504 "Manual intervention is required. Failed to build transaction for action {:?}: {:?}",
505 action, err
506 );
507 return;
510 }
511 };
512 let sig = Signature::new_secure(
513 &IntentMessage::new(Intent::sui_transaction(), &tx_data),
514 sui_key,
515 );
516 let signed_tx = Transaction::from_data(tx_data, vec![sig]);
517 let tx_digest = *signed_tx.digest();
518
519 if Self::handle_already_processed_token_transfer_action_maybe(
521 sui_client, action, store, metrics,
522 )
523 .await
524 {
525 info!("Action already processed, skipping");
526 return;
527 }
528
529 info!(?tx_digest, ?gas_object_ref, "Sending transaction to Sui");
530 match sui_client
531 .execute_transaction_block_with_effects(signed_tx)
532 .await
533 {
534 Ok(resp) => {
535 Self::handle_execution_effects(tx_digest, resp, store, action, metrics).await
536 }
537
538 Err(err) => {
540 error!(
541 ?action_key,
542 ?tx_digest,
543 "Sui transaction failed at signing: {err:?}"
544 );
545 metrics.err_sui_transaction_submission.inc();
546 let metrics_clone = metrics.clone();
547 let sender_clone = execution_queue_sender.clone();
549 spawn_logged_monitored_task!(async move {
550 if attempt_times >= MAX_EXECUTION_ATTEMPTS {
552 metrics_clone
553 .err_sui_transaction_submission_too_many_failures
554 .inc();
555 error!("Manual intervention is required. Failed to collect execute transaction for bridge action after {MAX_EXECUTION_ATTEMPTS} attempts: {:?}", err);
556 return;
557 }
558 delay(attempt_times).await;
559 sender_clone
560 .send(CertifiedBridgeActionExecutionWrapper(
561 certificate,
562 attempt_times + 1,
563 ))
564 .await
565 .unwrap_or_else(|e| {
566 panic!("Sending to execution queue should not fail: {:?}", e);
567 });
568 info!("Re-enqueued certificate for execution");
569 }.instrument(tracing::debug_span!("reenqueue_execution_task", action_key=?action_key)));
570 }
571 }
572 }
573
574 async fn handle_execution_effects(
576 tx_digest: TransactionDigest,
577 response: SuiTransactionBlockResponse,
578 store: &Arc<BridgeOrchestratorTables>,
579 action: &BridgeAction,
580 metrics: &Arc<BridgeMetrics>,
581 ) {
582 let effects = response
583 .effects
584 .clone()
585 .expect("We requested effects but got None.");
586 let status = effects.status();
587 match status {
588 SuiExecutionStatus::Success => {
589 let events = response.events.expect("We requested events but got None.");
590 let relevant_events = events
591 .data
592 .iter()
593 .filter(|e| {
594 e.type_ == *TokenTransferAlreadyClaimed.get().unwrap()
595 || e.type_ == *TokenTransferClaimed.get().unwrap()
596 || e.type_ == *TokenTransferApproved.get().unwrap()
597 || e.type_ == *TokenTransferAlreadyApproved.get().unwrap()
598 })
599 .collect::<Vec<_>>();
600 assert!(
601 !relevant_events.is_empty(),
602 "Expected TokenTransferAlreadyClaimed, TokenTransferClaimed, TokenTransferApproved \
603 or TokenTransferAlreadyApproved event but got: {:?}",
604 events
605 );
606 info!(?tx_digest, "Sui transaction executed successfully");
607 relevant_events.iter().for_each(|e| {
609 if e.type_ == *TokenTransferClaimed.get().unwrap() {
610 match action {
611 BridgeAction::EthToSuiBridgeAction(_) => {
612 metrics.eth_sui_token_transfer_claimed.inc();
613 }
614 BridgeAction::SuiToEthBridgeAction(_) => {
615 metrics.sui_eth_token_transfer_claimed.inc();
616 }
617 _ => error!("Unexpected action type for claimed event: {:?}", action),
618 }
619 } else if e.type_ == *TokenTransferApproved.get().unwrap() {
620 match action {
621 BridgeAction::EthToSuiBridgeAction(_) => {
622 metrics.eth_sui_token_transfer_approved.inc();
623 }
624 BridgeAction::SuiToEthBridgeAction(_) => {
625 metrics.sui_eth_token_transfer_approved.inc();
626 }
627 _ => error!("Unexpected action type for approved event: {:?}", action),
628 }
629 }
630 });
631 store
632 .remove_pending_actions(&[action.digest()])
633 .unwrap_or_else(|e| {
634 panic!("Write to DB should not fail: {:?}", e);
635 })
636 }
637 SuiExecutionStatus::Failure { error } => {
638 metrics.err_sui_transaction_execution.inc();
645 error!(
646 ?tx_digest,
647 "Manual intervention is needed. Sui transaction executed and failed with error: {error:?}"
648 );
649 }
650 }
651 }
652
653 async fn get_gas_data_assert_ownership(
655 sui_address: SuiAddress,
656 gas_object_id: ObjectID,
657 sui_client: &SuiClient<C>,
658 ) -> (GasCoin, ObjectRef) {
659 let (gas_coin, gas_obj_ref, owner) = sui_client
660 .get_gas_data_panic_if_not_gas(gas_object_id)
661 .await;
662
663 assert_eq!(
666 owner,
667 Owner::AddressOwner(sui_address),
668 "Gas object {:?} is no longer owned by address {}",
669 gas_object_id,
670 sui_address
671 );
672 (gas_coin, gas_obj_ref)
673 }
674}
675
676pub async fn submit_to_executor(
677 tx: &mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
678 action: BridgeAction,
679) -> Result<(), BridgeError> {
680 tx.send(BridgeActionExecutionWrapper(action, 0))
681 .await
682 .map_err(|e| BridgeError::Generic(e.to_string()))
683}
684
685#[cfg(test)]
686mod tests {
687 use crate::events::init_all_struct_tags;
688 use crate::test_utils::DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
689 use crate::types::BRIDGE_PAUSED;
690 use fastcrypto::traits::KeyPair;
691 use prometheus::Registry;
692 use std::collections::{BTreeMap, HashMap};
693 use std::str::FromStr;
694 use sui_json_rpc_types::SuiTransactionBlockEffects;
695 use sui_json_rpc_types::SuiTransactionBlockEvents;
696 use sui_json_rpc_types::{SuiEvent, SuiTransactionBlockResponse};
697 use sui_types::TypeTag;
698 use sui_types::crypto::get_key_pair;
699 use sui_types::gas_coin::GasCoin;
700 use sui_types::{base_types::random_object_ref, transaction::TransactionData};
701
702 use crate::{
703 crypto::{
704 BridgeAuthorityKeyPair, BridgeAuthorityPublicKeyBytes,
705 BridgeAuthorityRecoverableSignature,
706 },
707 server::mock_handler::BridgeRequestMockHandler,
708 sui_mock_client::SuiMockClient,
709 test_utils::{
710 get_test_authorities_and_run_mock_bridge_server, get_test_eth_to_sui_bridge_action,
711 get_test_sui_to_eth_bridge_action, sign_action_with_key,
712 },
713 types::{BridgeCommittee, BridgeCommitteeValiditySignInfo, CertifiedBridgeAction},
714 };
715
716 use super::*;
717
// Exercises the execution path end to end with three scenarios:
// 1. the transaction succeeds -> the action leaves the pending store;
// 2. the transaction executes but fails -> the action stays pending;
// 3. submission errors -> the same transaction is submitted again, and once
//    the mock is switched to success the action leaves the pending store.
#[tokio::test]
async fn test_onchain_execution_loop() {
    let (
        signing_tx,
        _execution_tx,
        sui_client_mock,
        mut tx_subscription,
        store,
        secrets,
        dummy_sui_key,
        mock0,
        mock1,
        mock2,
        mock3,
        _handles,
        gas_object_ref,
        sui_address,
        sui_token_type_tags,
        _bridge_pause_tx,
    ) = setup().await;

    // --- Scenario 1: successful execution ---
    let (action_certificate, _, _) = get_bridge_authority_approved_action(
        vec![&mock0, &mock1, &mock2, &mock3],
        vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
        None,
        true,
    );
    let action = action_certificate.data().clone();
    let id_token_map = (*sui_token_type_tags.load().clone()).clone();
    // Build the transaction the test expects the executor to produce, so its
    // digest can be used as the key for the mocked response.
    let tx_data = build_sui_transaction(
        sui_address,
        &gas_object_ref,
        action_certificate,
        DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
        &id_token_map,
        1000,
    )
    .unwrap();

    let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);

    // Register a gas coin owned by the executor's address.
    let gas_coin = GasCoin::new_for_testing(1_000_000_000_000);
    sui_client_mock.add_gas_object_info(
        gas_coin.clone(),
        gas_object_ref,
        Owner::AddressOwner(sui_address),
    );

    // A successful response must carry one of the expected token transfer events.
    let mut event = SuiEvent::random_for_testing();
    event.type_ = TokenTransferClaimed.get().unwrap().clone();
    let events = vec![event];
    mock_transaction_response(
        &sui_client_mock,
        tx_digest,
        SuiExecutionStatus::Success,
        Some(events),
        true,
    );

    store
        .insert_pending_actions(std::slice::from_ref(&action))
        .unwrap();
    assert_eq!(
        store.get_all_pending_actions()[&action.digest()],
        action.clone()
    );

    submit_to_executor(&signing_tx, action.clone())
        .await
        .unwrap();

    // Wait for a transaction to be reported on the subscription
    // (presumably fed by the mock client on execution — confirm in `setup`).
    tx_subscription.recv().await.unwrap();
    // The successful action is removed from the pending store.
    assert!(store.get_all_pending_actions().is_empty());

    // --- Scenario 2: transaction executes but reports failure ---
    let (action_certificate, _, _) = get_bridge_authority_approved_action(
        vec![&mock0, &mock1, &mock2, &mock3],
        vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
        None,
        true,
    );

    let action = action_certificate.data().clone();

    let tx_data = build_sui_transaction(
        sui_address,
        &gas_object_ref,
        action_certificate,
        DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
        &id_token_map,
        1000,
    )
    .unwrap();
    let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);

    mock_transaction_response(
        &sui_client_mock,
        tx_digest,
        SuiExecutionStatus::Failure {
            error: "failure is mother of success".to_string(),
        },
        None,
        true,
    );

    store
        .insert_pending_actions(std::slice::from_ref(&action))
        .unwrap();
    assert_eq!(
        store.get_all_pending_actions()[&action.digest()],
        action.clone()
    );

    submit_to_executor(&signing_tx, action.clone())
        .await
        .unwrap();

    tx_subscription.recv().await.unwrap();
    // A failed execution leaves the action in the pending store.
    assert_eq!(
        store.get_all_pending_actions()[&action.digest()],
        action.clone()
    );

    // --- Scenario 3: submission error, then retry succeeds ---
    let (action_certificate, _, _) = get_bridge_authority_approved_action(
        vec![&mock0, &mock1, &mock2, &mock3],
        vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
        None,
        true,
    );

    let action = action_certificate.data().clone();

    let tx_data = build_sui_transaction(
        sui_address,
        &gas_object_ref,
        action_certificate,
        DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
        &id_token_map,
        1000,
    )
    .unwrap();
    let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
    mock_transaction_error(
        &sui_client_mock,
        tx_digest,
        BridgeError::Generic("some random error".to_string()),
        true,
    );

    store
        .insert_pending_actions(std::slice::from_ref(&action))
        .unwrap();
    assert_eq!(
        store.get_all_pending_actions()[&action.digest()],
        action.clone()
    );

    submit_to_executor(&signing_tx, action.clone())
        .await
        .unwrap();

    // The same digest is reported twice: the initial submission and a retry.
    let tx_digest = tx_subscription.recv().await.unwrap();
    assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);

    // While submissions keep erroring the action stays pending.
    assert!(
        store
            .get_all_pending_actions()
            .contains_key(&action.digest())
    );

    // Switch the mock to a successful response for the same digest.
    let mut event = SuiEvent::random_for_testing();
    event.type_ = TokenTransferClaimed.get().unwrap().clone();
    let events = vec![event];
    mock_transaction_response(
        &sui_client_mock,
        tx_digest,
        SuiExecutionStatus::Success,
        Some(events),
        true,
    );

    // Give the executor time for another retry.
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    // After the successful retry the action is gone from the pending store.
    assert!(
        !store
            .get_all_pending_actions()
            .contains_key(&action.digest())
    );
}
925
// Exercises the signing path: while three of four authorities return errors
// the action is retried without reaching execution; once a second authority
// signs, the certified action is executed and removed from the pending store.
#[tokio::test]
async fn test_signature_aggregation_loop() {
    let (
        signing_tx,
        _execution_tx,
        sui_client_mock,
        mut tx_subscription,
        store,
        secrets,
        dummy_sui_key,
        mock0,
        mock1,
        mock2,
        mock3,
        _handles,
        gas_object_ref,
        sui_address,
        sui_token_type_tags,
        _bridge_pause_tx,
    ) = setup().await;
    let id_token_map = (*sui_token_type_tags.load().clone()).clone();
    let (action_certificate, sui_tx_digest, sui_tx_event_index) =
        get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            None,
            true,
        );
    let action = action_certificate.data().clone();
    // Authorities 0-2 answer with errors; only authority 3 signs.
    mock_bridge_authority_signing_errors(
        vec![&mock0, &mock1, &mock2],
        sui_tx_digest,
        sui_tx_event_index,
    );
    let mut sigs = mock_bridge_authority_sigs(
        vec![&mock3],
        &action,
        vec![&secrets[3]],
        sui_tx_digest,
        sui_tx_event_index,
    );

    // Register a gas coin owned by the executor's address.
    let gas_coin = GasCoin::new_for_testing(1_000_000_000_000);
    sui_client_mock.add_gas_object_info(
        gas_coin,
        gas_object_ref,
        Owner::AddressOwner(sui_address),
    );
    store
        .insert_pending_actions(std::slice::from_ref(&action))
        .unwrap();
    assert_eq!(
        store.get_all_pending_actions()[&action.digest()],
        action.clone()
    );

    submit_to_executor(&signing_tx, action.clone())
        .await
        .unwrap();

    // Poll until authority 0 has been asked at least twice, i.e. the
    // executor has retried signature collection at least once.
    loop {
        let requested_times =
            mock0.get_sui_token_events_requested(sui_tx_digest, sui_tx_event_index);
        if requested_times >= 2 {
            break;
        }
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
    }
    // No transaction has been submitted yet.
    assert_eq!(
        tx_subscription.try_recv().unwrap_err(),
        tokio::sync::broadcast::error::TryRecvError::Empty
    );
    // The action is still pending.
    assert_eq!(
        store.get_all_pending_actions()[&action.digest()],
        action.clone()
    );

    // Let authority 2 sign as well (presumably enough, together with
    // authority 3, for a certificate — depends on the test committee setup).
    let sig_from_2 = mock_bridge_authority_sigs(
        vec![&mock2],
        &action,
        vec![&secrets[2]],
        sui_tx_digest,
        sui_tx_event_index,
    );
    sigs.extend(sig_from_2);
    // Rebuild the certificate the executor is expected to obtain, so the
    // digest of the resulting transaction can be mocked.
    let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
        action.clone(),
        BridgeCommitteeValiditySignInfo { signatures: sigs },
    );
    let action_certificate = VerifiedCertifiedBridgeAction::new_from_verified(certified_action);
    let tx_data = build_sui_transaction(
        sui_address,
        &gas_object_ref,
        action_certificate,
        DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
        &id_token_map,
        1000,
    )
    .unwrap();
    let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);

    let mut event = SuiEvent::random_for_testing();
    event.type_ = TokenTransferClaimed.get().unwrap().clone();
    let events = vec![event];
    mock_transaction_response(
        &sui_client_mock,
        tx_digest,
        SuiExecutionStatus::Success,
        Some(events),
        true,
    );

    // The expected transaction is submitted ...
    assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
    // ... and the action is removed from the pending store.
    assert!(
        !store
            .get_all_pending_actions()
            .contains_key(&action.digest())
    );
}
1052
// Verifies that an action stuck in signing retries is dropped from the pending
// store, without any transaction being submitted, once its on-chain status
// becomes `Approved`.
#[tokio::test]
async fn test_skip_request_signature_if_already_processed_on_chain() {
    let (
        signing_tx,
        _execution_tx,
        sui_client_mock,
        mut tx_subscription,
        store,
        _secrets,
        _dummy_sui_key,
        mock0,
        mock1,
        mock2,
        mock3,
        _handles,
        _gas_object_ref,
        _sui_address,
        _sui_token_type_tags,
        _bridge_pause_tx,
    ) = setup().await;

    let sui_tx_digest = TransactionDigest::random();
    let sui_tx_event_index = 0;
    let action = get_test_sui_to_eth_bridge_action(
        Some(sui_tx_digest),
        Some(sui_tx_event_index),
        None,
        None,
        None,
        None,
        None,
    );
    // Every authority answers with an error, so signing can never succeed.
    mock_bridge_authority_signing_errors(
        vec![&mock0, &mock1, &mock2, &mock3],
        sui_tx_digest,
        sui_tx_event_index,
    );
    store
        .insert_pending_actions(std::slice::from_ref(&action))
        .unwrap();
    assert_eq!(
        store.get_all_pending_actions()[&action.digest()],
        action.clone()
    );

    submit_to_executor(&signing_tx, action.clone())
        .await
        .unwrap();
    let action_digest = action.digest();

    // Let the executor attempt signing for a while.
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    // No transaction was submitted and the action is still pending.
    tx_subscription.try_recv().unwrap_err();
    assert!(store.get_all_pending_actions().contains_key(&action_digest));

    // Mark the action as approved on chain.
    sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);

    // Wait (up to ~10s) for a retry to notice the status and remove the action.
    let now = std::time::Instant::now();
    while store.get_all_pending_actions().contains_key(&action_digest) {
        if now.elapsed().as_secs() > 10 {
            panic!("Timeout waiting for action to be removed from WAL");
        }
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    }
    // Still no transaction was ever submitted.
    tx_subscription.try_recv().unwrap_err();
}
1122
// Verifies that a certified action whose submission keeps erroring is dropped
// from the pending store once its on-chain status becomes `Approved`.
#[tokio::test]
async fn test_skip_tx_submission_if_already_processed_on_chain() {
    let (
        _signing_tx,
        execution_tx,
        sui_client_mock,
        mut tx_subscription,
        store,
        secrets,
        dummy_sui_key,
        mock0,
        mock1,
        mock2,
        mock3,
        _handles,
        gas_object_ref,
        sui_address,
        sui_token_type_tags,
        _bridge_pause_tx,
    ) = setup().await;
    let id_token_map = (*sui_token_type_tags.load().clone()).clone();
    let (action_certificate, _, _) = get_bridge_authority_approved_action(
        vec![&mock0, &mock1, &mock2, &mock3],
        vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
        None,
        true,
    );

    let action = action_certificate.data().clone();
    let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
    // Build the transaction the executor is expected to produce so that its
    // digest can be mocked to fail on submission.
    let tx_data = build_sui_transaction(
        sui_address,
        &gas_object_ref,
        action_certificate.clone(),
        arg,
        &id_token_map,
        1000,
    )
    .unwrap();
    let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
    mock_transaction_error(
        &sui_client_mock,
        tx_digest,
        BridgeError::Generic("some random error".to_string()),
        true,
    );

    // Register a gas coin owned by the executor's address.
    let gas_coin = GasCoin::new_for_testing(1_000_000_000_000);
    sui_client_mock.add_gas_object_info(
        gas_coin.clone(),
        gas_object_ref,
        Owner::AddressOwner(sui_address),
    );

    // Start with the action pending on chain so the executor tries to submit.
    sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);

    store
        .insert_pending_actions(std::slice::from_ref(&action))
        .unwrap();
    assert_eq!(
        store.get_all_pending_actions()[&action.digest()],
        action.clone()
    );

    // Bypass signing: push the certificate straight onto the execution queue.
    execution_tx
        .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
        .await
        .unwrap();

    // Wait for a submission attempt to be reported.
    tx_subscription.recv().await.unwrap();

    // Mark the action as approved on chain.
    sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);

    // Wait (up to ~10s) for a retry to notice the status and remove the action.
    let now = std::time::Instant::now();
    let action_digest = action.digest();
    while store.get_all_pending_actions().contains_key(&action_digest) {
        if now.elapsed().as_secs() > 10 {
            panic!("Timeout waiting for action to be removed from WAL");
        }
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    }
}
1209
1210 #[tokio::test]
1211 async fn test_skip_tx_submission_if_bridge_is_paused() {
1212 let (
1213 _signing_tx,
1214 execution_tx,
1215 sui_client_mock,
1216 mut tx_subscription,
1217 store,
1218 secrets,
1219 dummy_sui_key,
1220 mock0,
1221 mock1,
1222 mock2,
1223 mock3,
1224 _handles,
1225 gas_object_ref,
1226 sui_address,
1227 sui_token_type_tags,
1228 bridge_pause_tx,
1229 ) = setup().await;
1230 let id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1231 let (action_certificate, _, _) = get_bridge_authority_approved_action(
1232 vec![&mock0, &mock1, &mock2, &mock3],
1233 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1234 None,
1235 true,
1236 );
1237
1238 let action = action_certificate.data().clone();
1239 let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1240 let tx_data = build_sui_transaction(
1241 sui_address,
1242 &gas_object_ref,
1243 action_certificate.clone(),
1244 arg,
1245 &id_token_map,
1246 1000,
1247 )
1248 .unwrap();
1249 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1250 mock_transaction_error(
1251 &sui_client_mock,
1252 tx_digest,
1253 BridgeError::Generic("some random error".to_string()),
1254 true,
1255 );
1256
1257 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1259 gas_coin.clone(),
1260 gas_object_ref,
1261 Owner::AddressOwner(sui_address),
1262 );
1263 let action_digest = action.digest();
1264 sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1265
1266 assert!(!*bridge_pause_tx.borrow());
1268
1269 store
1270 .insert_pending_actions(std::slice::from_ref(&action))
1271 .unwrap();
1272 assert_eq!(
1273 store.get_all_pending_actions()[&action.digest()],
1274 action.clone()
1275 );
1276
1277 execution_tx
1279 .send(CertifiedBridgeActionExecutionWrapper(
1280 action_certificate.clone(),
1281 0,
1282 ))
1283 .await
1284 .unwrap();
1285
1286 tx_subscription.recv().await.unwrap();
1288
1289 bridge_pause_tx.send(BRIDGE_PAUSED).unwrap();
1291
1292 execution_tx
1294 .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1295 .await
1296 .unwrap();
1297
1298 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1299 assert_eq!(
1301 tx_subscription.try_recv().unwrap_err(),
1302 tokio::sync::broadcast::error::TryRecvError::Empty
1303 );
1304 assert_eq!(
1306 store.get_all_pending_actions()[&action_digest],
1307 action.clone()
1308 );
1309 }
1310
1311 #[tokio::test]
1312 async fn test_action_executor_handle_new_token() {
1313 let new_token_id = 255u8; let new_type_tag = TypeTag::from_str("0xbeef::beef::BEEF").unwrap();
1315 let (
1316 _signing_tx,
1317 execution_tx,
1318 sui_client_mock,
1319 mut tx_subscription,
1320 _store,
1321 secrets,
1322 dummy_sui_key,
1323 mock0,
1324 mock1,
1325 mock2,
1326 mock3,
1327 _handles,
1328 gas_object_ref,
1329 sui_address,
1330 sui_token_type_tags,
1331 _bridge_pause_tx,
1332 ) = setup().await;
1333 let mut id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1334 let (action_certificate, _, _) = get_bridge_authority_approved_action(
1335 vec![&mock0, &mock1, &mock2, &mock3],
1336 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1337 Some(new_token_id),
1338 false, );
1340
1341 let action = action_certificate.data().clone();
1342 let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1343 let tx_data = build_sui_transaction(
1344 sui_address,
1345 &gas_object_ref,
1346 action_certificate.clone(),
1347 arg,
1348 &maplit::hashmap! {
1349 new_token_id => new_type_tag.clone()
1350 },
1351 1000,
1352 )
1353 .unwrap();
1354 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1355 mock_transaction_error(
1356 &sui_client_mock,
1357 tx_digest,
1358 BridgeError::Generic("some random error".to_string()),
1359 true,
1360 );
1361
1362 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1364 gas_coin.clone(),
1365 gas_object_ref,
1366 Owner::AddressOwner(sui_address),
1367 );
1368 sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1369
1370 execution_tx
1372 .send(CertifiedBridgeActionExecutionWrapper(
1373 action_certificate.clone(),
1374 0,
1375 ))
1376 .await
1377 .unwrap();
1378
1379 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1380 assert_eq!(
1382 tx_subscription.try_recv().unwrap_err(),
1383 tokio::sync::broadcast::error::TryRecvError::Empty
1384 );
1385
1386 id_token_map.insert(new_token_id, new_type_tag);
1388 sui_token_type_tags.store(Arc::new(id_token_map));
1389
1390 execution_tx
1392 .send(CertifiedBridgeActionExecutionWrapper(
1393 action_certificate.clone(),
1394 0,
1395 ))
1396 .await
1397 .unwrap();
1398
1399 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1400 assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1402 }
1403
1404 fn mock_bridge_authority_sigs(
1405 mocks: Vec<&BridgeRequestMockHandler>,
1406 action: &BridgeAction,
1407 secrets: Vec<&BridgeAuthorityKeyPair>,
1408 sui_tx_digest: TransactionDigest,
1409 sui_tx_event_index: u16,
1410 ) -> BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthorityRecoverableSignature> {
1411 assert_eq!(mocks.len(), secrets.len());
1412 let mut signed_actions = BTreeMap::new();
1413 for (mock, secret) in mocks.iter().zip(secrets.iter()) {
1414 let signed_action = sign_action_with_key(action, secret);
1415 mock.add_sui_event_response(
1416 sui_tx_digest,
1417 sui_tx_event_index,
1418 Ok(signed_action.clone()),
1419 None,
1420 );
1421 signed_actions.insert(secret.public().into(), signed_action.into_sig().signature);
1422 }
1423 signed_actions
1424 }
1425
1426 fn mock_bridge_authority_signing_errors(
1427 mocks: Vec<&BridgeRequestMockHandler>,
1428 sui_tx_digest: TransactionDigest,
1429 sui_tx_event_index: u16,
1430 ) {
1431 for mock in mocks {
1432 mock.add_sui_event_response(
1433 sui_tx_digest,
1434 sui_tx_event_index,
1435 Err(BridgeError::RestAPIError("small issue".into())),
1436 None,
1437 );
1438 }
1439 }
1440
1441 fn get_bridge_authority_approved_action(
1443 mocks: Vec<&BridgeRequestMockHandler>,
1444 secrets: Vec<&BridgeAuthorityKeyPair>,
1445 token_id: Option<u8>,
1446 sui_to_eth: bool,
1447 ) -> (VerifiedCertifiedBridgeAction, TransactionDigest, u16) {
1448 let sui_tx_digest = TransactionDigest::random();
1449 let sui_tx_event_index = 1;
1450 let action = if sui_to_eth {
1451 get_test_sui_to_eth_bridge_action(
1452 Some(sui_tx_digest),
1453 Some(sui_tx_event_index),
1454 None,
1455 None,
1456 None,
1457 None,
1458 token_id,
1459 )
1460 } else {
1461 get_test_eth_to_sui_bridge_action(None, None, None, token_id)
1462 };
1463
1464 let sigs =
1465 mock_bridge_authority_sigs(mocks, &action, secrets, sui_tx_digest, sui_tx_event_index);
1466 let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1467 action,
1468 BridgeCommitteeValiditySignInfo { signatures: sigs },
1469 );
1470 (
1471 VerifiedCertifiedBridgeAction::new_from_verified(certified_action),
1472 sui_tx_digest,
1473 sui_tx_event_index,
1474 )
1475 }
1476
1477 fn get_tx_digest(tx_data: TransactionData, dummy_sui_key: &SuiKeyPair) -> TransactionDigest {
1478 let sig = Signature::new_secure(
1479 &IntentMessage::new(Intent::sui_transaction(), &tx_data),
1480 dummy_sui_key,
1481 );
1482 let signed_tx = Transaction::from_data(tx_data, vec![sig]);
1483 *signed_tx.digest()
1484 }
1485
1486 fn mock_transaction_response(
1490 sui_client_mock: &SuiMockClient,
1491 tx_digest: TransactionDigest,
1492 status: SuiExecutionStatus,
1493 events: Option<Vec<SuiEvent>>,
1494 wildcard: bool,
1495 ) {
1496 let mut response = SuiTransactionBlockResponse::new(tx_digest);
1497 let effects = SuiTransactionBlockEffects::new_for_testing(tx_digest, status);
1498 if let Some(events) = events {
1499 response.events = Some(SuiTransactionBlockEvents { data: events });
1500 }
1501 response.effects = Some(effects);
1502 if wildcard {
1503 sui_client_mock.set_wildcard_transaction_response(Ok(response));
1504 } else {
1505 sui_client_mock.add_transaction_response(tx_digest, Ok(response));
1506 }
1507 }
1508
1509 fn mock_transaction_error(
1510 sui_client_mock: &SuiMockClient,
1511 tx_digest: TransactionDigest,
1512 error: BridgeError,
1513 wildcard: bool,
1514 ) {
1515 if wildcard {
1516 sui_client_mock.set_wildcard_transaction_response(Err(error));
1517 } else {
1518 sui_client_mock.add_transaction_response(tx_digest, Err(error));
1519 }
1520 }
1521
1522 #[allow(clippy::type_complexity)]
1523 async fn setup() -> (
1524 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
1525 mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
1526 SuiMockClient,
1527 tokio::sync::broadcast::Receiver<TransactionDigest>,
1528 Arc<BridgeOrchestratorTables>,
1529 Vec<BridgeAuthorityKeyPair>,
1530 SuiKeyPair,
1531 BridgeRequestMockHandler,
1532 BridgeRequestMockHandler,
1533 BridgeRequestMockHandler,
1534 BridgeRequestMockHandler,
1535 Vec<tokio::task::JoinHandle<()>>,
1536 ObjectRef,
1537 SuiAddress,
1538 Arc<ArcSwap<HashMap<u8, TypeTag>>>,
1539 tokio::sync::watch::Sender<IsBridgePaused>,
1540 ) {
1541 telemetry_subscribers::init_for_testing();
1542 let registry = Registry::new();
1543 mysten_metrics::init_metrics(®istry);
1544 init_all_struct_tags();
1545
1546 let (sui_address, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1547 let sui_key = SuiKeyPair::from(kp);
1548 let gas_object_ref = random_object_ref();
1549 let temp_dir = tempfile::tempdir().unwrap();
1550 let store = BridgeOrchestratorTables::new(temp_dir.path());
1551 let sui_client_mock = SuiMockClient::default();
1552 let tx_subscription = sui_client_mock.subscribe_to_requested_transactions();
1553 let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1554
1555 let (_, dummy_kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1558 let dummy_sui_key = SuiKeyPair::from(dummy_kp);
1559
1560 let mock0 = BridgeRequestMockHandler::new();
1561 let mock1 = BridgeRequestMockHandler::new();
1562 let mock2 = BridgeRequestMockHandler::new();
1563 let mock3 = BridgeRequestMockHandler::new();
1564
1565 let (mut handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
1566 vec![2500, 2500, 2500, 2500],
1567 vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
1568 );
1569
1570 let committee = BridgeCommittee::new(authorities).unwrap();
1571
1572 let agg = Arc::new(ArcSwap::new(Arc::new(
1573 BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1574 )));
1575 let metrics = Arc::new(BridgeMetrics::new(®istry));
1576 let sui_token_type_tags = sui_client.get_token_id_map().await.unwrap();
1577 let sui_token_type_tags = Arc::new(ArcSwap::new(Arc::new(sui_token_type_tags)));
1578 let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1579 let executor = BridgeActionExecutor::new(
1580 sui_client.clone(),
1581 agg.clone(),
1582 store.clone(),
1583 sui_key,
1584 sui_address,
1585 gas_object_ref.0,
1586 sui_token_type_tags.clone(),
1587 bridge_pause_rx,
1588 metrics,
1589 )
1590 .await;
1591
1592 let (executor_handle, signing_tx, execution_tx) = executor.run_inner();
1593 handles.extend(executor_handle);
1594
1595 (
1596 signing_tx,
1597 execution_tx,
1598 sui_client_mock,
1599 tx_subscription,
1600 store,
1601 secrets,
1602 dummy_sui_key,
1603 mock0,
1604 mock1,
1605 mock2,
1606 mock3,
1607 handles,
1608 gas_object_ref,
1609 sui_address,
1610 sui_token_type_tags,
1611 bridge_pause_tx,
1612 )
1613 }
1614}