1use crate::retry_with_max_elapsed_time;
8use crate::types::IsBridgePaused;
9use arc_swap::ArcSwap;
10use mysten_metrics::spawn_logged_monitored_task;
11use shared_crypto::intent::{Intent, IntentMessage};
12use sui_json_rpc_types::SuiExecutionStatus;
13use sui_types::TypeTag;
14use sui_types::transaction::ObjectArg;
15use sui_types::{
16 base_types::{ObjectID, ObjectRef, SuiAddress},
17 crypto::{Signature, SuiKeyPair},
18 digests::TransactionDigest,
19 gas_coin::GasCoin,
20 object::Owner,
21 transaction::Transaction,
22};
23
24use crate::events::{
25 TokenTransferAlreadyApproved, TokenTransferAlreadyClaimed, TokenTransferApproved,
26 TokenTransferClaimed,
27};
28use crate::metrics::BridgeMetrics;
29use crate::{
30 client::bridge_authority_aggregator::BridgeAuthorityAggregator,
31 error::BridgeError,
32 storage::BridgeOrchestratorTables,
33 sui_client::{ExecuteTransactionResult, SuiClient, SuiClientInner},
34 sui_transaction_builder::build_sui_transaction,
35 types::{BridgeAction, BridgeActionStatus, VerifiedCertifiedBridgeAction},
36};
37use std::collections::HashMap;
38use std::sync::Arc;
39use tokio::sync::Semaphore;
40use tokio::time::Duration;
41use tracing::{Instrument, error, info, instrument, warn};
42
/// Capacity of each metered channel created in `run_inner` (the signing queue
/// and the execution queue).
pub const CHANNEL_SIZE: usize = 1000;
/// Number of permits in the semaphore that gates `request_signatures` tasks.
pub const SIGNING_CONCURRENCY: usize = 10;

