1use crate::retry_with_max_elapsed_time;
8use crate::types::IsBridgePaused;
9use arc_swap::ArcSwap;
10use mysten_metrics::spawn_logged_monitored_task;
11use shared_crypto::intent::{Intent, IntentMessage};
12use sui_json_rpc_types::SuiExecutionStatus;
13use sui_types::TypeTag;
14use sui_types::transaction::ObjectArg;
15use sui_types::{
16 base_types::{ObjectID, ObjectRef, SuiAddress},
17 crypto::{Signature, SuiKeyPair},
18 digests::TransactionDigest,
19 gas_coin::GasCoin,
20 object::Owner,
21 transaction::Transaction,
22};
23
24use crate::events::{
25 TokenTransferAlreadyApproved, TokenTransferAlreadyClaimed, TokenTransferApproved,
26 TokenTransferClaimed,
27};
28use crate::metrics::BridgeMetrics;
29use crate::{
30 client::bridge_authority_aggregator::BridgeAuthorityAggregator,
31 error::BridgeError,
32 storage::BridgeOrchestratorTables,
33 sui_client::{ExecuteTransactionResult, SuiClient, SuiClientInner},
34 sui_transaction_builder::build_sui_transaction,
35 types::{BridgeAction, BridgeActionStatus, VerifiedCertifiedBridgeAction},
36};
37use std::collections::HashMap;
38use std::sync::Arc;
39use tokio::sync::Semaphore;
40use tokio::time::Duration;
41use tracing::{Instrument, error, info, instrument, warn};
42
/// Capacity used for both the signing queue and the execution queue channels.
pub const CHANNEL_SIZE: usize = 1000;
/// Number of permits in the semaphore that `request_signatures` tasks acquire.
pub const SIGNING_CONCURRENCY: usize = 10;

