sui_bridge/
monitor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! `BridgeMonitor` receives all `SuiBridgeEvent` and `EthBridgeEvent`
5//! and handles them accordingly.
6
7use crate::abi::{
8    EthBridgeCommitteeEvents, EthBridgeConfigEvents, EthBridgeEvent, EthBridgeLimiterEvents,
9    EthCommitteeUpgradeableContractEvents, EthSuiBridgeEvents,
10};
11use crate::client::bridge_authority_aggregator::BridgeAuthorityAggregator;
12use crate::crypto::BridgeAuthorityPublicKeyBytes;
13use crate::events::{BlocklistValidatorEvent, CommitteeMemberUrlUpdateEvent};
14use crate::events::{EmergencyOpEvent, SuiBridgeEvent};
15use crate::metrics::BridgeMetrics;
16use crate::retry_with_max_elapsed_time;
17use crate::sui_client::{SuiClient, SuiClientInner};
18use crate::types::{BridgeCommittee, IsBridgePaused};
19use arc_swap::ArcSwap;
20use futures::StreamExt;
21use std::collections::HashMap;
22use std::sync::Arc;
23use sui_rpc::field::{FieldMask, FieldMaskUtil};
24use sui_rpc::proto::sui::rpc::v2::{Checkpoint, SubscribeCheckpointsRequest};
25use sui_types::TypeTag;
26use tokio::time::Duration;
27use tracing::{error, info, warn};
28
29const REFRESH_BRIDGE_RETRY_TIMES: u64 = 3;
30
31pub struct BridgeMonitor<C> {
32    sui_client: Arc<SuiClient<C>>,
33    sui_monitor_rx: mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
34    eth_monitor_rx: mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
35    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
36    bridge_paused_watch_tx: tokio::sync::watch::Sender<IsBridgePaused>,
37    sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
38    bridge_metrics: Arc<BridgeMetrics>,
39}
40
41impl<C> BridgeMonitor<C>
42where
43    C: SuiClientInner + 'static,
44{
45    pub fn new(
46        sui_client: Arc<SuiClient<C>>,
47        sui_monitor_rx: mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
48        eth_monitor_rx: mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
49        bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
50        bridge_paused_watch_tx: tokio::sync::watch::Sender<IsBridgePaused>,
51        sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
52        bridge_metrics: Arc<BridgeMetrics>,
53    ) -> Self {
54        Self {
55            sui_client,
56            sui_monitor_rx,
57            eth_monitor_rx,
58            bridge_auth_agg,
59            bridge_paused_watch_tx,
60            sui_token_type_tags,
61            bridge_metrics,
62        }
63    }
64
65    pub async fn run(self) {
66        tracing::info!("Starting BridgeMonitor");
67        let Self {
68            sui_client,
69            mut sui_monitor_rx,
70            mut eth_monitor_rx,
71            bridge_auth_agg,
72            bridge_paused_watch_tx,
73            sui_token_type_tags,
74            bridge_metrics,
75        } = self;
76        let mut latest_token_config = (*sui_token_type_tags.load().clone()).clone();
77
78        loop {
79            tokio::select! {
80                sui_event = sui_monitor_rx.recv() => {
81                    if let Some(sui_event) = sui_event {
82                        Self::handle_sui_events(
83                            sui_event,
84                            &sui_client,
85                            &bridge_auth_agg,
86                            &bridge_paused_watch_tx,
87                            &sui_token_type_tags,
88                            &bridge_metrics,
89                            &mut latest_token_config,
90                        )
91                        .await;
92                    } else {
93                        panic!("BridgeMonitor sui events channel was closed unexpectedly");
94                    }
95                }
96                eth_event = eth_monitor_rx.recv() => {
97                    if let Some(eth_event) = eth_event {
98                        Self::handle_eth_events(eth_event, &bridge_metrics);
99                    } else {
100                        panic!("BridgeMonitor eth events channel was closed unexpectedly");
101                    }
102                }
103            }
104        }
105    }
106
107    async fn handle_sui_events(
108        event: SuiBridgeEvent,
109        sui_client: &Arc<SuiClient<C>>,
110        bridge_auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
111        bridge_paused_watch_tx: &tokio::sync::watch::Sender<IsBridgePaused>,
112        sui_token_type_tags: &Arc<ArcSwap<HashMap<u8, TypeTag>>>,
113        bridge_metrics: &Arc<BridgeMetrics>,
114        latest_token_config: &mut HashMap<u8, TypeTag>,
115    ) {
116        info!("Received SuiBridgeEvent: {:?}", event);
117        macro_rules! bump_sui_counter {
118            ($action:expr) => {
119                bridge_metrics
120                    .observed_governance_actions
121                    .with_label_values(&[$action, "sui"])
122                    .inc();
123            };
124        }
125
126        match event {
127            SuiBridgeEvent::SuiToEthTokenBridgeV1(_) => (),
128            SuiBridgeEvent::SuiToEthTokenBridgeV2(_) => (),
129            SuiBridgeEvent::TokenTransferApproved(_) => (),
130            SuiBridgeEvent::TokenTransferClaimed(_) => (),
131            SuiBridgeEvent::TokenTransferAlreadyApproved(_) => (),
132            SuiBridgeEvent::TokenTransferAlreadyClaimed(_) => (),
133            SuiBridgeEvent::TokenTransferLimitExceed(_) => {
134                // TODO do we want to do anything here?
135            }
136
137            SuiBridgeEvent::EmergencyOpEvent(event) => {
138                bump_sui_counter!(if event.frozen {
139                    "bridge_paused"
140                } else {
141                    "bridge_unpaused"
142                });
143                let is_paused = get_latest_bridge_pause_status_with_emergency_event(
144                    sui_client.clone(),
145                    event,
146                    Duration::from_secs(10),
147                )
148                .await;
149                bridge_paused_watch_tx
150                    .send(is_paused)
151                    .expect("Bridge pause status watch channel should not be closed");
152            }
153
154            SuiBridgeEvent::CommitteeMemberRegistration(_) => {
155                bump_sui_counter!("committee_member_registered");
156            }
157            SuiBridgeEvent::CommitteeUpdateEvent(_) => {
158                bump_sui_counter!("committee_updated");
159            }
160
161            SuiBridgeEvent::CommitteeMemberUrlUpdateEvent(event) => {
162                bump_sui_counter!("committee_member_url_updated");
163                let new_committee = get_latest_bridge_committee_with_url_update_event(
164                    sui_client.clone(),
165                    event,
166                    Duration::from_secs(10),
167                )
168                .await;
169                let committee_names = bridge_auth_agg.load().committee_keys_to_names.clone();
170                bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(
171                    Arc::new(new_committee),
172                    bridge_metrics.clone(),
173                    committee_names,
174                )));
175                info!("Committee updated with CommitteeMemberUrlUpdateEvent");
176            }
177
178            SuiBridgeEvent::BlocklistValidatorEvent(event) => {
179                bump_sui_counter!(if event.blocklisted {
180                    "validator_blocklisted"
181                } else {
182                    "validator_unblocklisted"
183                });
184                let new_committee = get_latest_bridge_committee_with_blocklist_event(
185                    sui_client.clone(),
186                    event,
187                    Duration::from_secs(10),
188                )
189                .await;
190                let committee_names = bridge_auth_agg.load().committee_keys_to_names.clone();
191                bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(
192                    Arc::new(new_committee),
193                    bridge_metrics.clone(),
194                    committee_names,
195                )));
196                info!("Committee updated with BlocklistValidatorEvent");
197            }
198
199            SuiBridgeEvent::TokenRegistrationEvent(_) => {
200                bump_sui_counter!("new_token_registered");
201            }
202
203            SuiBridgeEvent::NewTokenEvent(event) => {
204                bump_sui_counter!("new_token_added");
205                if let std::collections::hash_map::Entry::Vacant(entry) =
206                    // We only add new tokens but not remove so it's ok to just insert
207                    latest_token_config.entry(event.token_id)
208                {
209                    entry.insert(event.type_name.clone());
210                    sui_token_type_tags.store(Arc::new(latest_token_config.clone()));
211                } else {
212                    // invariant
213                    assert_eq!(event.type_name, latest_token_config[&event.token_id]);
214                }
215            }
216
217            SuiBridgeEvent::UpdateTokenPriceEvent(_event) => {
218                bump_sui_counter!("token_price_updated");
219            }
220
221            SuiBridgeEvent::UpdateRouteLimitEvent(_event) => {
222                bump_sui_counter!("limit_updated");
223            }
224        }
225    }
226
227    fn handle_eth_events(event: EthBridgeEvent, bridge_metrics: &Arc<BridgeMetrics>) {
228        info!("Received EthBridgeEvent: {:?}", event);
229
230        macro_rules! bump_eth_counter {
231            ($action:expr) => {
232                bridge_metrics
233                    .observed_governance_actions
234                    .with_label_values(&[$action, "eth"])
235                    .inc()
236            };
237        }
238
239        match event {
240            EthBridgeEvent::EthBridgeCommitteeEvents(event) => match event {
241                EthBridgeCommitteeEvents::BlocklistUpdated(event) => {
242                    bump_eth_counter!(if event.isBlocklisted {
243                        "validator_blocklisted"
244                    } else {
245                        "validator_unblocklisted"
246                    });
247                }
248                EthBridgeCommitteeEvents::Initialized(_) => {
249                    bump_eth_counter!("committee_contract_initialized");
250                }
251                EthBridgeCommitteeEvents::Upgraded(_) => {
252                    bump_eth_counter!("committee_contract_upgraded");
253                }
254                EthBridgeCommitteeEvents::BlocklistUpdatedV2(e) => {
255                    bump_eth_counter!(if e.isBlocklisted {
256                        "validator_blocklisted"
257                    } else {
258                        "validator_unblocklisted"
259                    });
260                }
261                EthBridgeCommitteeEvents::ContractUpgraded(_) => {
262                    bump_eth_counter!("committee_contract_upgraded");
263                }
264            },
265            EthBridgeEvent::EthBridgeLimiterEvents(event) => match event {
266                EthBridgeLimiterEvents::Initialized(_) => {
267                    bump_eth_counter!("limiter_contract_initialized");
268                }
269                EthBridgeLimiterEvents::Upgraded(_) => {
270                    bump_eth_counter!("limiter_contract_upgraded");
271                }
272                EthBridgeLimiterEvents::OwnershipTransferred(_) => {
273                    bump_eth_counter!("limiter_contract_ownership_transferred");
274                }
275                EthBridgeLimiterEvents::LimitUpdated(_) => {
276                    bump_eth_counter!("limit_updated");
277                }
278                // This event is deprecated but we keep it for ABI compatibility
279                // TODO: We can safely update abi and remove it once the testnet bridge contract is upgraded
280                EthBridgeLimiterEvents::HourlyTransferAmountUpdated(_) => (),
281                EthBridgeLimiterEvents::ContractUpgraded(_) => {
282                    bump_eth_counter!("limiter_contract_upgraded");
283                }
284                EthBridgeLimiterEvents::LimitUpdatedV2(_) => {
285                    bump_eth_counter!("limit_updated");
286                }
287            },
288            EthBridgeEvent::EthBridgeConfigEvents(event) => match event {
289                EthBridgeConfigEvents::Initialized(_) => {
290                    bump_eth_counter!("config_contract_initialized");
291                }
292                EthBridgeConfigEvents::Upgraded(_) => {
293                    bump_eth_counter!("config_contract_upgraded");
294                }
295                EthBridgeConfigEvents::TokenAdded(_) => {
296                    bump_eth_counter!("new_token_added");
297                }
298                EthBridgeConfigEvents::TokenPriceUpdated(_) => {
299                    bump_eth_counter!("update_token_price");
300                }
301                EthBridgeConfigEvents::ContractUpgraded(_) => {
302                    bump_eth_counter!("config_contract_upgraded");
303                }
304                EthBridgeConfigEvents::TokenPriceUpdatedV2(_) => {
305                    bump_eth_counter!("update_token_price");
306                }
307                EthBridgeConfigEvents::TokensAddedV2(_) => {
308                    bump_eth_counter!("new_token_added");
309                }
310            },
311            EthBridgeEvent::EthCommitteeUpgradeableContractEvents(event) => match event {
312                EthCommitteeUpgradeableContractEvents::Initialized(_) => {
313                    bump_eth_counter!("upgradeable_contract_initialized");
314                }
315                EthCommitteeUpgradeableContractEvents::Upgraded(_) => {
316                    bump_eth_counter!("upgradeable_contract_upgraded");
317                }
318            },
319            EthBridgeEvent::EthSuiBridgeEvents(event) => match event {
320                EthSuiBridgeEvents::TokensClaimed(_) => (),
321                EthSuiBridgeEvents::TokensDeposited(_) => (),
322                EthSuiBridgeEvents::TokensDepositedV2(_) => (),
323                EthSuiBridgeEvents::Paused(_) => bump_eth_counter!("bridge_paused"),
324                EthSuiBridgeEvents::Unpaused(_) => bump_eth_counter!("bridge_unpaused"),
325                EthSuiBridgeEvents::Upgraded(_) => {
326                    bump_eth_counter!("bridge_contract_upgraded")
327                }
328                EthSuiBridgeEvents::Initialized(_) => {
329                    bump_eth_counter!("bridge_contract_initialized")
330                }
331                EthSuiBridgeEvents::ContractUpgraded(_) => {
332                    bump_eth_counter!("bridge_contract_upgraded")
333                }
334                EthSuiBridgeEvents::EmergencyOperation(e) => {
335                    if e.paused {
336                        bump_eth_counter!("bridge_paused")
337                    } else {
338                        bump_eth_counter!("bridge_unpaused")
339                    }
340                }
341            },
342        }
343    }
344}
345
346async fn get_latest_bridge_committee_with_url_update_event<C: SuiClientInner>(
347    sui_client: Arc<SuiClient<C>>,
348    event: CommitteeMemberUrlUpdateEvent,
349    staleness_retry_interval: Duration,
350) -> BridgeCommittee {
351    let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
352    loop {
353        let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
354            sui_client.get_bridge_committee(),
355            Duration::from_secs(600)
356        ) else {
357            error!("Failed to get bridge committee after retry");
358            continue;
359        };
360        let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(&event.member));
361        let Some(member) = member else {
362            // This is possible when a node is processing an older event while the member quitted at a later point, which is fine.
363            // Or fullnode returns a stale committee that the member hasn't joined, which is rare and tricy to handle so we just log it.
364            warn!(
365                "Committee member not found in the committee: {:?}",
366                event.member
367            );
368            return committee;
369        };
370        if member.base_url == event.new_url {
371            return committee;
372        }
373        // If url does not match, it could be:
374        // 1. the query is sent to a stale fullnode that does not have the latest data yet
375        // 2. the node is processing an older message, and the latest url has changed again
376        // In either case, we retry a few times. If it still fails to match, we assume it's the latter case.
377        tokio::time::sleep(staleness_retry_interval).await;
378        remaining_retry_times -= 1;
379        if remaining_retry_times == 0 {
380            warn!(
381                "Committee member url {:?} does not match onchain record {:?} after retry",
382                event.member, member
383            );
384            return committee;
385        }
386    }
387}
388
389async fn get_latest_bridge_committee_with_blocklist_event<C: SuiClientInner>(
390    sui_client: Arc<SuiClient<C>>,
391    event: BlocklistValidatorEvent,
392    staleness_retry_interval: Duration,
393) -> BridgeCommittee {
394    let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
395    loop {
396        let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
397            sui_client.get_bridge_committee(),
398            Duration::from_secs(600)
399        ) else {
400            error!("Failed to get bridge committee after retry");
401            continue;
402        };
403        let mut any_mismatch = false;
404        for pk in &event.public_keys {
405            let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(pk));
406            let Some(member) = member else {
407                // This is possible when a node is processing an older event while the member
408                // quitted at a later point. Or fullnode returns a stale committee that
409                // the member hasn't joined.
410                warn!("Committee member not found in the committee: {:?}", pk);
411                any_mismatch = true;
412                break;
413            };
414            if member.is_blocklisted != event.blocklisted {
415                warn!(
416                    "Committee member blocklist status does not match onchain record: {:?}",
417                    member
418                );
419                any_mismatch = true;
420                break;
421            }
422        }
423        if !any_mismatch {
424            return committee;
425        }
426        // If there is any match, it could be:
427        // 1. the query is sent to a stale fullnode that does not have the latest data yet
428        // 2. the node is processing an older message, and the latest blocklist status has changed again
429        // In either case, we retry a few times. If it still fails to match, we assume it's the latter case.
430        tokio::time::sleep(staleness_retry_interval).await;
431        remaining_retry_times -= 1;
432        if remaining_retry_times == 0 {
433            warn!(
434                "Committee member blocklist status {:?} does not match onchain record after retry",
435                event
436            );
437            return committee;
438        }
439    }
440}
441
442async fn get_latest_bridge_pause_status_with_emergency_event<C: SuiClientInner>(
443    sui_client: Arc<SuiClient<C>>,
444    event: EmergencyOpEvent,
445    staleness_retry_interval: Duration,
446) -> IsBridgePaused {
447    let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
448    loop {
449        let Ok(Ok(summary)) =
450            retry_with_max_elapsed_time!(sui_client.get_bridge_summary(), Duration::from_secs(600))
451        else {
452            error!("Failed to get bridge summary after retry");
453            continue;
454        };
455        if summary.is_frozen == event.frozen {
456            return summary.is_frozen;
457        }
458        // If the onchain status does not match, it could be:
459        // 1. the query is sent to a stale fullnode that does not have the latest data yet
460        // 2. the node is processing an older message, and the latest status has changed again
461        // In either case, we retry a few times. If it still fails to match, we assume it's the latter case.
462        tokio::time::sleep(staleness_retry_interval).await;
463        remaining_retry_times -= 1;
464        if remaining_retry_times == 0 {
465            warn!(
466                "Bridge pause status {:?} does not match onchain record {:?} after retry",
467                event, summary.is_frozen
468            );
469            return summary.is_frozen;
470        }
471    }
472}
473
474pub async fn subscribe_bridge_events(
475    mut client: sui_rpc::Client,
476    sender: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
477) {
478    let subscription_read_mask = FieldMask::from_paths([
479        Checkpoint::path_builder().sequence_number(),
480        Checkpoint::path_builder()
481            .transactions()
482            .events()
483            .bcs()
484            .value(),
485        Checkpoint::path_builder().transactions().digest(),
486    ]);
487
488    loop {
489        let mut subscription = match client
490            .subscription_client()
491            .subscribe_checkpoints(
492                SubscribeCheckpointsRequest::default()
493                    .with_read_mask(subscription_read_mask.clone()),
494            )
495            .await
496        {
497            Ok(subscription) => subscription,
498            Err(e) => {
499                tracing::warn!("error trying to subscribe to checkpoints: {e}");
500                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
501                continue;
502            }
503        }
504        .into_inner();
505
506        while let Some(item) = subscription.next().await {
507            let checkpoint = match item {
508                Ok(checkpoint) => checkpoint,
509                Err(e) => {
510                    tracing::warn!("error in checkpoint stream: {e}");
511                    break;
512                }
513            };
514
515            let ckpt = checkpoint.cursor();
516            tracing::debug!("recieved checkpoint {ckpt}");
517
518            for txn in checkpoint.checkpoint().transactions() {
519                let txn_digest = txn.digest();
520                let Some(bcs_events) = txn.events_opt().map(|events| events.bcs()) else {
521                    continue;
522                };
523
524                let Ok(events) = bcs_events.deserialize::<sui_types::effects::TransactionEvents>()
525                else {
526                    tracing::warn!(
527                        "error deserializing events from txn {txn_digest} in checkpoint {ckpt}"
528                    );
529                    continue;
530                };
531
532                for event in events.data {
533                    match SuiBridgeEvent::try_from_event(&event) {
534                        Ok(Some(bridge_event)) => {
535                            sender
536                                .send(bridge_event)
537                                .await
538                                .expect("Sending event to monitor channel should not fail");
539                        }
540                        Ok(None) => {}
541                        Err(e) => {
542                            tracing::warn!(
543                                "error deserializing SuiBridgeEvent from txn {txn_digest} in checkpoint {ckpt}: {e}"
544                            );
545                        }
546                    }
547                }
548            }
549        }
550    }
551}
552
553#[cfg(test)]
554mod tests {
555    use std::str::FromStr;
556
557    use super::*;
558    use crate::events::{NewTokenEvent, init_all_struct_tags};
559    use crate::test_utils::{
560        bridge_committee_to_bridge_committee_summary, get_test_authority_and_key,
561    };
562    use crate::types::{BRIDGE_PAUSED, BRIDGE_UNPAUSED, BridgeAuthority};
563    use fastcrypto::traits::KeyPair;
564    use prometheus::Registry;
565    use sui_types::base_types::SuiAddress;
566    use sui_types::bridge::BridgeCommitteeSummary;
567    use sui_types::bridge::MoveTypeCommitteeMember;
568    use sui_types::crypto::get_key_pair;
569
570    use crate::{sui_mock_client::SuiMockClient, types::BridgeCommittee};
571    use sui_types::crypto::ToFromBytes;
572
573    #[tokio::test]
574    async fn test_get_latest_bridge_committee_with_url_update_event() {
575        telemetry_subscribers::init_for_testing();
576        let sui_client_mock = SuiMockClient::default();
577        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
578        let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
579        let pk = kp.public().clone();
580        let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
581        let pk_bytes = pk_as_bytes.as_bytes().to_vec();
582        let event = CommitteeMemberUrlUpdateEvent {
583            member: pk,
584            new_url: "http://new.url".to_string(),
585        };
586        let summary = BridgeCommitteeSummary {
587            members: vec![(
588                pk_bytes.clone(),
589                MoveTypeCommitteeMember {
590                    sui_address: SuiAddress::random_for_testing_only(),
591                    bridge_pubkey_bytes: pk_bytes.clone(),
592                    voting_power: 10000,
593                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
594                    blocklisted: false,
595                },
596            )],
597            member_registration: vec![],
598            last_committee_update_epoch: 0,
599        };
600
601        // Test the regular case, the onchain url matches
602        sui_client_mock.set_bridge_committee(summary.clone());
603        let timer = std::time::Instant::now();
604        let committee = get_latest_bridge_committee_with_url_update_event(
605            sui_client.clone(),
606            event.clone(),
607            Duration::from_secs(2),
608        )
609        .await;
610        assert_eq!(
611            committee.member(&pk_as_bytes).unwrap().base_url,
612            "http://new.url"
613        );
614        assert!(timer.elapsed().as_millis() < 500);
615
616        // Test the case where the onchain url is older. Then update onchain url in 1 second.
617        // Since the retry interval is 2 seconds, it should return the next retry.
618        let old_summary = BridgeCommitteeSummary {
619            members: vec![(
620                pk_bytes.clone(),
621                MoveTypeCommitteeMember {
622                    sui_address: SuiAddress::random_for_testing_only(),
623                    bridge_pubkey_bytes: pk_bytes.clone(),
624                    voting_power: 10000,
625                    http_rest_url: "http://old.url".to_string().as_bytes().to_vec(),
626                    blocklisted: false,
627                },
628            )],
629            member_registration: vec![],
630            last_committee_update_epoch: 0,
631        };
632        sui_client_mock.set_bridge_committee(old_summary.clone());
633        let timer = std::time::Instant::now();
634        // update the url to "http://new.url" in 1 second
635        let sui_client_mock_clone = sui_client_mock.clone();
636        tokio::spawn(async move {
637            tokio::time::sleep(Duration::from_secs(1)).await;
638            sui_client_mock_clone.set_bridge_committee(summary.clone());
639        });
640        let committee = get_latest_bridge_committee_with_url_update_event(
641            sui_client.clone(),
642            event.clone(),
643            Duration::from_secs(2),
644        )
645        .await;
646        assert_eq!(
647            committee.member(&pk_as_bytes).unwrap().base_url,
648            "http://new.url"
649        );
650        let elapsed = timer.elapsed().as_millis();
651        assert!(elapsed > 1000 && elapsed < 3000);
652
653        // Test the case where the onchain url is newer. It should retry up to
654        // REFRESH_BRIDGE_RETRY_TIMES time then return the onchain record.
655        let newer_summary = BridgeCommitteeSummary {
656            members: vec![(
657                pk_bytes.clone(),
658                MoveTypeCommitteeMember {
659                    sui_address: SuiAddress::random_for_testing_only(),
660                    bridge_pubkey_bytes: pk_bytes.clone(),
661                    voting_power: 10000,
662                    http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
663                    blocklisted: false,
664                },
665            )],
666            member_registration: vec![],
667            last_committee_update_epoch: 0,
668        };
669        sui_client_mock.set_bridge_committee(newer_summary.clone());
670        let timer = std::time::Instant::now();
671        let committee = get_latest_bridge_committee_with_url_update_event(
672            sui_client.clone(),
673            event.clone(),
674            Duration::from_millis(500),
675        )
676        .await;
677        assert_eq!(
678            committee.member(&pk_as_bytes).unwrap().base_url,
679            "http://newer.url"
680        );
681        let elapsed = timer.elapsed().as_millis();
682        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
683
684        // Test the case where the member is not found in the committee
685        // It should return the onchain record.
686        let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
687        let pk2 = kp2.public().clone();
688        let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
689        let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
690        let newer_summary = BridgeCommitteeSummary {
691            members: vec![(
692                pk_bytes2.clone(),
693                MoveTypeCommitteeMember {
694                    sui_address: SuiAddress::random_for_testing_only(),
695                    bridge_pubkey_bytes: pk_bytes2.clone(),
696                    voting_power: 10000,
697                    http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
698                    blocklisted: false,
699                },
700            )],
701            member_registration: vec![],
702            last_committee_update_epoch: 0,
703        };
704        sui_client_mock.set_bridge_committee(newer_summary.clone());
705        let timer = std::time::Instant::now();
706        let committee = get_latest_bridge_committee_with_url_update_event(
707            sui_client.clone(),
708            event.clone(),
709            Duration::from_secs(1),
710        )
711        .await;
712        assert_eq!(
713            committee.member(&pk_as_bytes2).unwrap().base_url,
714            "http://newer.url"
715        );
716        assert!(committee.member(&pk_as_bytes).is_none());
717        let elapsed = timer.elapsed().as_millis();
718        assert!(elapsed < 1000);
719    }
720
721    #[tokio::test]
722    async fn test_get_latest_bridge_committee_with_blocklist_event() {
723        telemetry_subscribers::init_for_testing();
724        let sui_client_mock = SuiMockClient::default();
725        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
726        let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
727        let pk = kp.public().clone();
728        let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
729        let pk_bytes = pk_as_bytes.as_bytes().to_vec();
730
731        // Test the case where the onchain status is the same as the event (blocklisted)
732        let event = BlocklistValidatorEvent {
733            blocklisted: true,
734            public_keys: vec![pk.clone()],
735        };
736        let summary = BridgeCommitteeSummary {
737            members: vec![(
738                pk_bytes.clone(),
739                MoveTypeCommitteeMember {
740                    sui_address: SuiAddress::random_for_testing_only(),
741                    bridge_pubkey_bytes: pk_bytes.clone(),
742                    voting_power: 10000,
743                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
744                    blocklisted: true,
745                },
746            )],
747            member_registration: vec![],
748            last_committee_update_epoch: 0,
749        };
750        sui_client_mock.set_bridge_committee(summary.clone());
751        let timer = std::time::Instant::now();
752        let committee = get_latest_bridge_committee_with_blocklist_event(
753            sui_client.clone(),
754            event.clone(),
755            Duration::from_secs(2),
756        )
757        .await;
758        assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
759        assert!(timer.elapsed().as_millis() < 500);
760
761        // Test the case where the onchain status is the same as the event (unblocklisted)
762        let event = BlocklistValidatorEvent {
763            blocklisted: false,
764            public_keys: vec![pk.clone()],
765        };
766        let summary = BridgeCommitteeSummary {
767            members: vec![(
768                pk_bytes.clone(),
769                MoveTypeCommitteeMember {
770                    sui_address: SuiAddress::random_for_testing_only(),
771                    bridge_pubkey_bytes: pk_bytes.clone(),
772                    voting_power: 10000,
773                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
774                    blocklisted: false,
775                },
776            )],
777            member_registration: vec![],
778            last_committee_update_epoch: 0,
779        };
780        sui_client_mock.set_bridge_committee(summary.clone());
781        let timer = std::time::Instant::now();
782        let committee = get_latest_bridge_committee_with_blocklist_event(
783            sui_client.clone(),
784            event.clone(),
785            Duration::from_secs(2),
786        )
787        .await;
788        assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
789        assert!(timer.elapsed().as_millis() < 500);
790
791        // Test the case where the onchain status is older. Then update onchain status in 1 second.
792        // Since the retry interval is 2 seconds, it should return the next retry.
793        let old_summary = BridgeCommitteeSummary {
794            members: vec![(
795                pk_bytes.clone(),
796                MoveTypeCommitteeMember {
797                    sui_address: SuiAddress::random_for_testing_only(),
798                    bridge_pubkey_bytes: pk_bytes.clone(),
799                    voting_power: 10000,
800                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
801                    blocklisted: true,
802                },
803            )],
804            member_registration: vec![],
805            last_committee_update_epoch: 0,
806        };
807        sui_client_mock.set_bridge_committee(old_summary.clone());
808        let timer = std::time::Instant::now();
809        // update unblocklisted in 1 second
810        let sui_client_mock_clone = sui_client_mock.clone();
811        tokio::spawn(async move {
812            tokio::time::sleep(Duration::from_secs(1)).await;
813            sui_client_mock_clone.set_bridge_committee(summary.clone());
814        });
815        let committee = get_latest_bridge_committee_with_blocklist_event(
816            sui_client.clone(),
817            event.clone(),
818            Duration::from_secs(2),
819        )
820        .await;
821        assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
822        let elapsed = timer.elapsed().as_millis();
823        assert!(elapsed > 1000 && elapsed < 3000);
824
825        // Test the case where the onchain url is newer. It should retry up to
826        // REFRESH_BRIDGE_RETRY_TIMES time then return the onchain record.
827        let newer_summary = BridgeCommitteeSummary {
828            members: vec![(
829                pk_bytes.clone(),
830                MoveTypeCommitteeMember {
831                    sui_address: SuiAddress::random_for_testing_only(),
832                    bridge_pubkey_bytes: pk_bytes.clone(),
833                    voting_power: 10000,
834                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
835                    blocklisted: true,
836                },
837            )],
838            member_registration: vec![],
839            last_committee_update_epoch: 0,
840        };
841        sui_client_mock.set_bridge_committee(newer_summary.clone());
842        let timer = std::time::Instant::now();
843        let committee = get_latest_bridge_committee_with_blocklist_event(
844            sui_client.clone(),
845            event.clone(),
846            Duration::from_millis(500),
847        )
848        .await;
849        assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
850        let elapsed = timer.elapsed().as_millis();
851        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
852
853        // Test the case where the member onchain url is not found in the committee
854        // It should return the onchain record after retrying a few times.
855        let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
856        let pk2 = kp2.public().clone();
857        let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
858        let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
859        let summary = BridgeCommitteeSummary {
860            members: vec![(
861                pk_bytes2.clone(),
862                MoveTypeCommitteeMember {
863                    sui_address: SuiAddress::random_for_testing_only(),
864                    bridge_pubkey_bytes: pk_bytes2.clone(),
865                    voting_power: 10000,
866                    http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
867                    blocklisted: false,
868                },
869            )],
870            member_registration: vec![],
871            last_committee_update_epoch: 0,
872        };
873        sui_client_mock.set_bridge_committee(summary.clone());
874        let timer = std::time::Instant::now();
875        let committee = get_latest_bridge_committee_with_blocklist_event(
876            sui_client.clone(),
877            event.clone(),
878            Duration::from_secs(1),
879        )
880        .await;
881        assert_eq!(
882            committee.member(&pk_as_bytes2).unwrap().base_url,
883            "http://newer.url"
884        );
885        assert!(committee.member(&pk_as_bytes).is_none());
886        let elapsed = timer.elapsed().as_millis();
887        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
888
889        // Test any mismtach in the blocklist status should retry a few times
890        let event = BlocklistValidatorEvent {
891            blocklisted: true,
892            public_keys: vec![pk, pk2],
893        };
894        let summary = BridgeCommitteeSummary {
895            members: vec![
896                (
897                    pk_bytes.clone(),
898                    MoveTypeCommitteeMember {
899                        sui_address: SuiAddress::random_for_testing_only(),
900                        bridge_pubkey_bytes: pk_bytes.clone(),
901                        voting_power: 5000,
902                        http_rest_url: "http://pk.url".to_string().as_bytes().to_vec(),
903                        blocklisted: true,
904                    },
905                ),
906                (
907                    pk_bytes2.clone(),
908                    MoveTypeCommitteeMember {
909                        sui_address: SuiAddress::random_for_testing_only(),
910                        bridge_pubkey_bytes: pk_bytes2.clone(),
911                        voting_power: 5000,
912                        http_rest_url: "http://pk2.url".to_string().as_bytes().to_vec(),
913                        blocklisted: false,
914                    },
915                ),
916            ],
917            member_registration: vec![],
918            last_committee_update_epoch: 0,
919        };
920        sui_client_mock.set_bridge_committee(summary.clone());
921        let timer = std::time::Instant::now();
922        let committee = get_latest_bridge_committee_with_blocklist_event(
923            sui_client.clone(),
924            event.clone(),
925            Duration::from_millis(500),
926        )
927        .await;
928        assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
929        assert!(!committee.member(&pk_as_bytes2).unwrap().is_blocklisted);
930        let elapsed = timer.elapsed().as_millis();
931        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
932    }
933
934    #[tokio::test]
935    async fn test_get_bridge_pause_status_with_emergency_event() {
936        telemetry_subscribers::init_for_testing();
937        let sui_client_mock = SuiMockClient::default();
938        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
939
940        // Test event and onchain status match
941        let event = EmergencyOpEvent { frozen: true };
942        sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
943        let timer = std::time::Instant::now();
944        assert!(
945            get_latest_bridge_pause_status_with_emergency_event(
946                sui_client.clone(),
947                event.clone(),
948                Duration::from_secs(2),
949            )
950            .await
951        );
952        assert!(timer.elapsed().as_millis() < 500);
953
954        let event = EmergencyOpEvent { frozen: false };
955        sui_client_mock.set_is_bridge_paused(BRIDGE_UNPAUSED);
956        let timer = std::time::Instant::now();
957        assert!(
958            !get_latest_bridge_pause_status_with_emergency_event(
959                sui_client.clone(),
960                event.clone(),
961                Duration::from_secs(2),
962            )
963            .await
964        );
965        assert!(timer.elapsed().as_millis() < 500);
966
967        // Test the case where the onchain status (paused) is older. Then update onchain status in 1 second.
968        // Since the retry interval is 2 seconds, it should return the next retry.
969        sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
970        let timer = std::time::Instant::now();
971        // update the bridge to unpaused in 1 second
972        let sui_client_mock_clone = sui_client_mock.clone();
973        tokio::spawn(async move {
974            tokio::time::sleep(Duration::from_secs(1)).await;
975            sui_client_mock_clone.set_is_bridge_paused(BRIDGE_UNPAUSED);
976        });
977        assert!(
978            !get_latest_bridge_pause_status_with_emergency_event(
979                sui_client.clone(),
980                event.clone(),
981                Duration::from_secs(2),
982            )
983            .await
984        );
985        let elapsed = timer.elapsed().as_millis();
986        assert!(elapsed > 1000 && elapsed < 3000, "{}", elapsed);
987
988        // Test the case where the onchain status (paused) is newer. It should retry up to
989        // REFRESH_BRIDGE_RETRY_TIMES time then return the onchain record.
990        sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
991        let timer = std::time::Instant::now();
992        assert!(
993            get_latest_bridge_pause_status_with_emergency_event(
994                sui_client.clone(),
995                event.clone(),
996                Duration::from_secs(2),
997            )
998            .await
999        );
1000        let elapsed = timer.elapsed().as_millis();
1001        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
1002    }
1003
1004    #[tokio::test]
1005    async fn test_update_bridge_authority_aggregation_with_url_change_event() {
1006        let (
1007            sui_monitor_tx,
1008            sui_monitor_rx,
1009            _eth_monitor_tx,
1010            eth_monitor_rx,
1011            sui_client_mock,
1012            sui_client,
1013            bridge_pause_tx,
1014            _bridge_pause_rx,
1015            mut authorities,
1016            bridge_metrics,
1017        ) = setup();
1018        let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
1019        let agg = Arc::new(ArcSwap::new(Arc::new(
1020            BridgeAuthorityAggregator::new_for_testing(Arc::new(old_committee)),
1021        )));
1022        let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1023        let _handle = tokio::task::spawn(
1024            BridgeMonitor::new(
1025                sui_client.clone(),
1026                sui_monitor_rx,
1027                eth_monitor_rx,
1028                agg.clone(),
1029                bridge_pause_tx,
1030                sui_token_type_tags,
1031                bridge_metrics,
1032            )
1033            .run(),
1034        );
1035        let new_url = "http://new.url".to_string();
1036        authorities[0].base_url = new_url.clone();
1037        let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
1038        let new_committee_summary =
1039            bridge_committee_to_bridge_committee_summary(new_committee.clone());
1040        sui_client_mock.set_bridge_committee(new_committee_summary.clone());
1041        sui_monitor_tx
1042            .send(SuiBridgeEvent::CommitteeMemberUrlUpdateEvent(
1043                CommitteeMemberUrlUpdateEvent {
1044                    member: authorities[0].pubkey.clone(),
1045                    new_url: new_url.clone(),
1046                },
1047            ))
1048            .await
1049            .unwrap();
1050        // Wait for the monitor to process the event
1051        tokio::time::sleep(Duration::from_secs(1)).await;
1052        // Now expect the committee to be updated
1053        assert_eq!(
1054            agg.load()
1055                .committee
1056                .member(&BridgeAuthorityPublicKeyBytes::from(&authorities[0].pubkey))
1057                .unwrap()
1058                .base_url,
1059            new_url
1060        );
1061    }
1062
1063    #[tokio::test]
1064    async fn test_update_bridge_authority_aggregation_with_blocklist_event() {
1065        let (
1066            sui_monitor_tx,
1067            sui_monitor_rx,
1068            _eth_monitor_tx,
1069            eth_monitor_rx,
1070            sui_client_mock,
1071            sui_client,
1072            bridge_pause_tx,
1073            _bridge_pause_rx,
1074            mut authorities,
1075            bridge_metrics,
1076        ) = setup();
1077        let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
1078        let agg = Arc::new(ArcSwap::new(Arc::new(
1079            BridgeAuthorityAggregator::new_for_testing(Arc::new(old_committee)),
1080        )));
1081        let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1082        let _handle = tokio::task::spawn(
1083            BridgeMonitor::new(
1084                sui_client.clone(),
1085                sui_monitor_rx,
1086                eth_monitor_rx,
1087                agg.clone(),
1088                bridge_pause_tx,
1089                sui_token_type_tags,
1090                bridge_metrics,
1091            )
1092            .run(),
1093        );
1094        authorities[0].is_blocklisted = true;
1095        let to_blocklist = &authorities[0];
1096        let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
1097        let new_committee_summary =
1098            bridge_committee_to_bridge_committee_summary(new_committee.clone());
1099        sui_client_mock.set_bridge_committee(new_committee_summary.clone());
1100        sui_monitor_tx
1101            .send(SuiBridgeEvent::BlocklistValidatorEvent(
1102                BlocklistValidatorEvent {
1103                    public_keys: vec![to_blocklist.pubkey.clone()],
1104                    blocklisted: true,
1105                },
1106            ))
1107            .await
1108            .unwrap();
1109        // Wait for the monitor to process the event
1110        tokio::time::sleep(Duration::from_secs(1)).await;
1111        assert!(
1112            agg.load()
1113                .committee
1114                .member(&BridgeAuthorityPublicKeyBytes::from(&to_blocklist.pubkey))
1115                .unwrap()
1116                .is_blocklisted,
1117        );
1118    }
1119
1120    #[tokio::test]
1121    async fn test_update_bridge_pause_status_with_emergency_event() {
1122        let (
1123            sui_monitor_tx,
1124            sui_monitor_rx,
1125            _eth_monitor_tx,
1126            eth_monitor_rx,
1127            sui_client_mock,
1128            sui_client,
1129            bridge_pause_tx,
1130            bridge_pause_rx,
1131            authorities,
1132            bridge_metrics,
1133        ) = setup();
1134        let event = EmergencyOpEvent {
1135            frozen: !*bridge_pause_tx.borrow(), // toggle the bridge pause status
1136        };
1137        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
1138        let agg = Arc::new(ArcSwap::new(Arc::new(
1139            BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1140        )));
1141        let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1142        let _handle = tokio::task::spawn(
1143            BridgeMonitor::new(
1144                sui_client.clone(),
1145                sui_monitor_rx,
1146                eth_monitor_rx,
1147                agg.clone(),
1148                bridge_pause_tx,
1149                sui_token_type_tags,
1150                bridge_metrics,
1151            )
1152            .run(),
1153        );
1154
1155        sui_client_mock.set_is_bridge_paused(event.frozen);
1156        sui_monitor_tx
1157            .send(SuiBridgeEvent::EmergencyOpEvent(event.clone()))
1158            .await
1159            .unwrap();
1160        // Wait for the monitor to process the event
1161        tokio::time::sleep(Duration::from_secs(1)).await;
1162        // Now expect the committee to be updated
1163        assert!(*bridge_pause_rx.borrow() == event.frozen);
1164    }
1165
1166    #[tokio::test]
1167    async fn test_update_sui_token_type_tags() {
1168        let (
1169            sui_monitor_tx,
1170            sui_monitor_rx,
1171            _eth_monitor_tx,
1172            eth_monitor_rx,
1173            _sui_client_mock,
1174            sui_client,
1175            bridge_pause_tx,
1176            _bridge_pause_rx,
1177            authorities,
1178            bridge_metrics,
1179        ) = setup();
1180        let event = NewTokenEvent {
1181            token_id: 255,
1182            type_name: TypeTag::from_str("0xbeef::beef::BEEF").unwrap(),
1183            native_token: false,
1184            decimal_multiplier: 1000000,
1185            notional_value: 100000000,
1186        };
1187        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
1188        let agg = Arc::new(ArcSwap::new(Arc::new(
1189            BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1190        )));
1191        let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1192        let sui_token_type_tags_clone = sui_token_type_tags.clone();
1193        let _handle = tokio::task::spawn(
1194            BridgeMonitor::new(
1195                sui_client.clone(),
1196                sui_monitor_rx,
1197                eth_monitor_rx,
1198                agg.clone(),
1199                bridge_pause_tx,
1200                sui_token_type_tags_clone,
1201                bridge_metrics,
1202            )
1203            .run(),
1204        );
1205
1206        sui_monitor_tx
1207            .send(SuiBridgeEvent::NewTokenEvent(event.clone()))
1208            .await
1209            .unwrap();
1210        // Wait for the monitor to process the event
1211        tokio::time::sleep(Duration::from_secs(1)).await;
1212        // Now expect new token type tags to appear in sui_token_type_tags
1213        sui_token_type_tags
1214            .load()
1215            .clone()
1216            .get(&event.token_id)
1217            .unwrap();
1218    }
1219
1220    #[allow(clippy::type_complexity)]
1221    fn setup() -> (
1222        mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
1223        mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
1224        mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
1225        mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
1226        SuiMockClient,
1227        Arc<SuiClient<SuiMockClient>>,
1228        tokio::sync::watch::Sender<IsBridgePaused>,
1229        tokio::sync::watch::Receiver<IsBridgePaused>,
1230        Vec<BridgeAuthority>,
1231        Arc<BridgeMetrics>,
1232    ) {
1233        telemetry_subscribers::init_for_testing();
1234        let registry = Registry::new();
1235        mysten_metrics::init_metrics(&registry);
1236        init_all_struct_tags();
1237        let bridge_metrics = Arc::new(BridgeMetrics::new_for_testing());
1238
1239        let sui_client_mock = SuiMockClient::default();
1240        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1241        let (sui_monitor_tx, sui_monitor_rx) = mysten_metrics::metered_channel::channel(
1242            10000,
1243            &mysten_metrics::get_metrics()
1244                .unwrap()
1245                .channel_inflight
1246                .with_label_values(&["sui_monitor_queue"]),
1247        );
1248        let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel(
1249            10000,
1250            &mysten_metrics::get_metrics()
1251                .unwrap()
1252                .channel_inflight
1253                .with_label_values(&["eth_monitor_queue"]),
1254        );
1255
1256        let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1257        let authorities = vec![
1258            get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
1259            get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
1260            get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
1261            get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
1262        ];
1263        (
1264            sui_monitor_tx,
1265            sui_monitor_rx,
1266            eth_monitor_tx,
1267            eth_monitor_rx,
1268            sui_client_mock,
1269            sui_client,
1270            bridge_pause_tx,
1271            bridge_pause_rx,
1272            authorities,
1273            bridge_metrics,
1274        )
1275    }
1276}