/// Attempt count at which `request_signatures` stops re-enqueueing an action
/// whose signature collection failed and logs that manual intervention is
/// required.
pub const MAX_SIGNING_ATTEMPTS: u64 = 16;
/// Attempt count at which a certified action whose transaction submission
/// failed is no longer re-enqueued for execution and manual intervention is
/// requested in the logs.
pub const MAX_EXECUTION_ATTEMPTS: u64 = 16;
50
51async fn delay(attempt_times: u64) {
52 let delay_ms = 100 * 2_u64.pow(attempt_times as u32);
53 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
54}
55
/// A bridge action queued for signature collection, paired with the number of
/// attempts already made for it: `submit_to_executor` starts it at 0 and
/// `request_signatures` re-enqueues with the count incremented after a failure.
#[derive(Debug)]
pub struct BridgeActionExecutionWrapper(pub BridgeAction, pub u64);
58
/// A certified bridge action queued for on-chain execution, paired with the
/// number of attempts already made for it: it enters the execution queue with
/// 0 and is re-enqueued with the count incremented after a failed submission.
#[derive(Debug)]
pub struct CertifiedBridgeActionExecutionWrapper(pub VerifiedCertifiedBridgeAction, pub u64);
61
/// Entry point for starting a bridge action executor.
pub trait BridgeActionExecutorTrait {
    /// Consumes the executor and returns a list of task join handles together
    /// with a sender of [`BridgeActionExecutionWrapper`]s.
    ///
    /// In the implementation in this file the handles belong to the spawned
    /// signing and execution loops, and the sender feeds the signing queue.
    fn run(
        self,
    ) -> (
        Vec<tokio::task::JoinHandle<()>>,
        mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
    );
}
70
/// Collects committee signatures for bridge actions and submits the resulting
/// certificates to Sui as transactions.
pub struct BridgeActionExecutor<C> {
    /// Client used for on-chain queries and transaction submission.
    sui_client: Arc<SuiClient<C>>,
    /// Swappable aggregator used to request committee signatures.
    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
    /// Key pair used to sign the Sui transactions built by the executor.
    key: SuiKeyPair,
    /// Sender address of the built transactions; also the address that must
    /// own the gas object.
    sui_address: SuiAddress,
    /// Object ID of the gas coin whose data is fetched before each execution.
    gas_object_id: ObjectID,
    /// Storage from which processed actions are removed.
    store: Arc<BridgeOrchestratorTables>,
    /// Bridge object argument passed to `build_sui_transaction`; resolved once
    /// in `new`.
    bridge_object_arg: ObjectArg,
    /// Swappable map from `u8` keys to type tags, passed to
    /// `build_sui_transaction` (presumably keyed by token ID — confirm).
    sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
    /// Watch channel consulted by the execution loop before handling each
    /// certificate.
    bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
    /// Metrics updated throughout signing and execution.
    metrics: Arc<BridgeMetrics>,
}
83
84impl<C> BridgeActionExecutorTrait for BridgeActionExecutor<C>
85where
86 C: SuiClientInner + 'static,
87{
88 fn run(
89 self,
90 ) -> (
91 Vec<tokio::task::JoinHandle<()>>,
92 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
93 ) {
94 let (tasks, sender, _) = self.run_inner();
95 (tasks, sender)
96 }
97}
98
99impl<C> BridgeActionExecutor<C>
100where
101 C: SuiClientInner + 'static,
102{
103 pub async fn new(
104 sui_client: Arc<SuiClient<C>>,
105 bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
106 store: Arc<BridgeOrchestratorTables>,
107 key: SuiKeyPair,
108 sui_address: SuiAddress,
109 gas_object_id: ObjectID,
110 sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
111 bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
112 metrics: Arc<BridgeMetrics>,
113 ) -> Self {
114 let bridge_object_arg = sui_client
115 .get_mutable_bridge_object_arg_must_succeed()
116 .await;
117 Self {
118 sui_client,
119 bridge_auth_agg,
120 store,
121 key,
122 gas_object_id,
123 sui_address,
124 bridge_object_arg,
125 sui_token_type_tags,
126 bridge_pause_rx,
127 metrics,
128 }
129 }
130
131 fn run_inner(
132 self,
133 ) -> (
134 Vec<tokio::task::JoinHandle<()>>,
135 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
136 mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
137 ) {
138 let key = self.key;
139
140 let (sender, receiver) = mysten_metrics::metered_channel::channel(
141 CHANNEL_SIZE,
142 &mysten_metrics::get_metrics()
143 .unwrap()
144 .channel_inflight
145 .with_label_values(&["executor_signing_queue"]),
146 );
147
148 let (execution_tx, execution_rx) = mysten_metrics::metered_channel::channel(
149 CHANNEL_SIZE,
150 &mysten_metrics::get_metrics()
151 .unwrap()
152 .channel_inflight
153 .with_label_values(&["executor_execution_queue"]),
154 );
155 let execution_tx_clone = execution_tx.clone();
156 let sender_clone = sender.clone();
157 let store_clone = self.store.clone();
158 let client_clone = self.sui_client.clone();
159 let mut tasks = vec![];
160 let metrics = self.metrics.clone();
161 tasks.push(spawn_logged_monitored_task!(
162 Self::run_signature_aggregation_loop(
163 client_clone,
164 self.bridge_auth_agg,
165 store_clone,
166 sender_clone,
167 receiver,
168 execution_tx_clone,
169 metrics,
170 )
171 ));
172
173 let metrics = self.metrics.clone();
174 let execution_tx_clone = execution_tx.clone();
175 tasks.push(spawn_logged_monitored_task!(
176 Self::run_onchain_execution_loop(
177 self.sui_client.clone(),
178 key,
179 self.sui_address,
180 self.gas_object_id,
181 self.store.clone(),
182 execution_tx_clone,
183 execution_rx,
184 self.bridge_object_arg,
185 self.sui_token_type_tags,
186 self.bridge_pause_rx,
187 metrics,
188 )
189 ));
190 (tasks, sender, execution_tx)
191 }
192
193 async fn run_signature_aggregation_loop(
194 sui_client: Arc<SuiClient<C>>,
195 auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
196 store: Arc<BridgeOrchestratorTables>,
197 signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
198 mut signing_queue_receiver: mysten_metrics::metered_channel::Receiver<
199 BridgeActionExecutionWrapper,
200 >,
201 execution_queue_sender: mysten_metrics::metered_channel::Sender<
202 CertifiedBridgeActionExecutionWrapper,
203 >,
204 metrics: Arc<BridgeMetrics>,
205 ) {
206 info!("Starting run_signature_aggregation_loop");
207 let semaphore = Arc::new(Semaphore::new(SIGNING_CONCURRENCY));
208 while let Some(action) = signing_queue_receiver.recv().await {
209 Self::handle_signing_task(
210 &semaphore,
211 &auth_agg,
212 &signing_queue_sender,
213 &execution_queue_sender,
214 &sui_client,
215 &store,
216 action,
217 &metrics,
218 )
219 .await;
220 }
221 }
222
223 async fn should_proceed_signing(sui_client: &Arc<SuiClient<C>>) -> bool {
224 let Ok(Ok(is_paused)) =
225 retry_with_max_elapsed_time!(sui_client.is_bridge_paused(), Duration::from_secs(600))
226 else {
227 error!("Failed to get bridge status after retry");
228 return false;
229 };
230 !is_paused
231 }
232
233 #[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))]
234 async fn handle_signing_task(
235 semaphore: &Arc<Semaphore>,
236 auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
237 signing_queue_sender: &mysten_metrics::metered_channel::Sender<
238 BridgeActionExecutionWrapper,
239 >,
240 execution_queue_sender: &mysten_metrics::metered_channel::Sender<
241 CertifiedBridgeActionExecutionWrapper,
242 >,
243 sui_client: &Arc<SuiClient<C>>,
244 store: &Arc<BridgeOrchestratorTables>,
245 action: BridgeActionExecutionWrapper,
246 metrics: &Arc<BridgeMetrics>,
247 ) {
248 metrics.action_executor_signing_queue_received_actions.inc();
249 let action_key = action.0.key();
250 info!("Received action for signing: {:?}", action.0);
251
252 let should_proceed = Self::should_proceed_signing(sui_client).await;
257 if !should_proceed {
258 metrics.action_executor_signing_queue_skipped_actions.inc();
259 warn!("skipping signing task: {:?}", action_key);
260 return;
261 }
262
263 let auth_agg_clone = auth_agg.clone();
264 let signing_queue_sender_clone = signing_queue_sender.clone();
265 let execution_queue_sender_clone = execution_queue_sender.clone();
266 let sui_client_clone = sui_client.clone();
267 let store_clone = store.clone();
268 let metrics_clone = metrics.clone();
269 let semaphore_clone = semaphore.clone();
270 spawn_logged_monitored_task!(
271 Self::request_signatures(
272 semaphore_clone,
273 sui_client_clone,
274 auth_agg_clone,
275 action,
276 store_clone,
277 signing_queue_sender_clone,
278 execution_queue_sender_clone,
279 metrics_clone,
280 )
281 .instrument(tracing::debug_span!("request_signatures", action_key=?action_key)),
282 "request_signatures"
283 );
284 }
285
286 async fn handle_already_processed_token_transfer_action_maybe(
290 sui_client: &Arc<SuiClient<C>>,
291 action: &BridgeAction,
292 store: &Arc<BridgeOrchestratorTables>,
293 metrics: &Arc<BridgeMetrics>,
294 ) -> bool {
295 let status = sui_client
296 .get_token_transfer_action_onchain_status_until_success(
297 action.chain_id() as u8,
298 action.seq_number(),
299 )
300 .await;
301 match status {
302 BridgeActionStatus::Approved | BridgeActionStatus::Claimed => {
303 info!(
304 "Action already approved or claimed, removing action from pending logs: {:?}",
305 action
306 );
307 metrics.action_executor_already_processed_actions.inc();
308 store
309 .remove_pending_actions(&[action.digest()])
310 .unwrap_or_else(|e| {
311 panic!("Write to DB should not fail: {:?}", e);
312 });
313 true
314 }
315 BridgeActionStatus::Pending | BridgeActionStatus::NotFound => false,
318 }
319 }
320
321 async fn request_signatures(
324 semaphore: Arc<Semaphore>,
325 sui_client: Arc<SuiClient<C>>,
326 auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
327 action: BridgeActionExecutionWrapper,
328 store: Arc<BridgeOrchestratorTables>,
329 signing_queue_sender: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
330 execution_queue_sender: mysten_metrics::metered_channel::Sender<
331 CertifiedBridgeActionExecutionWrapper,
332 >,
333 metrics: Arc<BridgeMetrics>,
334 ) {
335 let _permit = semaphore
336 .acquire()
337 .await
338 .expect("semaphore should not be closed");
339 info!("requesting signatures");
340 let BridgeActionExecutionWrapper(action, attempt_times) = action;
341
342 match &action {
344 BridgeAction::SuiToEthBridgeAction(_)
345 | BridgeAction::SuiToEthTokenTransfer(_)
346 | BridgeAction::EthToSuiBridgeAction(_) => (),
347 _ => unreachable!("Non token transfer action should not reach here"),
348 };
349
350 if Self::handle_already_processed_token_transfer_action_maybe(
352 &sui_client,
353 &action,
354 &store,
355 &metrics,
356 )
357 .await
358 {
359 return;
360 }
361 match auth_agg
362 .load()
363 .request_committee_signatures(action.clone())
364 .await
365 {
366 Ok(certificate) => {
367 info!("Sending certificate to execution");
368 execution_queue_sender
369 .send(CertifiedBridgeActionExecutionWrapper(certificate, 0))
370 .await
371 .unwrap_or_else(|e| {
372 panic!("Sending to execution queue should not fail: {:?}", e);
373 });
374 }
375 Err(e) => {
376 warn!("Failed to collect sigs for bridge action: {:?}", e);
377 metrics.err_signature_aggregation.inc();
378
379 if attempt_times >= MAX_SIGNING_ATTEMPTS {
381 metrics.err_signature_aggregation_too_many_failures.inc();
382 error!(
383 "Manual intervention is required. Failed to collect sigs for bridge action after {MAX_SIGNING_ATTEMPTS} attempts: {:?}",
384 e
385 );
386 return;
387 }
388 delay(attempt_times).await;
389 signing_queue_sender
390 .send(BridgeActionExecutionWrapper(action, attempt_times + 1))
391 .await
392 .unwrap_or_else(|e| {
393 panic!("Sending to signing queue should not fail: {:?}", e);
394 });
395 }
396 }
397 }
398
399 async fn run_onchain_execution_loop(
402 sui_client: Arc<SuiClient<C>>,
403 sui_key: SuiKeyPair,
404 sui_address: SuiAddress,
405 gas_object_id: ObjectID,
406 store: Arc<BridgeOrchestratorTables>,
407 execution_queue_sender: mysten_metrics::metered_channel::Sender<
408 CertifiedBridgeActionExecutionWrapper,
409 >,
410 mut execution_queue_receiver: mysten_metrics::metered_channel::Receiver<
411 CertifiedBridgeActionExecutionWrapper,
412 >,
413 bridge_object_arg: ObjectArg,
414 sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
415 bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
416 metrics: Arc<BridgeMetrics>,
417 ) {
418 info!("Starting run_onchain_execution_loop");
419 while let Some(certificate_wrapper) = execution_queue_receiver.recv().await {
420 if *bridge_pause_rx.borrow() {
424 warn!("Bridge is paused, skipping execution");
425 metrics
426 .action_executor_execution_queue_skipped_actions_due_to_pausing
427 .inc();
428 continue;
429 }
430 Self::handle_execution_task(
431 certificate_wrapper,
432 &sui_client,
433 &sui_key,
434 &sui_address,
435 gas_object_id,
436 &store,
437 &execution_queue_sender,
438 &bridge_object_arg,
439 &sui_token_type_tags,
440 &metrics,
441 )
442 .await;
443 }
444 panic!("Execution queue closed unexpectedly");
445 }
446
447 #[instrument(level = "error", skip_all, fields(action_key=?certificate_wrapper.0.data().key(), attempt_times=?certificate_wrapper.1))]
448 async fn handle_execution_task(
449 certificate_wrapper: CertifiedBridgeActionExecutionWrapper,
450 sui_client: &Arc<SuiClient<C>>,
451 sui_key: &SuiKeyPair,
452 sui_address: &SuiAddress,
453 gas_object_id: ObjectID,
454 store: &Arc<BridgeOrchestratorTables>,
455 execution_queue_sender: &mysten_metrics::metered_channel::Sender<
456 CertifiedBridgeActionExecutionWrapper,
457 >,
458 bridge_object_arg: &ObjectArg,
459 sui_token_type_tags: &ArcSwap<HashMap<u8, TypeTag>>,
460 metrics: &Arc<BridgeMetrics>,
461 ) {
462 metrics
463 .action_executor_execution_queue_received_actions
464 .inc();
465 let CertifiedBridgeActionExecutionWrapper(certificate, attempt_times) = certificate_wrapper;
466 let action = certificate.data();
467 let action_key = action.key();
468
469 info!("Received certified action for execution: {:?}", action);
470
471 let (gas_coin, gas_object_ref) =
473 Self::get_gas_data_assert_ownership(*sui_address, gas_object_id, sui_client).await;
474 metrics.gas_coin_balance.set(gas_coin.value() as i64);
475
476 let ceriticate_clone = certificate.clone();
477
478 if Self::handle_already_processed_token_transfer_action_maybe(
480 sui_client, action, store, metrics,
481 )
482 .await
483 {
484 info!("Action already processed, skipping");
485 return;
486 }
487
488 info!("Building Sui transaction");
489 let rgp = sui_client.get_reference_gas_price_until_success().await;
490 let tx_data = match build_sui_transaction(
491 *sui_address,
492 &gas_object_ref,
493 ceriticate_clone,
494 *bridge_object_arg,
495 sui_token_type_tags.load().as_ref(),
496 rgp,
497 ) {
498 Ok(tx_data) => tx_data,
499 Err(err) => {
500 metrics.err_build_sui_transaction.inc();
501 error!(
502 "Manual intervention is required. Failed to build transaction for action {:?}: {:?}",
503 action, err
504 );
505 return;
508 }
509 };
510 let sig = Signature::new_secure(
511 &IntentMessage::new(Intent::sui_transaction(), &tx_data),
512 sui_key,
513 );
514 let signed_tx = Transaction::from_data(tx_data, vec![sig]);
515 let tx_digest = *signed_tx.digest();
516
517 if Self::handle_already_processed_token_transfer_action_maybe(
519 sui_client, action, store, metrics,
520 )
521 .await
522 {
523 info!("Action already processed, skipping");
524 return;
525 }
526
527 info!(?tx_digest, ?gas_object_ref, "Sending transaction to Sui");
528 match sui_client
529 .execute_transaction_block_with_effects(signed_tx)
530 .await
531 {
532 Ok(resp) => {
533 Self::handle_execution_effects(tx_digest, resp, store, action, metrics).await
534 }
535
536 Err(err) => {
538 error!(
539 ?action_key,
540 ?tx_digest,
541 "Sui transaction failed at signing: {err:?}"
542 );
543 metrics.err_sui_transaction_submission.inc();
544 let metrics_clone = metrics.clone();
545 let sender_clone = execution_queue_sender.clone();
547 spawn_logged_monitored_task!(async move {
548 if attempt_times >= MAX_EXECUTION_ATTEMPTS {
550 metrics_clone
551 .err_sui_transaction_submission_too_many_failures
552 .inc();
553 error!("Manual intervention is required. Failed to collect execute transaction for bridge action after {MAX_EXECUTION_ATTEMPTS} attempts: {:?}", err);
554 return;
555 }
556 delay(attempt_times).await;
557 sender_clone
558 .send(CertifiedBridgeActionExecutionWrapper(
559 certificate,
560 attempt_times + 1,
561 ))
562 .await
563 .unwrap_or_else(|e| {
564 panic!("Sending to execution queue should not fail: {:?}", e);
565 });
566 info!("Re-enqueued certificate for execution");
567 }.instrument(tracing::debug_span!("reenqueue_execution_task", action_key=?action_key)));
568 }
569 }
570 }
571
572 async fn handle_execution_effects(
574 tx_digest: TransactionDigest,
575 response: ExecuteTransactionResult,
576 store: &Arc<BridgeOrchestratorTables>,
577 action: &BridgeAction,
578 metrics: &Arc<BridgeMetrics>,
579 ) {
580 match &response.status {
581 SuiExecutionStatus::Success => {
582 let relevant_events = response
583 .events
584 .iter()
585 .filter(|e| {
586 e.type_ == *TokenTransferAlreadyClaimed.get().unwrap()
587 || e.type_ == *TokenTransferClaimed.get().unwrap()
588 || e.type_ == *TokenTransferApproved.get().unwrap()
589 || e.type_ == *TokenTransferAlreadyApproved.get().unwrap()
590 })
591 .collect::<Vec<_>>();
592 assert!(
593 !relevant_events.is_empty(),
594 "Expected TokenTransferAlreadyClaimed, TokenTransferClaimed, TokenTransferApproved \
595 or TokenTransferAlreadyApproved event but got: {:?}",
596 response.events
597 );
598 info!(?tx_digest, "Sui transaction executed successfully");
599 relevant_events.iter().for_each(|e| {
601 if e.type_ == *TokenTransferClaimed.get().unwrap() {
602 match action {
603 BridgeAction::EthToSuiBridgeAction(_) => {
604 metrics.eth_sui_token_transfer_claimed.inc();
605 }
606 BridgeAction::SuiToEthBridgeAction(_) => {
607 metrics.sui_eth_token_transfer_claimed.inc();
608 }
609 _ => error!("Unexpected action type for claimed event: {:?}", action),
610 }
611 } else if e.type_ == *TokenTransferApproved.get().unwrap() {
612 match action {
613 BridgeAction::EthToSuiBridgeAction(_) => {
614 metrics.eth_sui_token_transfer_approved.inc();
615 }
616 BridgeAction::SuiToEthBridgeAction(_) => {
617 metrics.sui_eth_token_transfer_approved.inc();
618 }
619 _ => error!("Unexpected action type for approved event: {:?}", action),
620 }
621 }
622 });
623 store
624 .remove_pending_actions(&[action.digest()])
625 .unwrap_or_else(|e| {
626 panic!("Write to DB should not fail: {:?}", e);
627 })
628 }
629 SuiExecutionStatus::Failure { error } => {
630 metrics.err_sui_transaction_execution.inc();
637 error!(
638 ?tx_digest,
639 "Manual intervention is needed. Sui transaction executed and failed with error: {error:?}"
640 );
641 }
642 }
643 }
644
645 async fn get_gas_data_assert_ownership(
647 sui_address: SuiAddress,
648 gas_object_id: ObjectID,
649 sui_client: &SuiClient<C>,
650 ) -> (GasCoin, ObjectRef) {
651 let (gas_coin, gas_obj_ref, owner) = sui_client
652 .get_gas_data_panic_if_not_gas(gas_object_id)
653 .await;
654
655 assert_eq!(
658 owner,
659 Owner::AddressOwner(sui_address),
660 "Gas object {:?} is no longer owned by address {}",
661 gas_object_id,
662 sui_address
663 );
664 (gas_coin, gas_obj_ref)
665 }
666}
667
668pub async fn submit_to_executor(
669 tx: &mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
670 action: BridgeAction,
671) -> Result<(), BridgeError> {
672 tx.send(BridgeActionExecutionWrapper(action, 0))
673 .await
674 .map_err(|e| BridgeError::Generic(e.to_string()))
675}
676
677#[cfg(test)]
678mod tests {
679 use crate::events::init_all_struct_tags;
680 use crate::test_utils::DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
681 use crate::types::BRIDGE_PAUSED;
682 use fastcrypto::traits::KeyPair;
683 use prometheus::Registry;
684 use std::collections::{BTreeMap, HashMap};
685 use std::str::FromStr;
686 use sui_json_rpc_types::SuiEvent;
687 use sui_types::TypeTag;
688 use sui_types::crypto::get_key_pair;
689 use sui_types::gas_coin::GasCoin;
690 use sui_types::{base_types::random_object_ref, transaction::TransactionData};
691
692 use crate::{
693 crypto::{
694 BridgeAuthorityKeyPair, BridgeAuthorityPublicKeyBytes,
695 BridgeAuthorityRecoverableSignature,
696 },
697 server::mock_handler::BridgeRequestMockHandler,
698 sui_mock_client::SuiMockClient,
699 test_utils::{
700 get_test_authorities_and_run_mock_bridge_server, get_test_eth_to_sui_bridge_action,
701 get_test_sui_to_eth_bridge_action, sign_action_with_key,
702 },
703 types::{BridgeCommittee, BridgeCommitteeValiditySignInfo, CertifiedBridgeAction},
704 };
705
706 use super::*;
707
    /// Exercises the execution path end to end through three scenarios:
    /// a transaction that succeeds, one that executes but fails, and one whose
    /// submission errors and is retried until it succeeds.
    #[tokio::test]
    async fn test_onchain_execution_loop() {
        let (
            signing_tx,
            _execution_tx,
            sui_client_mock,
            mut tx_subscription,
            store,
            secrets,
            dummy_sui_key,
            mock0,
            mock1,
            mock2,
            mock3,
            _handles,
            gas_object_ref,
            sui_address,
            sui_token_type_tags,
            _bridge_pause_tx,
        ) = setup().await;

        // Scenario 1: the transaction executes successfully.
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            None,
            true,
        );
        let action = action_certificate.data().clone();
        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
        // Build the transaction the same way the executor does, to learn the
        // digest the mock client should be primed for.
        let tx_data = build_sui_transaction(
            sui_address,
            &gas_object_ref,
            action_certificate,
            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
            &id_token_map,
            1000,
        )
        .unwrap();

        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);

        // Register a gas object owned by the executor's address.
        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000);
        sui_client_mock.add_gas_object_info(
            gas_coin.clone(),
            gas_object_ref,
            Owner::AddressOwner(sui_address),
        );

        // Mock a successful response carrying a TokenTransferClaimed event.
        let mut event = SuiEvent::random_for_testing();
        event.type_ = TokenTransferClaimed.get().unwrap().clone();
        let events = vec![event];
        mock_transaction_response(
            &sui_client_mock,
            tx_digest,
            SuiExecutionStatus::Success,
            Some(events),
            true,
        );

        store
            .insert_pending_actions(std::slice::from_ref(&action))
            .unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        // Kick things off through the signing queue.
        submit_to_executor(&signing_tx, action.clone())
            .await
            .unwrap();

        // Wait for a transaction notification; the action must then be gone
        // from the pending store.
        tx_subscription.recv().await.unwrap();
        assert!(store.get_all_pending_actions().is_empty());

        // Scenario 2: the transaction is executed but its status is Failure.
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            None,
            true,
        );

        let action = action_certificate.data().clone();

        let tx_data = build_sui_transaction(
            sui_address,
            &gas_object_ref,
            action_certificate,
            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
            &id_token_map,
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);

        mock_transaction_response(
            &sui_client_mock,
            tx_digest,
            SuiExecutionStatus::Failure {
                error: "failure is mother of success".to_string(),
            },
            None,
            true,
        );

        store
            .insert_pending_actions(std::slice::from_ref(&action))
            .unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        submit_to_executor(&signing_tx, action.clone())
            .await
            .unwrap();

        // A failed execution leaves the action in the pending store.
        tx_subscription.recv().await.unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        // Scenario 3: submission returns an error, so the executor retries.
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            None,
            true,
        );

        let action = action_certificate.data().clone();

        let tx_data = build_sui_transaction(
            sui_address,
            &gas_object_ref,
            action_certificate,
            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
            &id_token_map,
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
        mock_transaction_error(
            &sui_client_mock,
            tx_digest,
            BridgeError::Generic("some random error".to_string()),
            true,
        );

        store
            .insert_pending_actions(std::slice::from_ref(&action))
            .unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        submit_to_executor(&signing_tx, action.clone())
            .await
            .unwrap();

        // The same digest shows up twice: the first attempt and its retry.
        let tx_digest = tx_subscription.recv().await.unwrap();
        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);

        // While submission keeps failing the action stays pending.
        assert!(
            store
                .get_all_pending_actions()
                .contains_key(&action.digest())
        );

        // Now let the transaction succeed.
        let mut event = SuiEvent::random_for_testing();
        event.type_ = TokenTransferClaimed.get().unwrap().clone();
        let events = vec![event];
        mock_transaction_response(
            &sui_client_mock,
            tx_digest,
            SuiExecutionStatus::Success,
            Some(events),
            true,
        );

        // Give the executor time to retry, then expect the action removed.
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        assert!(
            !store
                .get_all_pending_actions()
                .contains_key(&action.digest())
        );
    }