/// Attempt count at which a failed signature aggregation is no longer re-enqueued
/// and is instead logged as requiring manual intervention.
pub const MAX_SIGNING_ATTEMPTS: u64 = 16;
/// Attempt count at which a failed transaction submission is no longer re-enqueued
/// and is instead logged as requiring manual intervention.
pub const MAX_EXECUTION_ATTEMPTS: u64 = 16;
50
51async fn delay(attempt_times: u64) {
52 let delay_ms = 100 * 2_u64.pow(attempt_times as u32);
53 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
54}
55
/// A bridge action queued for signature aggregation, paired with the number of
/// signing attempts already made (0 on first submission, incremented on each
/// re-enqueue after a failure).
#[derive(Debug)]
pub struct BridgeActionExecutionWrapper(pub BridgeAction, pub u64);
58
/// A certified bridge action queued for on-chain execution, paired with the number
/// of execution attempts already made (0 when first certified, incremented on each
/// re-enqueue after a failed submission).
#[derive(Debug)]
pub struct CertifiedBridgeActionExecutionWrapper(pub VerifiedCertifiedBridgeAction, pub u64);
61
/// Entry point for starting an action executor.
pub trait BridgeActionExecutorTrait {
    /// Consumes the executor and starts it.
    ///
    /// Returns the join handles of the spawned tasks together with the sender used
    /// to submit `BridgeActionExecutionWrapper`s to the executor.
    fn run(
        self,
    ) -> (
        Vec<tokio::task::JoinHandle<()>>,
        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
    );
}
70
/// Processes bridge actions in two stages: it requests committee signatures for an
/// action, then builds, signs and submits a Sui transaction for the resulting
/// certificate.
pub struct BridgeActionExecutor<C> {
    /// Client used for bridge status queries, gas object lookups and transaction submission.
    sui_client: Arc<SuiClient<C>>,
    /// Swappable aggregator used to request committee signatures for an action.
    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
    /// Key pair used to sign the Sui transactions built for certified actions.
    key: SuiKeyPair,
    /// Address passed to `build_sui_transaction`; the gas object is asserted to be owned by it.
    sui_address: SuiAddress,
    /// ID of the gas object whose data is fetched before each execution.
    gas_object_id: ObjectID,
    /// Store of pending actions; entries are removed once an action is processed on chain.
    store: Arc<BridgeOrchestratorTables>,
    /// Bridge object argument, resolved once in `new` and passed to every built transaction.
    bridge_object_arg: ObjectArg,
    /// Swappable `u8 -> TypeTag` map passed to `build_sui_transaction`
    /// (presumably keyed by token id — confirm against the builder).
    sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
    /// Watch channel consulted by the execution loop; while it reads `true`, received
    /// certificates are skipped.
    bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
    /// Metrics updated throughout signing and execution.
    metrics: Arc<BridgeMetrics>,
}
83
84impl<C> BridgeActionExecutorTrait for BridgeActionExecutor<C>
85where
86 C: SuiClientInner + 'static,
87{
88 fn run(
89 self,
90 ) -> (
91 Vec<tokio::task::JoinHandle<()>>,
92 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
93 ) {
94 let (tasks, sender, _) = self.run_inner();
95 (tasks, sender)
96 }
97}
98
99impl<C> BridgeActionExecutor<C>
100where
101 C: SuiClientInner + 'static,
102{
103 pub async fn new(
104 sui_client: Arc<SuiClient<C>>,
105 bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
106 store: Arc<BridgeOrchestratorTables>,
107 key: SuiKeyPair,
108 sui_address: SuiAddress,
109 gas_object_id: ObjectID,
110 sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
111 bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
112 metrics: Arc<BridgeMetrics>,
113 ) -> Self {
114 let bridge_object_arg = sui_client
115 .get_mutable_bridge_object_arg_must_succeed()
116 .await;
117 Self {
118 sui_client,
119 bridge_auth_agg,
120 store,
121 key,
122 gas_object_id,
123 sui_address,
124 bridge_object_arg,
125 sui_token_type_tags,
126 bridge_pause_rx,
127 metrics,
128 }
129 }
130
131 fn run_inner(
132 self,
133 ) -> (
134 Vec<tokio::task::JoinHandle<()>>,
135 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
136 mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
137 ) {
138 let key = self.key;
139
140 let (sender, receiver) = mysten_metrics::metered_channel::channel(
141 CHANNEL_SIZE,
142 &mysten_metrics::get_metrics()
143 .unwrap()
144 .channel_inflight
145 .with_label_values(&["executor_signing_queue"]),
146 );
147
148 let (execution_tx, execution_rx) = mysten_metrics::metered_channel::channel(
149 CHANNEL_SIZE,
150 &mysten_metrics::get_metrics()
151 .unwrap()
152 .channel_inflight
153 .with_label_values(&["executor_execution_queue"]),
154 );
155 let execution_tx_clone = execution_tx.clone();
156 let sender_clone = sender.clone();
157 let store_clone = self.store.clone();
158 let client_clone = self.sui_client.clone();
159 let mut tasks = vec![];
160 let metrics = self.metrics.clone();
161 tasks.push(spawn_logged_monitored_task!(
162 Self::run_signature_aggregation_loop(
163 client_clone,
164 self.bridge_auth_agg,
165 store_clone,
166 sender_clone,
167 receiver,
168 execution_tx_clone,
169 metrics,
170 )
171 ));
172
173 let metrics = self.metrics.clone();
174 let execution_tx_clone = execution_tx.clone();
175 tasks.push(spawn_logged_monitored_task!(
176 Self::run_onchain_execution_loop(
177 self.sui_client.clone(),
178 key,
179 self.sui_address,
180 self.gas_object_id,
181 self.store.clone(),
182 execution_tx_clone,
183 execution_rx,
184 self.bridge_object_arg,
185 self.sui_token_type_tags,
186 self.bridge_pause_rx,
187 metrics,
188 )
189 ));
190 (tasks, sender, execution_tx)
191 }
192
193 async fn run_signature_aggregation_loop(
194 sui_client: Arc<SuiClient<C>>,
195 auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
196 store: Arc<BridgeOrchestratorTables>,
197 signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
198 mut signing_queue_receiver: mysten_metrics::metered_channel::Receiver<
199 BridgeActionExecutionWrapper,
200 >,
201 execution_queue_sender: mysten_metrics::metered_channel::Sender<
202 CertifiedBridgeActionExecutionWrapper,
203 >,
204 metrics: Arc<BridgeMetrics>,
205 ) {
206 info!("Starting run_signature_aggregation_loop");
207 let semaphore = Arc::new(Semaphore::new(SIGNING_CONCURRENCY));
208 while let Some(action) = signing_queue_receiver.recv().await {
209 Self::handle_signing_task(
210 &semaphore,
211 &auth_agg,
212 &signing_queue_sender,
213 &execution_queue_sender,
214 &sui_client,
215 &store,
216 action,
217 &metrics,
218 )
219 .await;
220 }
221 }
222
223 async fn should_proceed_signing(sui_client: &Arc<SuiClient<C>>) -> bool {
224 let Ok(Ok(is_paused)) =
225 retry_with_max_elapsed_time!(sui_client.is_bridge_paused(), Duration::from_secs(600))
226 else {
227 error!("Failed to get bridge status after retry");
228 return false;
229 };
230 !is_paused
231 }
232
233 #[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))]
234 async fn handle_signing_task(
235 semaphore: &Arc<Semaphore>,
236 auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
237 signing_queue_sender: &mysten_metrics::metered_channel::Sender<
238 BridgeActionExecutionWrapper,
239 >,
240 execution_queue_sender: &mysten_metrics::metered_channel::Sender<
241 CertifiedBridgeActionExecutionWrapper,
242 >,
243 sui_client: &Arc<SuiClient<C>>,
244 store: &Arc<BridgeOrchestratorTables>,
245 action: BridgeActionExecutionWrapper,
246 metrics: &Arc<BridgeMetrics>,
247 ) {
248 metrics.action_executor_signing_queue_received_actions.inc();
249 let action_key = action.0.key();
250 info!("Received action for signing: {:?}", action.0);
251
252 let should_proceed = Self::should_proceed_signing(sui_client).await;
257 if !should_proceed {
258 metrics.action_executor_signing_queue_skipped_actions.inc();
259 warn!("skipping signing task: {:?}", action_key);
260 return;
261 }
262
263 let auth_agg_clone = auth_agg.clone();
264 let signing_queue_sender_clone = signing_queue_sender.clone();
265 let execution_queue_sender_clone = execution_queue_sender.clone();
266 let sui_client_clone = sui_client.clone();
267 let store_clone = store.clone();
268 let metrics_clone = metrics.clone();
269 let semaphore_clone = semaphore.clone();
270 spawn_logged_monitored_task!(
271 Self::request_signatures(
272 semaphore_clone,
273 sui_client_clone,
274 auth_agg_clone,
275 action,
276 store_clone,
277 signing_queue_sender_clone,
278 execution_queue_sender_clone,
279 metrics_clone,
280 )
281 .instrument(tracing::debug_span!("request_signatures", action_key=?action_key)),
282 "request_signatures"
283 );
284 }
285
286 async fn handle_already_processed_token_transfer_action_maybe(
290 sui_client: &Arc<SuiClient<C>>,
291 action: &BridgeAction,
292 store: &Arc<BridgeOrchestratorTables>,
293 metrics: &Arc<BridgeMetrics>,
294 ) -> bool {
295 let status = sui_client
296 .get_token_transfer_action_onchain_status_until_success(
297 action.chain_id() as u8,
298 action.seq_number(),
299 )
300 .await;
301 match status {
302 BridgeActionStatus::Approved | BridgeActionStatus::Claimed => {
303 info!(
304 "Action already approved or claimed, removing action from pending logs: {:?}",
305 action
306 );
307 metrics.action_executor_already_processed_actions.inc();
308 store
309 .remove_pending_actions(&[action.digest()])
310 .unwrap_or_else(|e| {
311 panic!("Write to DB should not fail: {:?}", e);
312 });
313 true
314 }
315 BridgeActionStatus::Pending | BridgeActionStatus::NotFound => false,
318 }
319 }
320
321 async fn request_signatures(
324 semaphore: Arc<Semaphore>,
325 sui_client: Arc<SuiClient<C>>,
326 auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
327 action: BridgeActionExecutionWrapper,
328 store: Arc<BridgeOrchestratorTables>,
329 signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
330 execution_queue_sender: mysten_metrics::metered_channel::Sender<
331 CertifiedBridgeActionExecutionWrapper,
332 >,
333 metrics: Arc<BridgeMetrics>,
334 ) {
335 let _permit = semaphore
336 .acquire()
337 .await
338 .expect("semaphore should not be closed");
339 info!("requesting signatures");
340 let BridgeActionExecutionWrapper(action, attempt_times) = action;
341
342 match &action {
344 BridgeAction::SuiToEthBridgeAction(_)
345 | BridgeAction::SuiToEthTokenTransfer(_)
346 | BridgeAction::SuiToEthTokenTransferV2(_)
347 | BridgeAction::EthToSuiBridgeAction(_)
348 | BridgeAction::EthToSuiTokenTransferV2(_) => (),
349 _ => unreachable!("Non token transfer action should not reach here"),
350 };
351
352 if Self::handle_already_processed_token_transfer_action_maybe(
354 &sui_client,
355 &action,
356 &store,
357 &metrics,
358 )
359 .await
360 {
361 return;
362 }
363 match auth_agg
364 .load()
365 .request_committee_signatures(action.clone())
366 .await
367 {
368 Ok(certificate) => {
369 info!("Sending certificate to execution");
370 execution_queue_sender
371 .send(CertifiedBridgeActionExecutionWrapper(certificate, 0))
372 .await
373 .unwrap_or_else(|e| {
374 panic!("Sending to execution queue should not fail: {:?}", e);
375 });
376 }
377 Err(e) => {
378 warn!("Failed to collect sigs for bridge action: {:?}", e);
379 metrics.err_signature_aggregation.inc();
380
381 if attempt_times >= MAX_SIGNING_ATTEMPTS {
383 metrics.err_signature_aggregation_too_many_failures.inc();
384 error!(
385 "Manual intervention is required. Failed to collect sigs for bridge action after {MAX_SIGNING_ATTEMPTS} attempts: {:?}",
386 e
387 );
388 return;
389 }
390 delay(attempt_times).await;
391 signing_queue_sender
392 .send(BridgeActionExecutionWrapper(action, attempt_times + 1))
393 .await
394 .unwrap_or_else(|e| {
395 panic!("Sending to signing queue should not fail: {:?}", e);
396 });
397 }
398 }
399 }
400
401 async fn run_onchain_execution_loop(
404 sui_client: Arc<SuiClient<C>>,
405 sui_key: SuiKeyPair,
406 sui_address: SuiAddress,
407 gas_object_id: ObjectID,
408 store: Arc<BridgeOrchestratorTables>,
409 execution_queue_sender: mysten_metrics::metered_channel::Sender<
410 CertifiedBridgeActionExecutionWrapper,
411 >,
412 mut execution_queue_receiver: mysten_metrics::metered_channel::Receiver<
413 CertifiedBridgeActionExecutionWrapper,
414 >,
415 bridge_object_arg: ObjectArg,
416 sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
417 bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
418 metrics: Arc<BridgeMetrics>,
419 ) {
420 info!("Starting run_onchain_execution_loop");
421 while let Some(certificate_wrapper) = execution_queue_receiver.recv().await {
422 if *bridge_pause_rx.borrow() {
426 warn!("Bridge is paused, skipping execution");
427 metrics
428 .action_executor_execution_queue_skipped_actions_due_to_pausing
429 .inc();
430 continue;
431 }
432 Self::handle_execution_task(
433 certificate_wrapper,
434 &sui_client,
435 &sui_key,
436 &sui_address,
437 gas_object_id,
438 &store,
439 &execution_queue_sender,
440 &bridge_object_arg,
441 &sui_token_type_tags,
442 &metrics,
443 )
444 .await;
445 }
446 panic!("Execution queue closed unexpectedly");
447 }
448
449 #[instrument(level = "error", skip_all, fields(action_key=?certificate_wrapper.0.data().key(), attempt_times=?certificate_wrapper.1))]
450 async fn handle_execution_task(
451 certificate_wrapper: CertifiedBridgeActionExecutionWrapper,
452 sui_client: &Arc<SuiClient<C>>,
453 sui_key: &SuiKeyPair,
454 sui_address: &SuiAddress,
455 gas_object_id: ObjectID,
456 store: &Arc<BridgeOrchestratorTables>,
457 execution_queue_sender: &mysten_metrics::metered_channel::Sender<
458 CertifiedBridgeActionExecutionWrapper,
459 >,
460 bridge_object_arg: &ObjectArg,
461 sui_token_type_tags: &ArcSwap<HashMap<u8, TypeTag>>,
462 metrics: &Arc<BridgeMetrics>,
463 ) {
464 metrics
465 .action_executor_execution_queue_received_actions
466 .inc();
467 let CertifiedBridgeActionExecutionWrapper(certificate, attempt_times) = certificate_wrapper;
468 let action = certificate.data();
469 let action_key = action.key();
470
471 info!("Received certified action for execution: {:?}", action);
472
473 let (gas_coin, gas_object_ref) =
475 Self::get_gas_data_assert_ownership(*sui_address, gas_object_id, sui_client).await;
476 metrics.gas_coin_balance.set(gas_coin.value() as i64);
477
478 let ceriticate_clone = certificate.clone();
479
480 if Self::handle_already_processed_token_transfer_action_maybe(
482 sui_client, action, store, metrics,
483 )
484 .await
485 {
486 info!("Action already processed, skipping");
487 return;
488 }
489
490 info!("Building Sui transaction");
491 let rgp = sui_client.get_reference_gas_price_until_success().await;
492 let tx_data = match build_sui_transaction(
493 *sui_address,
494 &gas_object_ref,
495 ceriticate_clone,
496 *bridge_object_arg,
497 sui_token_type_tags.load().as_ref(),
498 rgp,
499 ) {
500 Ok(tx_data) => tx_data,
501 Err(err) => {
502 metrics.err_build_sui_transaction.inc();
503 error!(
504 "Manual intervention is required. Failed to build transaction for action {:?}: {:?}",
505 action, err
506 );
507 return;
510 }
511 };
512 let sig = Signature::new_secure(
513 &IntentMessage::new(Intent::sui_transaction(), &tx_data),
514 sui_key,
515 );
516 let signed_tx = Transaction::from_data(tx_data, vec![sig]);
517 let tx_digest = *signed_tx.digest();
518
519 if Self::handle_already_processed_token_transfer_action_maybe(
521 sui_client, action, store, metrics,
522 )
523 .await
524 {
525 info!("Action already processed, skipping");
526 return;
527 }
528
529 info!(?tx_digest, ?gas_object_ref, "Sending transaction to Sui");
530 match sui_client
531 .execute_transaction_block_with_effects(signed_tx)
532 .await
533 {
534 Ok(resp) => {
535 Self::handle_execution_effects(tx_digest, resp, store, action, metrics).await
536 }
537
538 Err(err) => {
540 error!(
541 ?action_key,
542 ?tx_digest,
543 "Sui transaction failed at signing: {err:?}"
544 );
545 metrics.err_sui_transaction_submission.inc();
546 let metrics_clone = metrics.clone();
547 let sender_clone = execution_queue_sender.clone();
549 spawn_logged_monitored_task!(async move {
550 if attempt_times >= MAX_EXECUTION_ATTEMPTS {
552 metrics_clone
553 .err_sui_transaction_submission_too_many_failures
554 .inc();
555 error!("Manual intervention is required. Failed to collect execute transaction for bridge action after {MAX_EXECUTION_ATTEMPTS} attempts: {:?}", err);
556 return;
557 }
558 delay(attempt_times).await;
559 sender_clone
560 .send(CertifiedBridgeActionExecutionWrapper(
561 certificate,
562 attempt_times + 1,
563 ))
564 .await
565 .unwrap_or_else(|e| {
566 panic!("Sending to execution queue should not fail: {:?}", e);
567 });
568 info!("Re-enqueued certificate for execution");
569 }.instrument(tracing::debug_span!("reenqueue_execution_task", action_key=?action_key)));
570 }
571 }
572 }
573
574 async fn handle_execution_effects(
576 tx_digest: TransactionDigest,
577 response: ExecuteTransactionResult,
578 store: &Arc<BridgeOrchestratorTables>,
579 action: &BridgeAction,
580 metrics: &Arc<BridgeMetrics>,
581 ) {
582 match &response.status {
583 SuiExecutionStatus::Success => {
584 let relevant_events = response
585 .events
586 .iter()
587 .filter(|e| {
588 e.type_ == *TokenTransferAlreadyClaimed.get().unwrap()
589 || e.type_ == *TokenTransferClaimed.get().unwrap()
590 || e.type_ == *TokenTransferApproved.get().unwrap()
591 || e.type_ == *TokenTransferAlreadyApproved.get().unwrap()
592 })
593 .collect::<Vec<_>>();
594 assert!(
595 !relevant_events.is_empty(),
596 "Expected TokenTransferAlreadyClaimed, TokenTransferClaimed, TokenTransferApproved \
597 or TokenTransferAlreadyApproved event but got: {:?}",
598 response.events
599 );
600 info!(?tx_digest, "Sui transaction executed successfully");
601 relevant_events.iter().for_each(|e| {
603 if e.type_ == *TokenTransferClaimed.get().unwrap() {
604 match action {
605 BridgeAction::EthToSuiBridgeAction(_)
606 | BridgeAction::EthToSuiTokenTransferV2(_) => {
607 metrics.eth_sui_token_transfer_claimed.inc();
608 }
609 BridgeAction::SuiToEthBridgeAction(_)
610 | BridgeAction::SuiToEthTokenTransfer(_)
611 | BridgeAction::SuiToEthTokenTransferV2(_) => {
612 metrics.sui_eth_token_transfer_claimed.inc();
613 }
614 _ => error!("Unexpected action type for claimed event: {:?}", action),
615 }
616 } else if e.type_ == *TokenTransferApproved.get().unwrap() {
617 match action {
618 BridgeAction::EthToSuiBridgeAction(_)
619 | BridgeAction::EthToSuiTokenTransferV2(_) => {
620 metrics.eth_sui_token_transfer_approved.inc();
621 }
622 BridgeAction::SuiToEthBridgeAction(_)
623 | BridgeAction::SuiToEthTokenTransfer(_)
624 | BridgeAction::SuiToEthTokenTransferV2(_) => {
625 metrics.sui_eth_token_transfer_approved.inc();
626 }
627 _ => error!("Unexpected action type for approved event: {:?}", action),
628 }
629 }
630 });
631 store
632 .remove_pending_actions(&[action.digest()])
633 .unwrap_or_else(|e| {
634 panic!("Write to DB should not fail: {:?}", e);
635 })
636 }
637 SuiExecutionStatus::Failure { error } => {
638 metrics.err_sui_transaction_execution.inc();
645 error!(
646 ?tx_digest,
647 "Manual intervention is needed. Sui transaction executed and failed with error: {error:?}"
648 );
649 }
650 }
651 }
652
653 async fn get_gas_data_assert_ownership(
655 sui_address: SuiAddress,
656 gas_object_id: ObjectID,
657 sui_client: &SuiClient<C>,
658 ) -> (GasCoin, ObjectRef) {
659 let (gas_coin, gas_obj_ref, owner) = sui_client
660 .get_gas_data_panic_if_not_gas(gas_object_id)
661 .await;
662
663 assert_eq!(
666 owner,
667 Owner::AddressOwner(sui_address),
668 "Gas object {:?} is no longer owned by address {}",
669 gas_object_id,
670 sui_address
671 );
672 (gas_coin, gas_obj_ref)
673 }
674}
675
676pub async fn submit_to_executor(
677 tx: &mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
678 action: BridgeAction,
679) -> Result<(), BridgeError> {
680 tx.send(BridgeActionExecutionWrapper(action, 0))
681 .await
682 .map_err(|e| BridgeError::Generic(e.to_string()))
683}
684
685#[cfg(test)]
686mod tests {
687 use crate::events::init_all_struct_tags;
688 use crate::test_utils::DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
689 use crate::types::BRIDGE_PAUSED;
690 use fastcrypto::traits::KeyPair;
691 use prometheus::Registry;
692 use std::collections::{BTreeMap, HashMap};
693 use std::str::FromStr;
694 use sui_json_rpc_types::SuiEvent;
695 use sui_types::TypeTag;
696 use sui_types::crypto::get_key_pair;
697 use sui_types::gas_coin::GasCoin;
698 use sui_types::{base_types::random_object_ref, transaction::TransactionData};
699
700 use crate::{
701 crypto::{
702 BridgeAuthorityKeyPair, BridgeAuthorityPublicKeyBytes,
703 BridgeAuthorityRecoverableSignature,
704 },
705 server::mock_handler::BridgeRequestMockHandler,
706 sui_mock_client::SuiMockClient,
707 test_utils::{
708 get_test_authorities_and_run_mock_bridge_server, get_test_eth_to_sui_bridge_action,
709 get_test_sui_to_eth_bridge_action, sign_action_with_key,
710 },
711 types::{BridgeCommittee, BridgeCommitteeValiditySignInfo, CertifiedBridgeAction},
712 };
713
714 use super::*;
715
    // Exercises the execution path through three scenarios driven from the signing
    // queue: a successful execution, an execution that fails on chain, and a
    // submission error followed by a successful retry.
    #[tokio::test]
    async fn test_onchain_execution_loop() {
        let (
            signing_tx,
            _execution_tx,
            sui_client_mock,
            mut tx_subscription,
            store,
            secrets,
            dummy_sui_key,
            mock0,
            mock1,
            mock2,
            mock3,
            _handles,
            gas_object_ref,
            sui_address,
            sui_token_type_tags,
            _bridge_pause_tx,
        ) = setup().await;

        // --- Scenario 1: the transaction executes successfully ---
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            None,
            true,
        );
        let action = action_certificate.data().clone();
        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
        // Build the transaction with the same inputs as the test setup so that its
        // digest can be registered with the mock client.
        let tx_data = build_sui_transaction(
            sui_address,
            &gas_object_ref,
            action_certificate,
            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
            &id_token_map,
            1000,
        )
        .unwrap();

        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);

        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000);
        sui_client_mock.add_gas_object_info(
            gas_coin.clone(),
            gas_object_ref,
            Owner::AddressOwner(sui_address),
        );

        // Mock a successful response carrying a TokenTransferClaimed event.
        let mut event = SuiEvent::random_for_testing();
        event.type_ = TokenTransferClaimed.get().unwrap().clone();
        let events = vec![event];
        mock_transaction_response(
            &sui_client_mock,
            tx_digest,
            SuiExecutionStatus::Success,
            Some(events),
            true,
        );

        store
            .insert_pending_actions(std::slice::from_ref(&action))
            .unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        submit_to_executor(&signing_tx, action.clone())
            .await
            .unwrap();

        // Wait for a transaction to show up on the subscription, then expect the
        // pending store to be empty.
        tx_subscription.recv().await.unwrap();
        assert!(store.get_all_pending_actions().is_empty());

        // --- Scenario 2: the transaction executes but fails on chain ---
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            None,
            true,
        );

        let action = action_certificate.data().clone();

        let tx_data = build_sui_transaction(
            sui_address,
            &gas_object_ref,
            action_certificate,
            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
            &id_token_map,
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);

        mock_transaction_response(
            &sui_client_mock,
            tx_digest,
            SuiExecutionStatus::Failure {
                error: "failure is mother of success".to_string(),
            },
            None,
            true,
        );

        store
            .insert_pending_actions(std::slice::from_ref(&action))
            .unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        submit_to_executor(&signing_tx, action.clone())
            .await
            .unwrap();

        // After a failed execution the action must still be pending.
        tx_subscription.recv().await.unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        // --- Scenario 3: submission errors, then succeeds on a later attempt ---
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            None,
            true,
        );

        let action = action_certificate.data().clone();

        let tx_data = build_sui_transaction(
            sui_address,
            &gas_object_ref,
            action_certificate,
            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
            &id_token_map,
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
        mock_transaction_error(
            &sui_client_mock,
            tx_digest,
            BridgeError::Generic("some random error".to_string()),
            true,
        );

        store
            .insert_pending_actions(std::slice::from_ref(&action))
            .unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        submit_to_executor(&signing_tx, action.clone())
            .await
            .unwrap();

        // The same digest is expected twice on the subscription
        // (presumably once per submission attempt — confirm against the mock client).
        let tx_digest = tx_subscription.recv().await.unwrap();
        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);

        // While submissions keep erroring, the action stays pending.
        assert!(
            store
                .get_all_pending_actions()
                .contains_key(&action.digest())
        );

        // Switch the mock to a successful response with a TokenTransferClaimed event.
        let mut event = SuiEvent::random_for_testing();
        event.type_ = TokenTransferClaimed.get().unwrap().clone();
        let events = vec![event];
        mock_transaction_response(
            &sui_client_mock,
            tx_digest,
            SuiExecutionStatus::Success,
            Some(events),
            true,
        );

        // Give the executor a second to retry, then expect the action to be gone.
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        assert!(
            !store
                .get_all_pending_actions()
                .contains_key(&action.digest())
        );
    }
