1use crate::abi::EthBridgeEvent;
10use crate::action_executor::{
11 BridgeActionExecutionWrapper, BridgeActionExecutorTrait, submit_to_executor,
12};
13use crate::error::BridgeError;
14use crate::events::SuiBridgeEvent;
15use crate::metrics::BridgeMetrics;
16use crate::storage::BridgeOrchestratorTables;
17use crate::sui_client::{SuiClient, SuiClientInner};
18use crate::sui_syncer::GrpcSyncedEvents;
19use crate::types::EthLog;
20use alloy::primitives::Address as EthAddress;
21use mysten_metrics::spawn_logged_monitored_task;
22use std::sync::Arc;
23use sui_json_rpc_types::SuiEvent;
24use sui_types::Identifier;
25use tokio::task::JoinHandle;
26use tracing::{error, info};
27
28pub struct BridgeOrchestrator<C> {
29 _sui_client: Arc<SuiClient<C>>,
30 sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
31 sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
32 eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
33 store: Arc<BridgeOrchestratorTables>,
34 sui_monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
35 eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
36 metrics: Arc<BridgeMetrics>,
37}
38
39impl<C> BridgeOrchestrator<C>
40where
41 C: SuiClientInner + 'static,
42{
43 pub fn new(
44 sui_client: Arc<SuiClient<C>>,
45 sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
46 sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
47 eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
48 store: Arc<BridgeOrchestratorTables>,
49 sui_monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
50 eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
51 metrics: Arc<BridgeMetrics>,
52 ) -> Self {
53 Self {
54 _sui_client: sui_client,
55 sui_events_rx,
56 sui_grpc_events_rx,
57 eth_events_rx,
58 store,
59 sui_monitor_tx,
60 eth_monitor_tx,
61 metrics,
62 }
63 }
64
65 pub async fn run(
66 self,
67 bridge_action_executor: impl BridgeActionExecutorTrait,
68 ) -> Vec<JoinHandle<()>> {
69 tracing::info!("Starting BridgeOrchestrator");
70 let mut task_handles = vec![];
71 let store_clone = self.store.clone();
72
73 let (handles, executor_sender) = bridge_action_executor.run();
75 task_handles.extend(handles);
76 let executor_sender_clone = executor_sender.clone();
77 let metrics_clone = self.metrics.clone();
78 task_handles.push(spawn_logged_monitored_task!(Self::run_sui_watcher(
79 store_clone,
80 executor_sender_clone,
81 self.sui_events_rx,
82 self.sui_monitor_tx,
83 metrics_clone,
84 )));
85 let store_clone = self.store.clone();
86
87 let actions = store_clone
89 .get_all_pending_actions()
90 .into_values()
91 .collect::<Vec<_>>();
92 for action in actions {
93 submit_to_executor(&executor_sender, action)
94 .await
95 .expect("Submit to executor should not fail");
96 }
97
98 let metrics_clone = self.metrics.clone();
99 task_handles.push(spawn_logged_monitored_task!(Self::run_eth_watcher(
100 store_clone,
101 executor_sender,
102 self.eth_events_rx,
103 self.eth_monitor_tx,
104 metrics_clone,
105 )));
106
107 task_handles
108 }
109
110 pub async fn run_with_grpc(
111 self,
112 bridge_action_executor: impl BridgeActionExecutorTrait,
113 ) -> Vec<JoinHandle<()>> {
114 tracing::info!("Starting BridgeOrchestrator with gRPC syncer");
115 let mut task_handles = vec![];
116 let store_clone = self.store.clone();
117
118 let (handles, executor_sender) = bridge_action_executor.run();
120 task_handles.extend(handles);
121 let executor_sender_clone = executor_sender.clone();
122 let metrics_clone = self.metrics.clone();
123
124 task_handles.push(spawn_logged_monitored_task!(Self::run_sui_grpc_watcher(
125 store_clone,
126 executor_sender_clone,
127 self.sui_grpc_events_rx,
128 self.sui_monitor_tx,
129 metrics_clone,
130 )));
131 let store_clone = self.store.clone();
132
133 let actions = store_clone
135 .get_all_pending_actions()
136 .into_values()
137 .collect::<Vec<_>>();
138 for action in actions {
139 submit_to_executor(&executor_sender, action)
140 .await
141 .expect("Submit to executor should not fail");
142 }
143
144 let metrics_clone = self.metrics.clone();
145 task_handles.push(spawn_logged_monitored_task!(Self::run_eth_watcher(
146 store_clone,
147 executor_sender,
148 self.eth_events_rx,
149 self.eth_monitor_tx,
150 metrics_clone,
151 )));
152
153 task_handles
154 }
155
156 async fn run_sui_watcher(
157 store: Arc<BridgeOrchestratorTables>,
158 executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
159 mut sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
160 monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
161 metrics: Arc<BridgeMetrics>,
162 ) {
163 info!("Starting sui watcher task");
164 while let Some((identifier, events)) = sui_events_rx.recv().await {
165 if events.is_empty() {
166 continue;
167 }
168 info!("Received {} Sui events: {:?}", events.len(), events);
169 metrics
170 .sui_watcher_received_events
171 .inc_by(events.len() as u64);
172 let bridge_events = events
173 .iter()
174 .filter_map(|sui_event| {
175 match SuiBridgeEvent::try_from_sui_event(sui_event) {
176 Ok(bridge_event) => Some(bridge_event),
177 Err(BridgeError::ZeroValueBridgeTransfer(_)) => {
179 error!("Zero value bridge transfer: {:?}", sui_event);
180 None
181 }
182 Err(e) => {
183 panic!(
184 "Sui Event could not be deserialzed to SuiBridgeEvent: {:?}",
185 e
186 );
187 }
188 }
189 })
190 .collect::<Vec<_>>();
191
192 let mut actions = vec![];
193 for (sui_event, opt_bridge_event) in events.iter().zip(bridge_events) {
194 if opt_bridge_event.is_none() {
195 metrics.sui_watcher_unrecognized_events.inc();
197 error!("Sui event not recognized: {:?}", sui_event);
198 continue;
199 }
200 let bridge_event: SuiBridgeEvent = opt_bridge_event.unwrap();
202 info!("Observed Sui bridge event: {:?}", bridge_event);
203
204 monitor_tx
206 .send(bridge_event.clone())
207 .await
208 .expect("Sending event to monitor channel should not fail");
209
210 if let Some(mut action) = bridge_event.try_into_bridge_action() {
211 metrics.last_observed_actions_seq_num.with_label_values(&[
212 action.chain_id().to_string().as_str(),
213 action.action_type().to_string().as_str(),
214 ]);
215
216 action = action.update_to_token_transfer();
217
218 actions.push(action);
219 }
220 }
221
222 if !actions.is_empty() {
223 info!("Received {} actions from Sui: {:?}", actions.len(), actions);
224 metrics
225 .sui_watcher_received_actions
226 .inc_by(actions.len() as u64);
227 store
229 .insert_pending_actions(&actions)
230 .expect("Store operation should not fail");
231 for action in actions {
232 submit_to_executor(&executor_tx, action)
233 .await
234 .expect("Submit to executor should not fail");
235 }
236 }
237
238 let cursor = events.last().unwrap().id;
240 store
241 .update_sui_event_cursor(identifier, cursor)
242 .expect("Store operation should not fail");
243 }
244 panic!("Sui event channel was closed unexpectedly");
245 }
246
247 pub async fn run_sui_grpc_watcher(
248 store: Arc<BridgeOrchestratorTables>,
249 executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
250 mut sui_grpc_events_rx: mysten_metrics::metered_channel::Receiver<GrpcSyncedEvents>,
251 monitor_tx: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
252 metrics: Arc<BridgeMetrics>,
253 ) {
254 info!("Starting sui gRPC watcher task");
255 while let Some((last_seq_num, events)) = sui_grpc_events_rx.recv().await {
256 if events.is_empty() {
257 continue;
258 }
259 info!(
260 "Received {} Sui events: last_seq_num={}",
261 events.len(),
262 last_seq_num
263 );
264 metrics
265 .sui_watcher_received_events
266 .inc_by(events.len() as u64);
267
268 let mut actions = vec![];
269 for bridge_event in events {
270 info!("Observed Sui bridge event (gRPC): {:?}", bridge_event);
271
272 monitor_tx
274 .send(bridge_event.clone())
275 .await
276 .expect("Sending event to monitor channel should not fail");
277
278 if let Some(mut action) = bridge_event.try_into_bridge_action() {
280 metrics.last_observed_actions_seq_num.with_label_values(&[
281 action.chain_id().to_string().as_str(),
282 action.action_type().to_string().as_str(),
283 ]);
284
285 action = action.update_to_token_transfer();
286 actions.push(action);
287 }
288 }
289
290 if !actions.is_empty() {
291 info!(
292 "Received {} actions from Sui gRPC: {:?}",
293 actions.len(),
294 actions
295 );
296 metrics
297 .sui_watcher_received_actions
298 .inc_by(actions.len() as u64);
299 store
301 .insert_pending_actions(&actions)
302 .expect("Store operation should not fail");
303 for action in actions {
304 submit_to_executor(&executor_tx, action)
305 .await
306 .expect("Submit to executor should not fail");
307 }
308 }
309
310 store
312 .update_sui_sequence_number_cursor(last_seq_num)
313 .expect("Store operation should not fail");
314 }
315 panic!("Sui gRPC event channel was closed unexpectedly");
316 }
317
318 async fn run_eth_watcher(
319 store: Arc<BridgeOrchestratorTables>,
320 executor_tx: mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
321 mut eth_events_rx: mysten_metrics::metered_channel::Receiver<(
322 alloy::primitives::Address,
323 u64,
324 Vec<EthLog>,
325 )>,
326 eth_monitor_tx: mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
327 metrics: Arc<BridgeMetrics>,
328 ) {
329 info!("Starting eth watcher task");
330 while let Some((contract, end_block, logs)) = eth_events_rx.recv().await {
331 if logs.is_empty() {
332 store
333 .update_eth_event_cursor(contract, end_block)
334 .expect("Store operation should not fail");
335 continue;
336 }
337
338 info!("Received {} Eth events", logs.len());
339 metrics
340 .eth_watcher_received_events
341 .inc_by(logs.len() as u64);
342
343 let bridge_events = logs
344 .iter()
345 .map(EthBridgeEvent::try_from_eth_log)
346 .collect::<Vec<_>>();
347
348 let mut actions = vec![];
349 for (log, opt_bridge_event) in logs.iter().zip(bridge_events) {
350 if opt_bridge_event.is_none() {
351 metrics.eth_watcher_unrecognized_events.inc();
353 error!("Eth event not recognized: {:?}", log);
354 continue;
355 }
356 let bridge_event = opt_bridge_event.unwrap();
358 info!("Observed Eth bridge event: {:?}", bridge_event);
359
360 eth_monitor_tx
362 .send(bridge_event.clone())
363 .await
364 .expect("Sending event to monitor channel should not fail");
365
366 match bridge_event.try_into_bridge_action(log.tx_hash, log.log_index_in_tx) {
367 Ok(Some(action)) => {
368 metrics.last_observed_actions_seq_num.with_label_values(&[
369 action.chain_id().to_string().as_str(),
370 action.action_type().to_string().as_str(),
371 ]);
372 actions.push(action)
373 }
374 Ok(None) => {}
375 Err(e) => {
376 error!(eth_tx_hash=?log.tx_hash, eth_event_index=?log.log_index_in_tx, "Error converting EthBridgeEvent to BridgeAction: {:?}", e);
377 }
378 }
379 }
380 if !actions.is_empty() {
381 info!("Received {} actions from Eth: {:?}", actions.len(), actions);
382 metrics
383 .eth_watcher_received_actions
384 .inc_by(actions.len() as u64);
385 store
387 .insert_pending_actions(&actions)
388 .expect("Store operation should not fail");
389 for action in actions {
391 submit_to_executor(&executor_tx, action)
392 .await
393 .expect("Submit to executor should not fail");
394 }
395 }
396
397 store
398 .update_eth_event_cursor(contract, end_block)
399 .expect("Store operation should not fail");
400 }
401 panic!("Eth event channel was closed");
402 }
403}
404
405#[cfg(test)]
406mod tests {
407 use crate::{
408 test_utils::{get_test_eth_to_sui_bridge_action, get_test_log_and_action},
409 types::BridgeActionDigest,
410 };
411 use alloy::primitives::TxHash;
412 use prometheus::Registry;
413 use std::str::FromStr;
414
415 use super::*;
416 use crate::events::SuiBridgeEvent;
417 use crate::events::init_all_struct_tags;
418 use crate::test_utils::get_test_sui_to_eth_bridge_action;
419 use crate::{events::tests::get_test_sui_event_and_action, sui_mock_client::SuiMockClient};
420
421 #[tokio::test]
422 async fn test_sui_watcher_task() {
423 let (
427 sui_events_tx,
428 sui_events_rx,
429 _sui_grpc_events_tx,
430 sui_grpc_events_rx,
431 _eth_events_tx,
432 eth_events_rx,
433 sui_monitor_tx,
434 _sui_monitor_rx,
435 eth_monitor_tx,
436 _eth_monitor_rx,
437 sui_client,
438 store,
439 ) = setup();
440 let (executor, mut executor_requested_action_rx) = MockExecutor::new();
441 let registry = Registry::new();
443 let metrics = Arc::new(BridgeMetrics::new(®istry));
444 let _handles = BridgeOrchestrator::new(
445 Arc::new(sui_client),
446 sui_events_rx,
447 sui_grpc_events_rx,
448 eth_events_rx,
449 store.clone(),
450 sui_monitor_tx,
451 eth_monitor_tx,
452 metrics,
453 )
454 .run(executor)
455 .await;
456
457 let identifier = Identifier::from_str("test_sui_watcher_task").unwrap();
458 let (sui_event, mut bridge_action) = get_test_sui_event_and_action(identifier.clone());
459 bridge_action = bridge_action.update_to_token_transfer();
460 sui_events_tx
461 .send((identifier.clone(), vec![sui_event.clone()]))
462 .await
463 .unwrap();
464
465 let start = std::time::Instant::now();
466 assert_eq!(
468 executor_requested_action_rx.recv().await.unwrap(),
469 bridge_action.digest()
470 );
471 loop {
472 let actions = store.get_all_pending_actions();
473 if actions.is_empty() {
474 if start.elapsed().as_secs() > 5 {
475 panic!("Timed out waiting for action to be written to WAL");
476 }
477 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
478 continue;
479 }
480 assert_eq!(actions.len(), 1);
481 let action = actions.get(&bridge_action.digest()).unwrap();
482 assert_eq!(action, &bridge_action);
483 assert_eq!(
484 store.get_sui_event_cursors(&[identifier]).unwrap()[0].unwrap(),
485 sui_event.id,
486 );
487 break;
488 }
489 }
490
491 #[tokio::test]
492 async fn test_eth_watcher_task() {
493 let (
498 _sui_events_tx,
499 sui_events_rx,
500 _sui_grpc_events_tx,
501 sui_grpc_events_rx,
502 eth_events_tx,
503 eth_events_rx,
504 sui_monitor_tx,
505 _sui_monitor_rx,
506 eth_monitor_tx,
507 _eth_monitor_rx,
508 sui_client,
509 store,
510 ) = setup();
511 let (executor, mut executor_requested_action_rx) = MockExecutor::new();
512 let registry = Registry::new();
514 let metrics = Arc::new(BridgeMetrics::new(®istry));
515 let _handles = BridgeOrchestrator::new(
516 Arc::new(sui_client),
517 sui_events_rx,
518 sui_grpc_events_rx,
519 eth_events_rx,
520 store.clone(),
521 sui_monitor_tx,
522 eth_monitor_tx,
523 metrics,
524 )
525 .run(executor)
526 .await;
527 let address = EthAddress::random();
528 let (log, bridge_action) = get_test_log_and_action(address, TxHash::random(), 10);
529 let log_index_in_tx = 10;
530 let log_block_num = log.block_number.unwrap();
531 let eth_log = EthLog {
532 log: log.clone(),
533 tx_hash: log.transaction_hash.unwrap(),
534 block_number: log_block_num,
535 log_index_in_tx,
536 };
537 let end_block_num = log_block_num + 15;
538
539 eth_events_tx
540 .send((address, end_block_num, vec![eth_log.clone()]))
541 .await
542 .unwrap();
543
544 assert_eq!(
546 executor_requested_action_rx.recv().await.unwrap(),
547 bridge_action.digest()
548 );
549 let start = std::time::Instant::now();
550 loop {
551 let actions = store.get_all_pending_actions();
552 if actions.is_empty() {
553 if start.elapsed().as_secs() > 5 {
554 panic!("Timed out waiting for action to be written to WAL");
555 }
556 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
557 continue;
558 }
559 assert_eq!(actions.len(), 1);
560 let action = actions.get(&bridge_action.digest()).unwrap();
561 assert_eq!(action, &bridge_action);
562 assert_eq!(
563 store.get_eth_event_cursors(&[address]).unwrap()[0].unwrap(),
564 end_block_num,
565 );
566 break;
567 }
568 }
569
570 #[tokio::test]
571 async fn test_sui_grpc_watcher_task() {
572 let (
573 _sui_events_tx,
574 sui_events_rx,
575 sui_grpc_events_tx,
576 sui_grpc_events_rx,
577 _eth_events_tx,
578 eth_events_rx,
579 sui_monitor_tx,
580 _sui_monitor_rx,
581 eth_monitor_tx,
582 _eth_monitor_rx,
583 sui_client,
584 store,
585 ) = setup();
586 let (executor, mut executor_requested_action_rx) = MockExecutor::new();
587 let registry = Registry::new();
589 let metrics = Arc::new(BridgeMetrics::new(®istry));
590 let _handles = BridgeOrchestrator::new(
591 Arc::new(sui_client),
592 sui_events_rx,
593 sui_grpc_events_rx,
594 eth_events_rx,
595 store.clone(),
596 sui_monitor_tx,
597 eth_monitor_tx,
598 metrics,
599 )
600 .run_with_grpc(executor)
601 .await;
602
603 let identifier = Identifier::from_str("test_sui_grpc_watcher_task").unwrap();
604 let (sui_event, mut bridge_action) = get_test_sui_event_and_action(identifier);
605 bridge_action = bridge_action.update_to_token_transfer();
606
607 let bridge_event = SuiBridgeEvent::try_from_sui_event(&sui_event)
609 .unwrap()
610 .unwrap();
611
612 let last_seq_num = 42u64;
613 sui_grpc_events_tx
614 .send((last_seq_num, vec![bridge_event]))
615 .await
616 .unwrap();
617
618 let start = std::time::Instant::now();
619 assert_eq!(
621 executor_requested_action_rx.recv().await.unwrap(),
622 bridge_action.digest()
623 );
624 loop {
625 let actions = store.get_all_pending_actions();
626 if actions.is_empty() {
627 if start.elapsed().as_secs() > 5 {
628 panic!("Timed out waiting for action to be written to WAL");
629 }
630 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
631 continue;
632 }
633 assert_eq!(actions.len(), 1);
634 let action = actions.get(&bridge_action.digest()).unwrap();
635 assert_eq!(action, &bridge_action);
636 assert_eq!(
638 store.get_sui_sequence_number_cursor().unwrap().unwrap(),
639 last_seq_num,
640 );
641 break;
642 }
643 }
644
645 #[tokio::test]
646 async fn test_resume_actions_in_pending_logs_with_grpc() {
648 let (
649 _sui_events_tx,
650 sui_events_rx,
651 _sui_grpc_events_tx,
652 sui_grpc_events_rx,
653 _eth_events_tx,
654 eth_events_rx,
655 sui_monitor_tx,
656 _sui_monitor_rx,
657 eth_monitor_tx,
658 _eth_monitor_rx,
659 sui_client,
660 store,
661 ) = setup();
662 let (executor, mut executor_requested_action_rx) = MockExecutor::new();
663
664 let action1 = get_test_sui_to_eth_bridge_action(
665 None,
666 Some(0),
667 Some(99),
668 Some(10000),
669 None,
670 None,
671 None,
672 );
673
674 let action2 = get_test_eth_to_sui_bridge_action(None, None, None, None);
675 store
676 .insert_pending_actions(&vec![action1.clone(), action2.clone()])
677 .unwrap();
678
679 let registry = Registry::new();
681 let metrics = Arc::new(BridgeMetrics::new(®istry));
682 let _handles = BridgeOrchestrator::new(
683 Arc::new(sui_client),
684 sui_events_rx,
685 sui_grpc_events_rx,
686 eth_events_rx,
687 store.clone(),
688 sui_monitor_tx,
689 eth_monitor_tx,
690 metrics,
691 )
692 .run_with_grpc(executor)
693 .await;
694
695 let mut digests = std::collections::HashSet::new();
697 digests.insert(executor_requested_action_rx.recv().await.unwrap());
698 digests.insert(executor_requested_action_rx.recv().await.unwrap());
699 assert!(digests.contains(&action1.digest()));
700 assert!(digests.contains(&action2.digest()));
701 assert_eq!(digests.len(), 2);
702 }
703
704 #[tokio::test]
705 async fn test_resume_actions_in_pending_logs() {
707 let (
708 _sui_events_tx,
709 sui_events_rx,
710 _sui_grpc_events_tx,
711 sui_grpc_events_rx,
712 _eth_events_tx,
713 eth_events_rx,
714 sui_monitor_tx,
715 _sui_monitor_rx,
716 eth_monitor_tx,
717 _eth_monitor_rx,
718 sui_client,
719 store,
720 ) = setup();
721 let (executor, mut executor_requested_action_rx) = MockExecutor::new();
722
723 let action1 = get_test_sui_to_eth_bridge_action(
724 None,
725 Some(0),
726 Some(99),
727 Some(10000),
728 None,
729 None,
730 None,
731 );
732
733 let action2 = get_test_eth_to_sui_bridge_action(None, None, None, None);
734 store
735 .insert_pending_actions(&vec![action1.clone(), action2.clone()])
736 .unwrap();
737
738 let registry = Registry::new();
740 let metrics = Arc::new(BridgeMetrics::new(®istry));
741 let _handles = BridgeOrchestrator::new(
742 Arc::new(sui_client),
743 sui_events_rx,
744 sui_grpc_events_rx,
745 eth_events_rx,
746 store.clone(),
747 sui_monitor_tx,
748 eth_monitor_tx,
749 metrics,
750 )
751 .run(executor)
752 .await;
753
754 let mut digests = std::collections::HashSet::new();
756 digests.insert(executor_requested_action_rx.recv().await.unwrap());
757 digests.insert(executor_requested_action_rx.recv().await.unwrap());
758 assert!(digests.contains(&action1.digest()));
759 assert!(digests.contains(&action2.digest()));
760 assert_eq!(digests.len(), 2);
761 }
762
763 #[allow(clippy::type_complexity)]
764 fn setup() -> (
765 mysten_metrics::metered_channel::Sender<(Identifier, Vec<SuiEvent>)>,
766 mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
767 mysten_metrics::metered_channel::Sender<(u64, Vec<SuiBridgeEvent>)>,
768 mysten_metrics::metered_channel::Receiver<(u64, Vec<SuiBridgeEvent>)>,
769 mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
770 mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
771 mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
772 mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
773 mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
774 mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
775 SuiClient<SuiMockClient>,
776 Arc<BridgeOrchestratorTables>,
777 ) {
778 telemetry_subscribers::init_for_testing();
779 let registry = Registry::new();
780 mysten_metrics::init_metrics(®istry);
781
782 init_all_struct_tags();
783
784 let temp_dir = tempfile::tempdir().unwrap();
785 let store = BridgeOrchestratorTables::new(temp_dir.path());
786
787 let mock_client = SuiMockClient::default();
788 let sui_client = SuiClient::new_for_testing(mock_client.clone());
789
790 let (eth_events_tx, eth_events_rx) = mysten_metrics::metered_channel::channel(
791 100,
792 &mysten_metrics::get_metrics()
793 .unwrap()
794 .channel_inflight
795 .with_label_values(&["unit_test_eth_events_queue"]),
796 );
797
798 let (sui_events_tx, sui_events_rx) = mysten_metrics::metered_channel::channel(
799 100,
800 &mysten_metrics::get_metrics()
801 .unwrap()
802 .channel_inflight
803 .with_label_values(&["unit_test_legacy_sui_events_queue"]),
804 );
805 let (sui_grpc_events_tx, sui_grpc_events_rx) = mysten_metrics::metered_channel::channel(
806 100,
807 &mysten_metrics::get_metrics()
808 .unwrap()
809 .channel_inflight
810 .with_label_values(&["unit_test_sui_events_queue"]),
811 );
812 let (sui_monitor_tx, sui_monitor_rx) = mysten_metrics::metered_channel::channel(
813 10000,
814 &mysten_metrics::get_metrics()
815 .unwrap()
816 .channel_inflight
817 .with_label_values(&["sui_monitor_queue"]),
818 );
819 let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel(
820 10000,
821 &mysten_metrics::get_metrics()
822 .unwrap()
823 .channel_inflight
824 .with_label_values(&["eth_monitor_queue"]),
825 );
826 (
827 sui_events_tx,
828 sui_events_rx,
829 sui_grpc_events_tx,
830 sui_grpc_events_rx,
831 eth_events_tx,
832 eth_events_rx,
833 sui_monitor_tx,
834 sui_monitor_rx,
835 eth_monitor_tx,
836 eth_monitor_rx,
837 sui_client,
838 store,
839 )
840 }
841
842 struct MockExecutor {
844 requested_transactions_tx: tokio::sync::broadcast::Sender<BridgeActionDigest>,
845 }
846
847 impl MockExecutor {
848 fn new() -> (Self, tokio::sync::broadcast::Receiver<BridgeActionDigest>) {
849 let (tx, rx) = tokio::sync::broadcast::channel(100);
850 (
851 Self {
852 requested_transactions_tx: tx,
853 },
854 rx,
855 )
856 }
857 }
858
859 impl BridgeActionExecutorTrait for MockExecutor {
860 fn run(
861 self,
862 ) -> (
863 Vec<tokio::task::JoinHandle<()>>,
864 mysten_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
865 ) {
866 let (tx, mut rx) =
867 mysten_metrics::metered_channel::channel::<BridgeActionExecutionWrapper>(
868 100,
869 &mysten_metrics::get_metrics()
870 .unwrap()
871 .channel_inflight
872 .with_label_values(&["unit_test_mock_executor"]),
873 );
874
875 let handles = tokio::spawn(async move {
876 while let Some(action) = rx.recv().await {
877 self.requested_transactions_tx
878 .send(action.0.digest())
879 .unwrap();
880 }
881 });
882 (vec![handles], tx)
883 }
884 }
885}