915
    /// Verifies that signature collection is retried while too few
    /// authorities sign, and that the action is executed once one more
    /// authority starts signing.
    #[tokio::test]
    async fn test_signature_aggregation_loop() {
        let (
            signing_tx,
            _execution_tx,
            sui_client_mock,
            mut tx_subscription,
            store,
            secrets,
            dummy_sui_key,
            mock0,
            mock1,
            mock2,
            mock3,
            _handles,
            gas_object_ref,
            sui_address,
            sui_token_type_tags,
            _bridge_pause_tx,
        ) = setup().await;
        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
        let (action_certificate, sui_tx_digest, sui_tx_event_index) =
            get_bridge_authority_approved_action(
                vec![&mock0, &mock1, &mock2, &mock3],
                vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
                None,
                true,
            );
        let action = action_certificate.data().clone();
        // Authorities 0, 1 and 2 return errors; only authority 3 signs.
        mock_bridge_authority_signing_errors(
            vec![&mock0, &mock1, &mock2],
            sui_tx_digest,
            sui_tx_event_index,
        );
        let mut sigs = mock_bridge_authority_sigs(
            vec![&mock3],
            &action,
            vec![&secrets[3]],
            sui_tx_digest,
            sui_tx_event_index,
        );

        // Register a gas object owned by the executor's address.
        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000);
        sui_client_mock.add_gas_object_info(
            gas_coin,
            gas_object_ref,
            Owner::AddressOwner(sui_address),
        );
        store
            .insert_pending_actions(std::slice::from_ref(&action))
            .unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        // Kick things off through the signing queue.
        submit_to_executor(&signing_tx, action.clone())
            .await
            .unwrap();

        // Wait until authority 0 has been asked at least twice, which shows
        // that the signing request was retried.
        loop {
            let requested_times =
                mock0.get_sui_token_events_requested(sui_tx_digest, sui_tx_event_index);
            if requested_times >= 2 {
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }
        // No transaction was submitted yet.
        assert_eq!(
            tx_subscription.try_recv().unwrap_err(),
            tokio::sync::broadcast::error::TryRecvError::Empty
        );
        // The action is still pending.
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        // Let authority 2 sign as well (presumably enough, together with
        // authority 3, for the aggregator to produce a certificate — confirm
        // against the committee set up in `setup`).
        let sig_from_2 = mock_bridge_authority_sigs(
            vec![&mock2],
            &action,
            vec![&secrets[2]],
            sui_tx_digest,
            sui_tx_event_index,
        );
        sigs.extend(sig_from_2);
        // Rebuild the certificate the executor is expected to produce, in
        // order to compute the transaction digest to mock.
        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
            action.clone(),
            BridgeCommitteeValiditySignInfo { signatures: sigs },
        );
        let action_certificate = VerifiedCertifiedBridgeAction::new_from_verified(certified_action);
        let tx_data = build_sui_transaction(
            sui_address,
            &gas_object_ref,
            action_certificate,
            DUMMY_MUTALBE_BRIDGE_OBJECT_ARG,
            &id_token_map,
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);

        let mut event = SuiEvent::random_for_testing();
        event.type_ = TokenTransferClaimed.get().unwrap().clone();
        let events = vec![event];
        mock_transaction_response(
            &sui_client_mock,
            tx_digest,
            SuiExecutionStatus::Success,
            Some(events),
            true,
        );

        // The expected transaction is submitted.
        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
        // The action is removed from the pending store.
        assert!(
            !store
                .get_all_pending_actions()
                .contains_key(&action.digest())
        );
    }