923
    // Exercises the signing path: the action is first submitted while three of the
    // four mocked authorities return errors, then a second authority signature is
    // mocked and the action is expected to be executed and removed from the store.
    #[tokio::test]
    async fn test_signature_aggregation_loop() {
        let (
            signing_tx,
            _execution_tx,
            sui_client_mock,
            mut tx_subscription,
            store,
            secrets,
            dummy_sui_key,
            mock0,
            mock1,
            mock2,
            mock3,
            _handles,
            gas_object_ref,
            sui_address,
            sui_token_type_tags,
            _bridge_pause_tx,
        ) = setup().await;
        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
        let (action_certificate, sui_tx_digest, sui_tx_event_index) =
            get_bridge_authority_approved_action(
                vec![&mock0, &mock1, &mock2, &mock3],
                vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
                None,
                true,
            );
        let action = action_certificate.data().clone();
        // Authorities 0-2 return signing errors; only authority 3 signs.
        mock_bridge_authority_signing_errors(
            vec![&mock0, &mock1, &mock2],
            sui_tx_digest,
            sui_tx_event_index,
        );
        let mut sigs = mock_bridge_authority_sigs(
            vec![&mock3],
            &action,
            vec![&secrets[3]],
            sui_tx_digest,
            sui_tx_event_index,
        );

        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000);
        sui_client_mock.add_gas_object_info(
            gas_coin,
            gas_object_ref,
            Owner::AddressOwner(sui_address),
        );
        store
            .insert_pending_actions(std::slice::from_ref(&action))
            .unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        submit_to_executor(&signing_tx, action.clone())
            .await
            .unwrap();

        // Poll until authority 0 has been asked at least twice, i.e. the signing
        // request has been retried at least once.
        loop {
            let requested_times =
                mock0.get_sui_token_events_requested(sui_tx_digest, sui_tx_event_index);
            if requested_times >= 2 {
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }
        // No transaction has been observed yet and the action is still pending.
        assert_eq!(
            tx_subscription.try_recv().unwrap_err(),
            tokio::sync::broadcast::error::TryRecvError::Empty
        );
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        // Authority 2 now signs as well; build the certificate expected from the
        // signatures of authorities 3 and 2.
        let sig_from_2 = mock_bridge_authority_sigs(
            vec![&mock2],
            &action,
            vec![&secrets[2]],
            sui_tx_digest,
            sui_tx_event_index,
        );
        sigs.extend(sig_from_2);
        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
            action.clone(),
            BridgeCommitteeValiditySignInfo { signatures: sigs },
        );
        let action_certificate = VerifiedCertifiedBridgeAction::new_from_verified(certified_action);
        let tx_data = build_sui_transaction(
            sui_address,
            &gas_object_ref,
            action_certificate,
            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
            &id_token_map,
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);

        let mut event = SuiEvent::random_for_testing();
        event.type_ = TokenTransferClaimed.get().unwrap().clone();
        let events = vec![event];
        mock_transaction_response(
            &sui_client_mock,
            tx_digest,
            SuiExecutionStatus::Success,
            Some(events),
            true,
        );

        // The expected transaction is observed and the action leaves the pending store.
        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
        assert!(
            !store
                .get_all_pending_actions()
                .contains_key(&action.digest())
        );
    }