1042
    /// Verifies that an action stuck in signature collection is dropped from
    /// the pending store, without any transaction being submitted, once its
    /// on-chain status becomes `Approved`.
    #[tokio::test]
    async fn test_skip_request_signature_if_already_processed_on_chain() {
        let (
            signing_tx,
            _execution_tx,
            sui_client_mock,
            mut tx_subscription,
            store,
            _secrets,
            _dummy_sui_key,
            mock0,
            mock1,
            mock2,
            mock3,
            _handles,
            _gas_object_ref,
            _sui_address,
            _sui_token_type_tags,
            _bridge_pause_tx,
        ) = setup().await;

        let sui_tx_digest = TransactionDigest::random();
        let sui_tx_event_index = 0;
        let action = get_test_sui_to_eth_bridge_action(
            Some(sui_tx_digest),
            Some(sui_tx_event_index),
            None,
            None,
            None,
            None,
            None,
        );
        // Every authority returns an error, so no certificate can be formed.
        mock_bridge_authority_signing_errors(
            vec![&mock0, &mock1, &mock2, &mock3],
            sui_tx_digest,
            sui_tx_event_index,
        );
        store
            .insert_pending_actions(std::slice::from_ref(&action))
            .unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        // Kick things off through the signing queue.
        submit_to_executor(&signing_tx, action.clone())
            .await
            .unwrap();
        let action_digest = action.digest();

        // After a second nothing has been submitted and the action is still
        // pending.
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        tx_subscription.try_recv().unwrap_err();
        assert!(store.get_all_pending_actions().contains_key(&action_digest));

        // Mark the action as already approved on chain.
        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);

        // The next signing attempt should notice the status and remove the
        // action; allow up to 10 seconds for that.
        let now = std::time::Instant::now();
        while store.get_all_pending_actions().contains_key(&action_digest) {
            if now.elapsed().as_secs() > 10 {
                panic!("Timeout waiting for action to be removed from WAL");
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
        // Still no transaction was submitted.
        tx_subscription.try_recv().unwrap_err();
    }
1112
    /// Verifies that a certificate whose submission keeps failing is dropped
    /// from the pending store once its on-chain status becomes `Approved`.
    #[tokio::test]
    async fn test_skip_tx_submission_if_already_processed_on_chain() {
        let (
            _signing_tx,
            execution_tx,
            sui_client_mock,
            mut tx_subscription,
            store,
            secrets,
            dummy_sui_key,
            mock0,
            mock1,
            mock2,
            mock3,
            _handles,
            gas_object_ref,
            sui_address,
            sui_token_type_tags,
            _bridge_pause_tx,
        ) = setup().await;
        let id_token_map = (*sui_token_type_tags.load().clone()).clone();
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            None,
            true,
        );

        let action = action_certificate.data().clone();
        let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
        let tx_data = build_sui_transaction(
            sui_address,
            &gas_object_ref,
            action_certificate.clone(),
            arg,
            &id_token_map,
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
        // Submitting this transaction returns an error.
        mock_transaction_error(
            &sui_client_mock,
            tx_digest,
            BridgeError::Generic("some random error".to_string()),
            true,
        );

        // Register a gas object owned by the executor's address.
        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000);
        sui_client_mock.add_gas_object_info(
            gas_coin.clone(),
            gas_object_ref,
            Owner::AddressOwner(sui_address),
        );

        // The action starts out as pending on chain.
        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);

        store
            .insert_pending_actions(std::slice::from_ref(&action))
            .unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        // Feed the certificate straight into the execution queue.
        execution_tx
            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
            .await
            .unwrap();

        // A first submission attempt is made (and errors, per the mock above).
        tx_subscription.recv().await.unwrap();

        // Mark the action as already approved on chain.
        sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);

        // The retry should notice the status and remove the action; allow up
        // to 10 seconds for that.
        let now = std::time::Instant::now();
        let action_digest = action.digest();
        while store.get_all_pending_actions().contains_key(&action_digest) {
            if now.elapsed().as_secs() > 10 {
                panic!("Timeout waiting for action to be removed from WAL");
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
    }
1199
1200 #[tokio::test]
1201 async fn test_skip_tx_submission_if_bridge_is_paused() {
1202 let (
1203 _signing_tx,
1204 execution_tx,
1205 sui_client_mock,
1206 mut tx_subscription,
1207 store,
1208 secrets,
1209 dummy_sui_key,
1210 mock0,
1211 mock1,
1212 mock2,
1213 mock3,
1214 _handles,
1215 gas_object_ref,
1216 sui_address,
1217 sui_token_type_tags,
1218 bridge_pause_tx,
1219 ) = setup().await;
1220 let id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1221 let (action_certificate, _, _) = get_bridge_authority_approved_action(
1222 vec![&mock0, &mock1, &mock2, &mock3],
1223 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1224 None,
1225 true,
1226 );
1227
1228 let action = action_certificate.data().clone();
1229 let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1230 let tx_data = build_sui_transaction(
1231 sui_address,
1232 &gas_object_ref,
1233 action_certificate.clone(),
1234 arg,
1235 &id_token_map,
1236 1000,
1237 )
1238 .unwrap();
1239 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1240 mock_transaction_error(
1241 &sui_client_mock,
1242 tx_digest,
1243 BridgeError::Generic("some random error".to_string()),
1244 true,
1245 );
1246
1247 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1249 gas_coin.clone(),
1250 gas_object_ref,
1251 Owner::AddressOwner(sui_address),
1252 );
1253 let action_digest = action.digest();
1254 sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1255
1256 assert!(!*bridge_pause_tx.borrow());
1258
1259 store
1260 .insert_pending_actions(std::slice::from_ref(&action))
1261 .unwrap();
1262 assert_eq!(
1263 store.get_all_pending_actions()[&action.digest()],
1264 action.clone()
1265 );
1266
1267 execution_tx
1269 .send(CertifiedBridgeActionExecutionWrapper(
1270 action_certificate.clone(),
1271 0,
1272 ))
1273 .await
1274 .unwrap();
1275
1276 tx_subscription.recv().await.unwrap();
1278
1279 bridge_pause_tx.send(BRIDGE_PAUSED).unwrap();
1281
1282 execution_tx
1284 .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
1285 .await
1286 .unwrap();
1287
1288 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1289 assert_eq!(
1291 tx_subscription.try_recv().unwrap_err(),
1292 tokio::sync::broadcast::error::TryRecvError::Empty
1293 );
1294 assert_eq!(
1296 store.get_all_pending_actions()[&action_digest],
1297 action.clone()
1298 );
1299 }
1300
1301 #[tokio::test]
1302 async fn test_action_executor_handle_new_token() {
1303 let new_token_id = 255u8; let new_type_tag = TypeTag::from_str("0xbeef::beef::BEEF").unwrap();
1305 let (
1306 _signing_tx,
1307 execution_tx,
1308 sui_client_mock,
1309 mut tx_subscription,
1310 _store,
1311 secrets,
1312 dummy_sui_key,
1313 mock0,
1314 mock1,
1315 mock2,
1316 mock3,
1317 _handles,
1318 gas_object_ref,
1319 sui_address,
1320 sui_token_type_tags,
1321 _bridge_pause_tx,
1322 ) = setup().await;
1323 let mut id_token_map: HashMap<u8, TypeTag> = (*sui_token_type_tags.load().clone()).clone();
1324 let (action_certificate, _, _) = get_bridge_authority_approved_action(
1325 vec![&mock0, &mock1, &mock2, &mock3],
1326 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1327 Some(new_token_id),
1328 false, );
1330
1331 let action = action_certificate.data().clone();
1332 let arg = DUMMY_MUTALBE_BRIDGE_OBJECT_ARG;
1333 let tx_data = build_sui_transaction(
1334 sui_address,
1335 &gas_object_ref,
1336 action_certificate.clone(),
1337 arg,
1338 &maplit::hashmap! {
1339 new_token_id => new_type_tag.clone()
1340 },
1341 1000,
1342 )
1343 .unwrap();
1344 let tx_digest = get_tx_digest(tx_data, &dummy_sui_key);
1345 mock_transaction_error(
1346 &sui_client_mock,
1347 tx_digest,
1348 BridgeError::Generic("some random error".to_string()),
1349 true,
1350 );
1351
1352 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); sui_client_mock.add_gas_object_info(
1354 gas_coin.clone(),
1355 gas_object_ref,
1356 Owner::AddressOwner(sui_address),
1357 );
1358 sui_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1359
1360 execution_tx
1362 .send(CertifiedBridgeActionExecutionWrapper(
1363 action_certificate.clone(),
1364 0,
1365 ))
1366 .await
1367 .unwrap();
1368
1369 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1370 assert_eq!(
1372 tx_subscription.try_recv().unwrap_err(),
1373 tokio::sync::broadcast::error::TryRecvError::Empty
1374 );
1375
1376 id_token_map.insert(new_token_id, new_type_tag);
1378 sui_token_type_tags.store(Arc::new(id_token_map));
1379
1380 execution_tx
1382 .send(CertifiedBridgeActionExecutionWrapper(
1383 action_certificate.clone(),
1384 0,
1385 ))
1386 .await
1387 .unwrap();
1388
1389 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1390 assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1392 }
1393
1394 fn mock_bridge_authority_sigs(
1395 mocks: Vec<&BridgeRequestMockHandler>,
1396 action: &BridgeAction,
1397 secrets: Vec<&BridgeAuthorityKeyPair>,
1398 sui_tx_digest: TransactionDigest,
1399 sui_tx_event_index: u16,
1400 ) -> BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthorityRecoverableSignature> {
1401 assert_eq!(mocks.len(), secrets.len());
1402 let mut signed_actions = BTreeMap::new();
1403 for (mock, secret) in mocks.iter().zip(secrets.iter()) {
1404 let signed_action = sign_action_with_key(action, secret);
1405 mock.add_sui_event_response(
1406 sui_tx_digest,
1407 sui_tx_event_index,
1408 Ok(signed_action.clone()),
1409 None,
1410 );
1411 signed_actions.insert(secret.public().into(), signed_action.into_sig().signature);
1412 }
1413 signed_actions
1414 }
1415
1416 fn mock_bridge_authority_signing_errors(
1417 mocks: Vec<&BridgeRequestMockHandler>,
1418 sui_tx_digest: TransactionDigest,
1419 sui_tx_event_index: u16,
1420 ) {
1421 for mock in mocks {
1422 mock.add_sui_event_response(
1423 sui_tx_digest,
1424 sui_tx_event_index,
1425 Err(BridgeError::RestAPIError("small issue".into())),
1426 None,
1427 );
1428 }
1429 }
1430
1431 fn get_bridge_authority_approved_action(
1433 mocks: Vec<&BridgeRequestMockHandler>,
1434 secrets: Vec<&BridgeAuthorityKeyPair>,
1435 token_id: Option<u8>,
1436 sui_to_eth: bool,
1437 ) -> (VerifiedCertifiedBridgeAction, TransactionDigest, u16) {
1438 let sui_tx_digest = TransactionDigest::random();
1439 let sui_tx_event_index = 1;
1440 let action = if sui_to_eth {
1441 get_test_sui_to_eth_bridge_action(
1442 Some(sui_tx_digest),
1443 Some(sui_tx_event_index),
1444 None,
1445 None,
1446 None,
1447 None,
1448 token_id,
1449 )
1450 } else {
1451 get_test_eth_to_sui_bridge_action(None, None, None, token_id)
1452 };
1453
1454 let sigs =
1455 mock_bridge_authority_sigs(mocks, &action, secrets, sui_tx_digest, sui_tx_event_index);
1456 let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1457 action,
1458 BridgeCommitteeValiditySignInfo { signatures: sigs },
1459 );
1460 (
1461 VerifiedCertifiedBridgeAction::new_from_verified(certified_action),
1462 sui_tx_digest,
1463 sui_tx_event_index,
1464 )
1465 }
1466
1467 fn get_tx_digest(tx_data: TransactionData, dummy_sui_key: &SuiKeyPair) -> TransactionDigest {
1468 let sig = Signature::new_secure(
1469 &IntentMessage::new(Intent::sui_transaction(), &tx_data),
1470 dummy_sui_key,
1471 );
1472 let signed_tx = Transaction::from_data(tx_data, vec![sig]);
1473 *signed_tx.digest()
1474 }
1475
1476 fn mock_transaction_response(
1480 sui_client_mock: &SuiMockClient,
1481 tx_digest: TransactionDigest,
1482 status: SuiExecutionStatus,
1483 events: Option<Vec<SuiEvent>>,
1484 wildcard: bool,
1485 ) {
1486 let response = ExecuteTransactionResult {
1487 status,
1488 events: events.unwrap_or_default(),
1489 };
1490 if wildcard {
1491 sui_client_mock.set_wildcard_transaction_response(Ok(response));
1492 } else {
1493 sui_client_mock.add_transaction_response(tx_digest, Ok(response));
1494 }
1495 }
1496
1497 fn mock_transaction_error(
1498 sui_client_mock: &SuiMockClient,
1499 tx_digest: TransactionDigest,
1500 error: BridgeError,
1501 wildcard: bool,
1502 ) {
1503 if wildcard {
1504 sui_client_mock.set_wildcard_transaction_response(Err(error));
1505 } else {
1506 sui_client_mock.add_transaction_response(tx_digest, Err(error));
1507 }
1508 }
1509
1510 #[allow(clippy::type_complexity)]
1511 async fn setup() -> (
1512 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
1513 mysten_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
1514 SuiMockClient,
1515 tokio::sync::broadcast::Receiver<TransactionDigest>,
1516 Arc<BridgeOrchestratorTables>,
1517 Vec<BridgeAuthorityKeyPair>,
1518 SuiKeyPair,
1519 BridgeRequestMockHandler,
1520 BridgeRequestMockHandler,
1521 BridgeRequestMockHandler,
1522 BridgeRequestMockHandler,
1523 Vec<tokio::task::JoinHandle<()>>,
1524 ObjectRef,
1525 SuiAddress,
1526 Arc<ArcSwap<HashMap<u8, TypeTag>>>,
1527 tokio::sync::watch::Sender<IsBridgePaused>,
1528 ) {
1529 telemetry_subscribers::init_for_testing();
1530 let registry = Registry::new();
1531 mysten_metrics::init_metrics(®istry);
1532 init_all_struct_tags();
1533
1534 let (sui_address, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1535 let sui_key = SuiKeyPair::from(kp);
1536 let gas_object_ref = random_object_ref();
1537 let temp_dir = tempfile::tempdir().unwrap();
1538 let store = BridgeOrchestratorTables::new(temp_dir.path());
1539 let sui_client_mock = SuiMockClient::default();
1540 let tx_subscription = sui_client_mock.subscribe_to_requested_transactions();
1541 let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1542
1543 let (_, dummy_kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1546 let dummy_sui_key = SuiKeyPair::from(dummy_kp);
1547
1548 let mock0 = BridgeRequestMockHandler::new();
1549 let mock1 = BridgeRequestMockHandler::new();
1550 let mock2 = BridgeRequestMockHandler::new();
1551 let mock3 = BridgeRequestMockHandler::new();
1552
1553 let (mut handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
1554 vec![2500, 2500, 2500, 2500],
1555 vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
1556 );
1557
1558 let committee = BridgeCommittee::new(authorities).unwrap();
1559
1560 let agg = Arc::new(ArcSwap::new(Arc::new(
1561 BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1562 )));
1563 let metrics = Arc::new(BridgeMetrics::new(®istry));
1564 let sui_token_type_tags = sui_client.get_token_id_map().await.unwrap();
1565 let sui_token_type_tags = Arc::new(ArcSwap::new(Arc::new(sui_token_type_tags)));
1566 let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1567 let executor = BridgeActionExecutor::new(
1568 sui_client.clone(),
1569 agg.clone(),
1570 store.clone(),
1571 sui_key,
1572 sui_address,
1573 gas_object_ref.0,
1574 sui_token_type_tags.clone(),
1575 bridge_pause_rx,
1576 metrics,
1577 )
1578 .await;
1579
1580 let (executor_handle, signing_tx, execution_tx) = executor.run_inner();
1581 handles.extend(executor_handle);
1582
1583 (
1584 signing_tx,
1585 execution_tx,
1586 sui_client_mock,
1587 tx_subscription,
1588 store,
1589 secrets,
1590 dummy_sui_key,
1591 mock0,
1592 mock1,
1593 mock2,
1594 mock3,
1595 handles,
1596 gas_object_ref,
1597 sui_address,
1598 sui_token_type_tags,
1599 bridge_pause_tx,
1600 )
1601 }
1602}