1050
    // Verifies that an action whose signing keeps failing is removed from the
    // pending store, without any transaction being observed, once its on-chain
    // status becomes Approved.
    #[tokio::test]
    async fn test_skip_request_signature_if_already_processed_on_chain() {
        let (
            signing_tx,
            _execution_tx,
            sui_client_mock,
            mut tx_subscription,
            store,
            _secrets,
            _dummy_sui_key,
            mock0,
            mock1,
            mock2,
            mock3,
            _handles,
            _gas_object_ref,
            _sui_address,
            _sui_token_type_tags,
            _bridge_pause_tx,
        ) = setup().await;

        let sui_tx_digest = TransactionDigest::random();
        let sui_tx_event_index = 0;
        let action = get_test_sui_to_eth_bridge_action(
            Some(sui_tx_digest),
            Some(sui_tx_event_index),
            None,
            None,
            None,
            None,
            None,
        );
        // Every authority returns a signing error, so no certificate can be formed.
        mock_bridge_authority_signing_errors(
            vec![&mock0, &mock1, &mock2, &mock3],
            sui_tx_digest,
            sui_tx_event_index,
        );
        store
            .insert_pending_actions(std::slice::from_ref(&action))
            .unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        submit_to_executor(&signing_tx, action.clone())
            .await
            .unwrap();
        let action_digest = action.digest();

        // After a second, nothing has been submitted and the action is still pending.
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        tx_subscription.try_recv().unwrap_err();
        assert!(store.get_all_pending_actions().contains_key(&action_digest));

        // Mark the action as approved on chain.
        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);

        // Wait (up to 10 seconds) for the action to be removed from the pending store.
        let now = std::time::Instant::now();
        while store.get_all_pending_actions().contains_key(&action_digest) {
            if now.elapsed().as_secs() > 10 {
                panic!("Timeout waiting for action to be removed from WAL");
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
        // Still no transaction was observed.
        tx_subscription.try_recv().unwrap_err();
    }
1120
    // Verifies that a certificate sent straight to the execution queue, whose
    // submission errors, is removed from the pending store once the action's
    // on-chain status becomes Approved.
    #[tokio::test]
    async fn test_skip_tx_submission_if_already_processed_on_chain() {
        let (
            _signing_tx,
            execution_tx,
            sui_client_mock,
            mut tx_subscription,
            store,
            secrets,
            dummy_sui_key,
            mock0,
            mock1,
            mock2,
            mock3,
            _handles,
            gas_object_ref,
            sui_address,
            sui_token_type_tags,
            _bridge_pause_tx,
        ) = setup().await;
        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            None,
            true,
        );

        let action = action_certificate.data().clone();
        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
        let tx_data = build_sui_transaction(
            sui_address,
            &gas_object_ref,
            action_certificate.clone(),
            arg,
            &id_token_map,
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
        // Submitting this transaction returns an error from the mock client.
        mock_transaction_error(
            &sui_client_mock,
            tx_digest,
            BridgeError::Generic("some random error".to_string()),
            true,
        );

        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000);
        sui_client_mock.add_gas_object_info(
            gas_coin.clone(),
            gas_object_ref,
            Owner::AddressOwner(sui_address),
        );

        // The action starts out as Pending on chain, so execution is attempted.
        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);

        store
            .insert_pending_actions(std::slice::from_ref(&action))
            .unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        // Bypass signing: send the certificate directly to the execution queue.
        execution_tx
            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
            .await
            .unwrap();

        // Wait for a transaction to be observed on the subscription.
        tx_subscription.recv().await.unwrap();

        // Mark the action as approved on chain.
        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);

        // Wait (up to 10 seconds) for the action to be removed from the pending store.
        let now = std::time::Instant::now();
        let action_digest = action.digest();
        while store.get_all_pending_actions().contains_key(&action_digest) {
            if now.elapsed().as_secs() > 10 {
                panic!("Timeout waiting for action to be removed from WAL");
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
    }
1207
1208 #[tokio::test]
1209 async fn test_skip_tx_submission_if_bridge_is_paused() {
1210 let (
1211 _signing_tx,
1212 execution_tx,
1213 sui_client_mock,
1214 mut tx_subscription,
1215 store,
1216 secrets,
1217 dummy_sui_key,
1218 mock0,
1219 mock1,
1220 mock2,
1221 mock3,
1222 _handles,
1223 gas_object_ref,
1224 sui_address,
1225 sui_token_type_tags,
1226 bridge_pause_tx,
1227 ) = setup().await;
1228 let id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1229 let (action_certificate, _, _) = get_bridge_authority_approved_action(
1230 vec![&mock0, &mock1, &mock2, &mock3],
1231 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1232 None,
1233 true,
1234 );
1235
1236 let action = action_certificate.data().clone();
1237 let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1238 let tx_data = build_sui_transaction(
1239 sui_address,
1240 &gas_object_ref,
1241 action_certificate.clone(),
1242 arg,
1243 &id_token_map,
1244 1000,
1245 )
1246 .unwrap();
1247 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1248 mock_transaction_error(
1249 &sui_client_mock,
1250 tx_digest,
1251 BridgeError::Generic("some random error".to_string()),
1252 true,
1253 );
1254
1255 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1257 gas_coin.clone(),
1258 gas_object_ref,
1259 Owner::AddressOwner(sui_address),
1260 );
1261 let action_digest = action.digest();
1262 sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1263
1264 assert!(!*bridge_pause_tx.borrow());
1266
1267 store
1268 .insert_pending_actions(std::slice::from_ref(&action))
1269 .unwrap();
1270 assert_eq!(
1271 store.get_all_pending_actions()[&action.digest()],
1272 action.clone()
1273 );
1274
1275 execution_tx
1277 .send(CertifiedBridgeActionExecutionWrapper(
1278 action_certificate.clone(),
1279 0,
1280 ))
1281 .await
1282 .unwrap();
1283
1284 tx_subscription.recv().await.unwrap();
1286
1287 bridge_pause_tx.send(BRIDGE_PAUSED).unwrap();
1289
1290 execution_tx
1292 .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1293 .await
1294 .unwrap();
1295
1296 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1297 assert_eq!(
1299 tx_subscription.try_recv().unwrap_err(),
1300 tokio::sync::broadcast::error::TryRecvError::Empty
1301 );
1302 assert_eq!(
1304 store.get_all_pending_actions()[&action_digest],
1305 action.clone()
1306 );
1307 }
1308
1309 #[tokio::test]
1310 async fn test_action_executor_handle_new_token() {
1311 let new_token_id = 255u8; let new_type_tag = TypeTag::from_str("0xbeef::beef::BEEF").unwrap();
1313 let (
1314 _signing_tx,
1315 execution_tx,
1316 sui_client_mock,
1317 mut tx_subscription,
1318 _store,
1319 secrets,
1320 dummy_sui_key,
1321 mock0,
1322 mock1,
1323 mock2,
1324 mock3,
1325 _handles,
1326 gas_object_ref,
1327 sui_address,
1328 sui_token_type_tags,
1329 _bridge_pause_tx,
1330 ) = setup().await;
1331 let mut id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1332 let (action_certificate, _, _) = get_bridge_authority_approved_action(
1333 vec![&mock0, &mock1, &mock2, &mock3],
1334 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1335 Some(new_token_id),
1336 false, );
1338
1339 let action = action_certificate.data().clone();
1340 let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1341 let tx_data = build_sui_transaction(
1342 sui_address,
1343 &gas_object_ref,
1344 action_certificate.clone(),
1345 arg,
1346 &maplit::hashmap! {
1347 new_token_id => new_type_tag.clone()
1348 },
1349 1000,
1350 )
1351 .unwrap();
1352 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1353 mock_transaction_error(
1354 &sui_client_mock,
1355 tx_digest,
1356 BridgeError::Generic("some random error".to_string()),
1357 true,
1358 );
1359
1360 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1362 gas_coin.clone(),
1363 gas_object_ref,
1364 Owner::AddressOwner(sui_address),
1365 );
1366 sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1367
1368 execution_tx
1370 .send(CertifiedBridgeActionExecutionWrapper(
1371 action_certificate.clone(),
1372 0,
1373 ))
1374 .await
1375 .unwrap();
1376
1377 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1378 assert_eq!(
1380 tx_subscription.try_recv().unwrap_err(),
1381 tokio::sync::broadcast::error::TryRecvError::Empty
1382 );
1383
1384 id_token_map.insert(new_token_id, new_type_tag);
1386 sui_token_type_tags.store(Arc::new(id_token_map));
1387
1388 execution_tx
1390 .send(CertifiedBridgeActionExecutionWrapper(
1391 action_certificate.clone(),
1392 0,
1393 ))
1394 .await
1395 .unwrap();
1396
1397 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1398 assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1400 }
1401
1402 fn mock_bridge_authority_sigs(
1403 mocks: Vec<&BridgeRequestMockHandler>,
1404 action: &BridgeAction,
1405 secrets: Vec<&BridgeAuthorityKeyPair>,
1406 sui_tx_digest: TransactionDigest,
1407 sui_tx_event_index: u16,
1408 ) -> BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthorityRecoverableSignature> {
1409 assert_eq!(mocks.len(), secrets.len());
1410 let mut signed_actions = BTreeMap::new();
1411 for (mock, secret) in mocks.iter().zip(secrets.iter()) {
1412 let signed_action = sign_action_with_key(action, secret);
1413 mock.add_sui_event_response(
1414 sui_tx_digest,
1415 sui_tx_event_index,
1416 Ok(signed_action.clone()),
1417 None,
1418 );
1419 signed_actions.insert(secret.public().into(), signed_action.into_sig().signature);
1420 }
1421 signed_actions
1422 }
1423
1424 fn mock_bridge_authority_signing_errors(
1425 mocks: Vec<&BridgeRequestMockHandler>,
1426 sui_tx_digest: TransactionDigest,
1427 sui_tx_event_index: u16,
1428 ) {
1429 for mock in mocks {
1430 mock.add_sui_event_response(
1431 sui_tx_digest,
1432 sui_tx_event_index,
1433 Err(BridgeError::RestAPIError("small issue".into())),
1434 None,
1435 );
1436 }
1437 }
1438
1439 fn get_bridge_authority_approved_action(
1441 mocks: Vec<&BridgeRequestMockHandler>,
1442 secrets: Vec<&BridgeAuthorityKeyPair>,
1443 token_id: Option<u8>,
1444 sui_to_eth: bool,
1445 ) -> (VerifiedCertifiedBridgeAction, TransactionDigest, u16) {
1446 let sui_tx_digest = TransactionDigest::random();
1447 let sui_tx_event_index = 1;
1448 let action = if sui_to_eth {
1449 get_test_sui_to_eth_bridge_action(
1450 Some(sui_tx_digest),
1451 Some(sui_tx_event_index),
1452 None,
1453 None,
1454 None,
1455 None,
1456 token_id,
1457 )
1458 } else {
1459 get_test_eth_to_sui_bridge_action(None, None, None, token_id)
1460 };
1461
1462 let sigs =
1463 mock_bridge_authority_sigs(mocks, &action, secrets, sui_tx_digest, sui_tx_event_index);
1464 let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1465 action,
1466 BridgeCommitteeValiditySignInfo { signatures: sigs },
1467 );
1468 (
1469 VerifiedCertifiedBridgeAction::new_from_verified(certified_action),
1470 sui_tx_digest,
1471 sui_tx_event_index,
1472 )
1473 }
1474
1475 fn get_tx_digest(tx_data: TransactionData, dummy_sui_key: &SuiKeyPair) -> TransactionDigest {
1476 let sig = Signature::new_secure(
1477 &IntentMessage::new(Intent::sui_transaction(), &tx_data),
1478 dummy_sui_key,
1479 );
1480 let signed_tx = Transaction::from_data(tx_data, vec![sig]);
1481 *signed_tx.digest()
1482 }
1483
1484 fn mock_transaction_response(
1488 sui_client_mock: &SuiMockClient,
1489 tx_digest: TransactionDigest,
1490 status: SuiExecutionStatus,
1491 events: Option<Vec<SuiEvent>>,
1492 wildcard: bool,
1493 ) {
1494 let response = ExecuteTransactionResult {
1495 status,
1496 events: events.unwrap_or_default(),
1497 };
1498 if wildcard {
1499 sui_client_mock.set_wildcard_transaction_response(Ok(response));
1500 } else {
1501 sui_client_mock.add_transaction_response(tx_digest, Ok(response));
1502 }
1503 }
1504
1505 fn mock_transaction_error(
1506 sui_client_mock: &SuiMockClient,
1507 tx_digest: TransactionDigest,
1508 error: BridgeError,
1509 wildcard: bool,
1510 ) {
1511 if wildcard {
1512 sui_client_mock.set_wildcard_transaction_response(Err(error));
1513 } else {
1514 sui_client_mock.add_transaction_response(tx_digest, Err(error));
1515 }
1516 }
1517
1518 #[allow(clippy::type_complexity)]
1519 async fn setup() -> (
1520 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
1521 mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
1522 SuiMockClient,
1523 tokio::sync::broadcast::Receiver<TransactionDigest>,
1524 Arc<BridgeOrchestratorTables>,
1525 Vec<BridgeAuthorityKeyPair>,
1526 SuiKeyPair,
1527 BridgeRequestMockHandler,
1528 BridgeRequestMockHandler,
1529 BridgeRequestMockHandler,
1530 BridgeRequestMockHandler,
1531 Vec<tokio::task::JoinHandle<()>>,
1532 ObjectRef,
1533 SuiAddress,
1534 Arc<ArcSwap<HashMap<u8, TypeTag>>>,
1535 tokio::sync::watch::Sender<IsBridgePaused>,
1536 ) {
1537 telemetry_subscribers::init_for_testing();
1538 let registry = Registry::new();
1539 mysten_metrics::init_metrics(®istry);
1540 init_all_struct_tags();
1541
1542 let (sui_address, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1543 let sui_key = SuiKeyPair::from(kp);
1544 let gas_object_ref = random_object_ref();
1545 let temp_dir = tempfile::tempdir().unwrap();
1546 let store = BridgeOrchestratorTables::new(temp_dir.path());
1547 let sui_client_mock = SuiMockClient::default();
1548 let tx_subscription = sui_client_mock.subscribe_to_requested_transactions();
1549 let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1550
1551 let (_, dummy_kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1554 let dummy_sui_key = SuiKeyPair::from(dummy_kp);
1555
1556 let mock0 = BridgeRequestMockHandler::new();
1557 let mock1 = BridgeRequestMockHandler::new();
1558 let mock2 = BridgeRequestMockHandler::new();
1559 let mock3 = BridgeRequestMockHandler::new();
1560
1561 let (mut handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
1562 vec![2500, 2500, 2500, 2500],
1563 vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
1564 );
1565
1566 let committee = BridgeCommittee::new(authorities).unwrap();
1567
1568 let agg = Arc::new(ArcSwap::new(Arc::new(
1569 BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1570 )));
1571 let metrics = Arc::new(BridgeMetrics::new(®istry));
1572 let sui_token_type_tags = sui_client.get_token_id_map().await.unwrap();
1573 let sui_token_type_tags = Arc::new(ArcSwap::new(Arc::new(sui_token_type_tags)));
1574 let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1575 let executor = BridgeActionExecutor::new(
1576 sui_client.clone(),
1577 agg.clone(),
1578 store.clone(),
1579 sui_key,
1580 sui_address,
1581 gas_object_ref.0,
1582 sui_token_type_tags.clone(),
1583 bridge_pause_rx,
1584 metrics,
1585 )
1586 .await;
1587
1588 let (executor_handle, signing_tx, execution_tx) = executor.run_inner();
1589 handles.extend(executor_handle);
1590
1591 (
1592 signing_tx,
1593 execution_tx,
1594 sui_client_mock,
1595 tx_subscription,
1596 store,
1597 secrets,
1598 dummy_sui_key,
1599 mock0,
1600 mock1,
1601 mock2,
1602 mock3,
1603 handles,
1604 gas_object_ref,
1605 sui_address,
1606 sui_token_type_tags,
1607 bridge_pause_tx,
1608 )
1609 }
1610}