sui_bridge/
monitor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! `BridgeMonitor` receives all `SuiBridgeEvent` and `EthBridgeEvent`
5//! and handles them accordingly.
6
7use crate::abi::{
8    EthBridgeCommitteeEvents, EthBridgeConfigEvents, EthBridgeEvent, EthBridgeLimiterEvents,
9    EthCommitteeUpgradeableContractEvents, EthSuiBridgeEvents,
10};
11use crate::client::bridge_authority_aggregator::BridgeAuthorityAggregator;
12use crate::crypto::BridgeAuthorityPublicKeyBytes;
13use crate::events::{BlocklistValidatorEvent, CommitteeMemberUrlUpdateEvent};
14use crate::events::{EmergencyOpEvent, SuiBridgeEvent};
15use crate::metrics::BridgeMetrics;
16use crate::retry_with_max_elapsed_time;
17use crate::sui_client::{SuiClient, SuiClientInner};
18use crate::types::{BridgeCommittee, IsBridgePaused};
19use arc_swap::ArcSwap;
20use std::collections::HashMap;
21use std::sync::Arc;
22use sui_types::TypeTag;
23use tokio::time::Duration;
24use tracing::{error, info, warn};
25
26const REFRESH_BRIDGE_RETRY_TIMES: u64 = 3;
27
28pub struct BridgeMonitor<C> {
29    sui_client: Arc<SuiClient<C>>,
30    sui_monitor_rx: mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
31    eth_monitor_rx: mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
32    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
33    bridge_paused_watch_tx: tokio::sync::watch::Sender<IsBridgePaused>,
34    sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
35    bridge_metrics: Arc<BridgeMetrics>,
36}
37
38impl<C> BridgeMonitor<C>
39where
40    C: SuiClientInner + 'static,
41{
42    pub fn new(
43        sui_client: Arc<SuiClient<C>>,
44        sui_monitor_rx: mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
45        eth_monitor_rx: mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
46        bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
47        bridge_paused_watch_tx: tokio::sync::watch::Sender<IsBridgePaused>,
48        sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
49        bridge_metrics: Arc<BridgeMetrics>,
50    ) -> Self {
51        Self {
52            sui_client,
53            sui_monitor_rx,
54            eth_monitor_rx,
55            bridge_auth_agg,
56            bridge_paused_watch_tx,
57            sui_token_type_tags,
58            bridge_metrics,
59        }
60    }
61
62    pub async fn run(self) {
63        tracing::info!("Starting BridgeMonitor");
64        let Self {
65            sui_client,
66            mut sui_monitor_rx,
67            mut eth_monitor_rx,
68            bridge_auth_agg,
69            bridge_paused_watch_tx,
70            sui_token_type_tags,
71            bridge_metrics,
72        } = self;
73        let mut latest_token_config = (*sui_token_type_tags.load().clone()).clone();
74
75        loop {
76            tokio::select! {
77                sui_event = sui_monitor_rx.recv() => {
78                    if let Some(sui_event) = sui_event {
79                        Self::handle_sui_events(
80                            sui_event,
81                            &sui_client,
82                            &bridge_auth_agg,
83                            &bridge_paused_watch_tx,
84                            &sui_token_type_tags,
85                            &bridge_metrics,
86                            &mut latest_token_config,
87                        )
88                        .await;
89                    } else {
90                        panic!("BridgeMonitor sui events channel was closed unexpectedly");
91                    }
92                }
93                eth_event = eth_monitor_rx.recv() => {
94                    if let Some(eth_event) = eth_event {
95                        Self::handle_eth_events(eth_event, &bridge_metrics);
96                    } else {
97                        panic!("BridgeMonitor eth events channel was closed unexpectedly");
98                    }
99                }
100            }
101        }
102    }
103
104    async fn handle_sui_events(
105        event: SuiBridgeEvent,
106        sui_client: &Arc<SuiClient<C>>,
107        bridge_auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
108        bridge_paused_watch_tx: &tokio::sync::watch::Sender<IsBridgePaused>,
109        sui_token_type_tags: &Arc<ArcSwap<HashMap<u8, TypeTag>>>,
110        bridge_metrics: &Arc<BridgeMetrics>,
111        latest_token_config: &mut HashMap<u8, TypeTag>,
112    ) {
113        info!("Received SuiBridgeEvent: {:?}", event);
114        macro_rules! bump_sui_counter {
115            ($action:expr) => {
116                bridge_metrics
117                    .observed_governance_actions
118                    .with_label_values(&[$action, "sui"])
119                    .inc();
120            };
121        }
122
123        match event {
124            SuiBridgeEvent::SuiToEthTokenBridgeV1(_) => (),
125            SuiBridgeEvent::TokenTransferApproved(_) => (),
126            SuiBridgeEvent::TokenTransferClaimed(_) => (),
127            SuiBridgeEvent::TokenTransferAlreadyApproved(_) => (),
128            SuiBridgeEvent::TokenTransferAlreadyClaimed(_) => (),
129            SuiBridgeEvent::TokenTransferLimitExceed(_) => {
130                // TODO do we want to do anything here?
131            }
132
133            SuiBridgeEvent::EmergencyOpEvent(event) => {
134                bump_sui_counter!(if event.frozen {
135                    "bridge_paused"
136                } else {
137                    "bridge_unpaused"
138                });
139                let is_paused = get_latest_bridge_pause_status_with_emergency_event(
140                    sui_client.clone(),
141                    event,
142                    Duration::from_secs(10),
143                )
144                .await;
145                bridge_paused_watch_tx
146                    .send(is_paused)
147                    .expect("Bridge pause status watch channel should not be closed");
148            }
149
150            SuiBridgeEvent::CommitteeMemberRegistration(_) => {
151                bump_sui_counter!("committee_member_registered");
152            }
153            SuiBridgeEvent::CommitteeUpdateEvent(_) => {
154                bump_sui_counter!("committee_updated");
155            }
156
157            SuiBridgeEvent::CommitteeMemberUrlUpdateEvent(event) => {
158                bump_sui_counter!("committee_member_url_updated");
159                let new_committee = get_latest_bridge_committee_with_url_update_event(
160                    sui_client.clone(),
161                    event,
162                    Duration::from_secs(10),
163                )
164                .await;
165                let committee_names = bridge_auth_agg.load().committee_keys_to_names.clone();
166                bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(
167                    Arc::new(new_committee),
168                    bridge_metrics.clone(),
169                    committee_names,
170                )));
171                info!("Committee updated with CommitteeMemberUrlUpdateEvent");
172            }
173
174            SuiBridgeEvent::BlocklistValidatorEvent(event) => {
175                bump_sui_counter!(if event.blocklisted {
176                    "validator_blocklisted"
177                } else {
178                    "validator_unblocklisted"
179                });
180                let new_committee = get_latest_bridge_committee_with_blocklist_event(
181                    sui_client.clone(),
182                    event,
183                    Duration::from_secs(10),
184                )
185                .await;
186                let committee_names = bridge_auth_agg.load().committee_keys_to_names.clone();
187                bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(
188                    Arc::new(new_committee),
189                    bridge_metrics.clone(),
190                    committee_names,
191                )));
192                info!("Committee updated with BlocklistValidatorEvent");
193            }
194
195            SuiBridgeEvent::TokenRegistrationEvent(_) => {
196                bump_sui_counter!("new_token_registered");
197            }
198
199            SuiBridgeEvent::NewTokenEvent(event) => {
200                bump_sui_counter!("new_token_added");
201                if let std::collections::hash_map::Entry::Vacant(entry) =
202                    // We only add new tokens but not remove so it's ok to just insert
203                    latest_token_config.entry(event.token_id)
204                {
205                    entry.insert(event.type_name.clone());
206                    sui_token_type_tags.store(Arc::new(latest_token_config.clone()));
207                } else {
208                    // invariant
209                    assert_eq!(event.type_name, latest_token_config[&event.token_id]);
210                }
211            }
212
213            SuiBridgeEvent::UpdateTokenPriceEvent(_event) => {
214                bump_sui_counter!("token_price_updated");
215            }
216
217            SuiBridgeEvent::UpdateRouteLimitEvent(_event) => {
218                bump_sui_counter!("limit_updated");
219            }
220        }
221    }
222
223    fn handle_eth_events(event: EthBridgeEvent, bridge_metrics: &Arc<BridgeMetrics>) {
224        info!("Received EthBridgeEvent: {:?}", event);
225
226        macro_rules! bump_eth_counter {
227            ($action:expr) => {
228                bridge_metrics
229                    .observed_governance_actions
230                    .with_label_values(&[$action, "eth"])
231                    .inc()
232            };
233        }
234
235        match event {
236            EthBridgeEvent::EthBridgeCommitteeEvents(event) => match event {
237                EthBridgeCommitteeEvents::BlocklistUpdatedFilter(event) => {
238                    bump_eth_counter!(if event.is_blocklisted {
239                        "validator_blocklisted"
240                    } else {
241                        "validator_unblocklisted"
242                    });
243                }
244                EthBridgeCommitteeEvents::InitializedFilter(_) => {
245                    bump_eth_counter!("committee_contract_initialized");
246                }
247                EthBridgeCommitteeEvents::UpgradedFilter(_) => {
248                    bump_eth_counter!("committee_contract_upgraded");
249                }
250                EthBridgeCommitteeEvents::BlocklistUpdatedV2Filter(e) => {
251                    bump_eth_counter!(if e.is_blocklisted {
252                        "validator_blocklisted"
253                    } else {
254                        "validator_unblocklisted"
255                    });
256                }
257                EthBridgeCommitteeEvents::ContractUpgradedFilter(_) => {
258                    bump_eth_counter!("committee_contract_upgraded");
259                }
260            },
261            EthBridgeEvent::EthBridgeLimiterEvents(event) => match event {
262                EthBridgeLimiterEvents::InitializedFilter(_) => {
263                    bump_eth_counter!("limiter_contract_initialized");
264                }
265                EthBridgeLimiterEvents::UpgradedFilter(_) => {
266                    bump_eth_counter!("limiter_contract_upgraded");
267                }
268                EthBridgeLimiterEvents::OwnershipTransferredFilter(_) => {
269                    bump_eth_counter!("limiter_contract_ownership_transferred");
270                }
271                EthBridgeLimiterEvents::LimitUpdatedFilter(_) => {
272                    bump_eth_counter!("limit_updated");
273                }
274                // This event is deprecated but we keep it for ABI compatibility
275                // TODO: We can safely update abi and remove it once the testnet bridge contract is upgraded
276                EthBridgeLimiterEvents::HourlyTransferAmountUpdatedFilter(_) => (),
277                EthBridgeLimiterEvents::ContractUpgradedFilter(_) => {
278                    bump_eth_counter!("limiter_contract_upgraded");
279                }
280                EthBridgeLimiterEvents::LimitUpdatedV2Filter(_) => {
281                    bump_eth_counter!("limit_updated");
282                }
283            },
284            EthBridgeEvent::EthBridgeConfigEvents(event) => match event {
285                EthBridgeConfigEvents::InitializedFilter(_) => {
286                    bump_eth_counter!("config_contract_initialized");
287                }
288                EthBridgeConfigEvents::UpgradedFilter(_) => {
289                    bump_eth_counter!("config_contract_upgraded");
290                }
291                EthBridgeConfigEvents::TokenAddedFilter(_) => {
292                    bump_eth_counter!("new_token_added");
293                }
294                EthBridgeConfigEvents::TokenPriceUpdatedFilter(_) => {
295                    bump_eth_counter!("update_token_price");
296                }
297                EthBridgeConfigEvents::ContractUpgradedFilter(_) => {
298                    bump_eth_counter!("config_contract_upgraded");
299                }
300                EthBridgeConfigEvents::TokenPriceUpdatedV2Filter(_) => {
301                    bump_eth_counter!("update_token_price");
302                }
303                EthBridgeConfigEvents::TokensAddedV2Filter(_) => {
304                    bump_eth_counter!("new_token_added");
305                }
306            },
307            EthBridgeEvent::EthCommitteeUpgradeableContractEvents(event) => match event {
308                EthCommitteeUpgradeableContractEvents::InitializedFilter(_) => {
309                    bump_eth_counter!("upgradeable_contract_initialized");
310                }
311                EthCommitteeUpgradeableContractEvents::UpgradedFilter(_) => {
312                    bump_eth_counter!("upgradeable_contract_upgraded");
313                }
314            },
315            EthBridgeEvent::EthSuiBridgeEvents(event) => match event {
316                EthSuiBridgeEvents::TokensClaimedFilter(_) => (),
317                EthSuiBridgeEvents::TokensDepositedFilter(_) => (),
318                EthSuiBridgeEvents::PausedFilter(_) => bump_eth_counter!("bridge_paused"),
319                EthSuiBridgeEvents::UnpausedFilter(_) => bump_eth_counter!("bridge_unpaused"),
320                EthSuiBridgeEvents::UpgradedFilter(_) => {
321                    bump_eth_counter!("bridge_contract_upgraded")
322                }
323                EthSuiBridgeEvents::InitializedFilter(_) => {
324                    bump_eth_counter!("bridge_contract_initialized")
325                }
326                EthSuiBridgeEvents::ContractUpgradedFilter(_) => {
327                    bump_eth_counter!("bridge_contract_upgraded")
328                }
329                EthSuiBridgeEvents::EmergencyOperationFilter(e) => {
330                    if e.paused {
331                        bump_eth_counter!("bridge_paused")
332                    } else {
333                        bump_eth_counter!("bridge_unpaused")
334                    }
335                }
336            },
337        }
338    }
339}
340
341async fn get_latest_bridge_committee_with_url_update_event<C: SuiClientInner>(
342    sui_client: Arc<SuiClient<C>>,
343    event: CommitteeMemberUrlUpdateEvent,
344    staleness_retry_interval: Duration,
345) -> BridgeCommittee {
346    let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
347    loop {
348        let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
349            sui_client.get_bridge_committee(),
350            Duration::from_secs(600)
351        ) else {
352            error!("Failed to get bridge committee after retry");
353            continue;
354        };
355        let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(&event.member));
356        let Some(member) = member else {
357            // This is possible when a node is processing an older event while the member quitted at a later point, which is fine.
358            // Or fullnode returns a stale committee that the member hasn't joined, which is rare and tricy to handle so we just log it.
359            warn!(
360                "Committee member not found in the committee: {:?}",
361                event.member
362            );
363            return committee;
364        };
365        if member.base_url == event.new_url {
366            return committee;
367        }
368        // If url does not match, it could be:
369        // 1. the query is sent to a stale fullnode that does not have the latest data yet
370        // 2. the node is processing an older message, and the latest url has changed again
371        // In either case, we retry a few times. If it still fails to match, we assume it's the latter case.
372        tokio::time::sleep(staleness_retry_interval).await;
373        remaining_retry_times -= 1;
374        if remaining_retry_times == 0 {
375            warn!(
376                "Committee member url {:?} does not match onchain record {:?} after retry",
377                event.member, member
378            );
379            return committee;
380        }
381    }
382}
383
384async fn get_latest_bridge_committee_with_blocklist_event<C: SuiClientInner>(
385    sui_client: Arc<SuiClient<C>>,
386    event: BlocklistValidatorEvent,
387    staleness_retry_interval: Duration,
388) -> BridgeCommittee {
389    let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
390    loop {
391        let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
392            sui_client.get_bridge_committee(),
393            Duration::from_secs(600)
394        ) else {
395            error!("Failed to get bridge committee after retry");
396            continue;
397        };
398        let mut any_mismatch = false;
399        for pk in &event.public_keys {
400            let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(pk));
401            let Some(member) = member else {
402                // This is possible when a node is processing an older event while the member
403                // quitted at a later point. Or fullnode returns a stale committee that
404                // the member hasn't joined.
405                warn!("Committee member not found in the committee: {:?}", pk);
406                any_mismatch = true;
407                break;
408            };
409            if member.is_blocklisted != event.blocklisted {
410                warn!(
411                    "Committee member blocklist status does not match onchain record: {:?}",
412                    member
413                );
414                any_mismatch = true;
415                break;
416            }
417        }
418        if !any_mismatch {
419            return committee;
420        }
421        // If there is any match, it could be:
422        // 1. the query is sent to a stale fullnode that does not have the latest data yet
423        // 2. the node is processing an older message, and the latest blocklist status has changed again
424        // In either case, we retry a few times. If it still fails to match, we assume it's the latter case.
425        tokio::time::sleep(staleness_retry_interval).await;
426        remaining_retry_times -= 1;
427        if remaining_retry_times == 0 {
428            warn!(
429                "Committee member blocklist status {:?} does not match onchain record after retry",
430                event
431            );
432            return committee;
433        }
434    }
435}
436
437async fn get_latest_bridge_pause_status_with_emergency_event<C: SuiClientInner>(
438    sui_client: Arc<SuiClient<C>>,
439    event: EmergencyOpEvent,
440    staleness_retry_interval: Duration,
441) -> IsBridgePaused {
442    let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
443    loop {
444        let Ok(Ok(summary)) =
445            retry_with_max_elapsed_time!(sui_client.get_bridge_summary(), Duration::from_secs(600))
446        else {
447            error!("Failed to get bridge summary after retry");
448            continue;
449        };
450        if summary.is_frozen == event.frozen {
451            return summary.is_frozen;
452        }
453        // If the onchain status does not match, it could be:
454        // 1. the query is sent to a stale fullnode that does not have the latest data yet
455        // 2. the node is processing an older message, and the latest status has changed again
456        // In either case, we retry a few times. If it still fails to match, we assume it's the latter case.
457        tokio::time::sleep(staleness_retry_interval).await;
458        remaining_retry_times -= 1;
459        if remaining_retry_times == 0 {
460            warn!(
461                "Bridge pause status {:?} does not match onchain record {:?} after retry",
462                event, summary.is_frozen
463            );
464            return summary.is_frozen;
465        }
466    }
467}
468
469#[cfg(test)]
470mod tests {
471    use std::str::FromStr;
472
473    use super::*;
474    use crate::events::{NewTokenEvent, init_all_struct_tags};
475    use crate::test_utils::{
476        bridge_committee_to_bridge_committee_summary, get_test_authority_and_key,
477    };
478    use crate::types::{BRIDGE_PAUSED, BRIDGE_UNPAUSED, BridgeAuthority};
479    use fastcrypto::traits::KeyPair;
480    use prometheus::Registry;
481    use sui_types::base_types::SuiAddress;
482    use sui_types::bridge::BridgeCommitteeSummary;
483    use sui_types::bridge::MoveTypeCommitteeMember;
484    use sui_types::crypto::get_key_pair;
485
486    use crate::{sui_mock_client::SuiMockClient, types::BridgeCommittee};
487    use sui_types::crypto::ToFromBytes;
488
489    #[tokio::test]
490    async fn test_get_latest_bridge_committee_with_url_update_event() {
491        telemetry_subscribers::init_for_testing();
492        let sui_client_mock = SuiMockClient::default();
493        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
494        let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
495        let pk = kp.public().clone();
496        let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
497        let pk_bytes = pk_as_bytes.as_bytes().to_vec();
498        let event = CommitteeMemberUrlUpdateEvent {
499            member: pk,
500            new_url: "http://new.url".to_string(),
501        };
502        let summary = BridgeCommitteeSummary {
503            members: vec![(
504                pk_bytes.clone(),
505                MoveTypeCommitteeMember {
506                    sui_address: SuiAddress::random_for_testing_only(),
507                    bridge_pubkey_bytes: pk_bytes.clone(),
508                    voting_power: 10000,
509                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
510                    blocklisted: false,
511                },
512            )],
513            member_registration: vec![],
514            last_committee_update_epoch: 0,
515        };
516
517        // Test the regular case, the onchain url matches
518        sui_client_mock.set_bridge_committee(summary.clone());
519        let timer = std::time::Instant::now();
520        let committee = get_latest_bridge_committee_with_url_update_event(
521            sui_client.clone(),
522            event.clone(),
523            Duration::from_secs(2),
524        )
525        .await;
526        assert_eq!(
527            committee.member(&pk_as_bytes).unwrap().base_url,
528            "http://new.url"
529        );
530        assert!(timer.elapsed().as_millis() < 500);
531
532        // Test the case where the onchain url is older. Then update onchain url in 1 second.
533        // Since the retry interval is 2 seconds, it should return the next retry.
534        let old_summary = BridgeCommitteeSummary {
535            members: vec![(
536                pk_bytes.clone(),
537                MoveTypeCommitteeMember {
538                    sui_address: SuiAddress::random_for_testing_only(),
539                    bridge_pubkey_bytes: pk_bytes.clone(),
540                    voting_power: 10000,
541                    http_rest_url: "http://old.url".to_string().as_bytes().to_vec(),
542                    blocklisted: false,
543                },
544            )],
545            member_registration: vec![],
546            last_committee_update_epoch: 0,
547        };
548        sui_client_mock.set_bridge_committee(old_summary.clone());
549        let timer = std::time::Instant::now();
550        // update the url to "http://new.url" in 1 second
551        let sui_client_mock_clone = sui_client_mock.clone();
552        tokio::spawn(async move {
553            tokio::time::sleep(Duration::from_secs(1)).await;
554            sui_client_mock_clone.set_bridge_committee(summary.clone());
555        });
556        let committee = get_latest_bridge_committee_with_url_update_event(
557            sui_client.clone(),
558            event.clone(),
559            Duration::from_secs(2),
560        )
561        .await;
562        assert_eq!(
563            committee.member(&pk_as_bytes).unwrap().base_url,
564            "http://new.url"
565        );
566        let elapsed = timer.elapsed().as_millis();
567        assert!(elapsed > 1000 && elapsed < 3000);
568
569        // Test the case where the onchain url is newer. It should retry up to
570        // REFRESH_BRIDGE_RETRY_TIMES time then return the onchain record.
571        let newer_summary = BridgeCommitteeSummary {
572            members: vec![(
573                pk_bytes.clone(),
574                MoveTypeCommitteeMember {
575                    sui_address: SuiAddress::random_for_testing_only(),
576                    bridge_pubkey_bytes: pk_bytes.clone(),
577                    voting_power: 10000,
578                    http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
579                    blocklisted: false,
580                },
581            )],
582            member_registration: vec![],
583            last_committee_update_epoch: 0,
584        };
585        sui_client_mock.set_bridge_committee(newer_summary.clone());
586        let timer = std::time::Instant::now();
587        let committee = get_latest_bridge_committee_with_url_update_event(
588            sui_client.clone(),
589            event.clone(),
590            Duration::from_millis(500),
591        )
592        .await;
593        assert_eq!(
594            committee.member(&pk_as_bytes).unwrap().base_url,
595            "http://newer.url"
596        );
597        let elapsed = timer.elapsed().as_millis();
598        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
599
600        // Test the case where the member is not found in the committee
601        // It should return the onchain record.
602        let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
603        let pk2 = kp2.public().clone();
604        let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
605        let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
606        let newer_summary = BridgeCommitteeSummary {
607            members: vec![(
608                pk_bytes2.clone(),
609                MoveTypeCommitteeMember {
610                    sui_address: SuiAddress::random_for_testing_only(),
611                    bridge_pubkey_bytes: pk_bytes2.clone(),
612                    voting_power: 10000,
613                    http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
614                    blocklisted: false,
615                },
616            )],
617            member_registration: vec![],
618            last_committee_update_epoch: 0,
619        };
620        sui_client_mock.set_bridge_committee(newer_summary.clone());
621        let timer = std::time::Instant::now();
622        let committee = get_latest_bridge_committee_with_url_update_event(
623            sui_client.clone(),
624            event.clone(),
625            Duration::from_secs(1),
626        )
627        .await;
628        assert_eq!(
629            committee.member(&pk_as_bytes2).unwrap().base_url,
630            "http://newer.url"
631        );
632        assert!(committee.member(&pk_as_bytes).is_none());
633        let elapsed = timer.elapsed().as_millis();
634        assert!(elapsed < 1000);
635    }
636
637    #[tokio::test]
638    async fn test_get_latest_bridge_committee_with_blocklist_event() {
639        telemetry_subscribers::init_for_testing();
640        let sui_client_mock = SuiMockClient::default();
641        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
642        let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
643        let pk = kp.public().clone();
644        let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
645        let pk_bytes = pk_as_bytes.as_bytes().to_vec();
646
647        // Test the case where the onchain status is the same as the event (blocklisted)
648        let event = BlocklistValidatorEvent {
649            blocklisted: true,
650            public_keys: vec![pk.clone()],
651        };
652        let summary = BridgeCommitteeSummary {
653            members: vec![(
654                pk_bytes.clone(),
655                MoveTypeCommitteeMember {
656                    sui_address: SuiAddress::random_for_testing_only(),
657                    bridge_pubkey_bytes: pk_bytes.clone(),
658                    voting_power: 10000,
659                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
660                    blocklisted: true,
661                },
662            )],
663            member_registration: vec![],
664            last_committee_update_epoch: 0,
665        };
666        sui_client_mock.set_bridge_committee(summary.clone());
667        let timer = std::time::Instant::now();
668        let committee = get_latest_bridge_committee_with_blocklist_event(
669            sui_client.clone(),
670            event.clone(),
671            Duration::from_secs(2),
672        )
673        .await;
674        assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
675        assert!(timer.elapsed().as_millis() < 500);
676
677        // Test the case where the onchain status is the same as the event (unblocklisted)
678        let event = BlocklistValidatorEvent {
679            blocklisted: false,
680            public_keys: vec![pk.clone()],
681        };
682        let summary = BridgeCommitteeSummary {
683            members: vec![(
684                pk_bytes.clone(),
685                MoveTypeCommitteeMember {
686                    sui_address: SuiAddress::random_for_testing_only(),
687                    bridge_pubkey_bytes: pk_bytes.clone(),
688                    voting_power: 10000,
689                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
690                    blocklisted: false,
691                },
692            )],
693            member_registration: vec![],
694            last_committee_update_epoch: 0,
695        };
696        sui_client_mock.set_bridge_committee(summary.clone());
697        let timer = std::time::Instant::now();
698        let committee = get_latest_bridge_committee_with_blocklist_event(
699            sui_client.clone(),
700            event.clone(),
701            Duration::from_secs(2),
702        )
703        .await;
704        assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
705        assert!(timer.elapsed().as_millis() < 500);
706
707        // Test the case where the onchain status is older. Then update onchain status in 1 second.
708        // Since the retry interval is 2 seconds, it should return the next retry.
709        let old_summary = BridgeCommitteeSummary {
710            members: vec![(
711                pk_bytes.clone(),
712                MoveTypeCommitteeMember {
713                    sui_address: SuiAddress::random_for_testing_only(),
714                    bridge_pubkey_bytes: pk_bytes.clone(),
715                    voting_power: 10000,
716                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
717                    blocklisted: true,
718                },
719            )],
720            member_registration: vec![],
721            last_committee_update_epoch: 0,
722        };
723        sui_client_mock.set_bridge_committee(old_summary.clone());
724        let timer = std::time::Instant::now();
725        // update unblocklisted in 1 second
726        let sui_client_mock_clone = sui_client_mock.clone();
727        tokio::spawn(async move {
728            tokio::time::sleep(Duration::from_secs(1)).await;
729            sui_client_mock_clone.set_bridge_committee(summary.clone());
730        });
731        let committee = get_latest_bridge_committee_with_blocklist_event(
732            sui_client.clone(),
733            event.clone(),
734            Duration::from_secs(2),
735        )
736        .await;
737        assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
738        let elapsed = timer.elapsed().as_millis();
739        assert!(elapsed > 1000 && elapsed < 3000);
740
741        // Test the case where the onchain url is newer. It should retry up to
742        // REFRESH_BRIDGE_RETRY_TIMES time then return the onchain record.
743        let newer_summary = BridgeCommitteeSummary {
744            members: vec![(
745                pk_bytes.clone(),
746                MoveTypeCommitteeMember {
747                    sui_address: SuiAddress::random_for_testing_only(),
748                    bridge_pubkey_bytes: pk_bytes.clone(),
749                    voting_power: 10000,
750                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
751                    blocklisted: true,
752                },
753            )],
754            member_registration: vec![],
755            last_committee_update_epoch: 0,
756        };
757        sui_client_mock.set_bridge_committee(newer_summary.clone());
758        let timer = std::time::Instant::now();
759        let committee = get_latest_bridge_committee_with_blocklist_event(
760            sui_client.clone(),
761            event.clone(),
762            Duration::from_millis(500),
763        )
764        .await;
765        assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
766        let elapsed = timer.elapsed().as_millis();
767        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
768
769        // Test the case where the member onchain url is not found in the committee
770        // It should return the onchain record after retrying a few times.
771        let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
772        let pk2 = kp2.public().clone();
773        let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
774        let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
775        let summary = BridgeCommitteeSummary {
776            members: vec![(
777                pk_bytes2.clone(),
778                MoveTypeCommitteeMember {
779                    sui_address: SuiAddress::random_for_testing_only(),
780                    bridge_pubkey_bytes: pk_bytes2.clone(),
781                    voting_power: 10000,
782                    http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
783                    blocklisted: false,
784                },
785            )],
786            member_registration: vec![],
787            last_committee_update_epoch: 0,
788        };
789        sui_client_mock.set_bridge_committee(summary.clone());
790        let timer = std::time::Instant::now();
791        let committee = get_latest_bridge_committee_with_blocklist_event(
792            sui_client.clone(),
793            event.clone(),
794            Duration::from_secs(1),
795        )
796        .await;
797        assert_eq!(
798            committee.member(&pk_as_bytes2).unwrap().base_url,
799            "http://newer.url"
800        );
801        assert!(committee.member(&pk_as_bytes).is_none());
802        let elapsed = timer.elapsed().as_millis();
803        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
804
805        // Test any mismtach in the blocklist status should retry a few times
806        let event = BlocklistValidatorEvent {
807            blocklisted: true,
808            public_keys: vec![pk, pk2],
809        };
810        let summary = BridgeCommitteeSummary {
811            members: vec![
812                (
813                    pk_bytes.clone(),
814                    MoveTypeCommitteeMember {
815                        sui_address: SuiAddress::random_for_testing_only(),
816                        bridge_pubkey_bytes: pk_bytes.clone(),
817                        voting_power: 5000,
818                        http_rest_url: "http://pk.url".to_string().as_bytes().to_vec(),
819                        blocklisted: true,
820                    },
821                ),
822                (
823                    pk_bytes2.clone(),
824                    MoveTypeCommitteeMember {
825                        sui_address: SuiAddress::random_for_testing_only(),
826                        bridge_pubkey_bytes: pk_bytes2.clone(),
827                        voting_power: 5000,
828                        http_rest_url: "http://pk2.url".to_string().as_bytes().to_vec(),
829                        blocklisted: false,
830                    },
831                ),
832            ],
833            member_registration: vec![],
834            last_committee_update_epoch: 0,
835        };
836        sui_client_mock.set_bridge_committee(summary.clone());
837        let timer = std::time::Instant::now();
838        let committee = get_latest_bridge_committee_with_blocklist_event(
839            sui_client.clone(),
840            event.clone(),
841            Duration::from_millis(500),
842        )
843        .await;
844        assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
845        assert!(!committee.member(&pk_as_bytes2).unwrap().is_blocklisted);
846        let elapsed = timer.elapsed().as_millis();
847        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
848    }
849
850    #[tokio::test]
851    async fn test_get_bridge_pause_status_with_emergency_event() {
852        telemetry_subscribers::init_for_testing();
853        let sui_client_mock = SuiMockClient::default();
854        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
855
856        // Test event and onchain status match
857        let event = EmergencyOpEvent { frozen: true };
858        sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
859        let timer = std::time::Instant::now();
860        assert!(
861            get_latest_bridge_pause_status_with_emergency_event(
862                sui_client.clone(),
863                event.clone(),
864                Duration::from_secs(2),
865            )
866            .await
867        );
868        assert!(timer.elapsed().as_millis() < 500);
869
870        let event = EmergencyOpEvent { frozen: false };
871        sui_client_mock.set_is_bridge_paused(BRIDGE_UNPAUSED);
872        let timer = std::time::Instant::now();
873        assert!(
874            !get_latest_bridge_pause_status_with_emergency_event(
875                sui_client.clone(),
876                event.clone(),
877                Duration::from_secs(2),
878            )
879            .await
880        );
881        assert!(timer.elapsed().as_millis() < 500);
882
883        // Test the case where the onchain status (paused) is older. Then update onchain status in 1 second.
884        // Since the retry interval is 2 seconds, it should return the next retry.
885        sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
886        let timer = std::time::Instant::now();
887        // update the bridge to unpaused in 1 second
888        let sui_client_mock_clone = sui_client_mock.clone();
889        tokio::spawn(async move {
890            tokio::time::sleep(Duration::from_secs(1)).await;
891            sui_client_mock_clone.set_is_bridge_paused(BRIDGE_UNPAUSED);
892        });
893        assert!(
894            !get_latest_bridge_pause_status_with_emergency_event(
895                sui_client.clone(),
896                event.clone(),
897                Duration::from_secs(2),
898            )
899            .await
900        );
901        let elapsed = timer.elapsed().as_millis();
902        assert!(elapsed > 1000 && elapsed < 3000, "{}", elapsed);
903
904        // Test the case where the onchain status (paused) is newer. It should retry up to
905        // REFRESH_BRIDGE_RETRY_TIMES time then return the onchain record.
906        sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
907        let timer = std::time::Instant::now();
908        assert!(
909            get_latest_bridge_pause_status_with_emergency_event(
910                sui_client.clone(),
911                event.clone(),
912                Duration::from_secs(2),
913            )
914            .await
915        );
916        let elapsed = timer.elapsed().as_millis();
917        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
918    }
919
920    #[tokio::test]
921    async fn test_update_bridge_authority_aggregation_with_url_change_event() {
922        let (
923            sui_monitor_tx,
924            sui_monitor_rx,
925            _eth_monitor_tx,
926            eth_monitor_rx,
927            sui_client_mock,
928            sui_client,
929            bridge_pause_tx,
930            _bridge_pause_rx,
931            mut authorities,
932            bridge_metrics,
933        ) = setup();
934        let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
935        let agg = Arc::new(ArcSwap::new(Arc::new(
936            BridgeAuthorityAggregator::new_for_testing(Arc::new(old_committee)),
937        )));
938        let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
939        let _handle = tokio::task::spawn(
940            BridgeMonitor::new(
941                sui_client.clone(),
942                sui_monitor_rx,
943                eth_monitor_rx,
944                agg.clone(),
945                bridge_pause_tx,
946                sui_token_type_tags,
947                bridge_metrics,
948            )
949            .run(),
950        );
951        let new_url = "http://new.url".to_string();
952        authorities[0].base_url = new_url.clone();
953        let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
954        let new_committee_summary =
955            bridge_committee_to_bridge_committee_summary(new_committee.clone());
956        sui_client_mock.set_bridge_committee(new_committee_summary.clone());
957        sui_monitor_tx
958            .send(SuiBridgeEvent::CommitteeMemberUrlUpdateEvent(
959                CommitteeMemberUrlUpdateEvent {
960                    member: authorities[0].pubkey.clone(),
961                    new_url: new_url.clone(),
962                },
963            ))
964            .await
965            .unwrap();
966        // Wait for the monitor to process the event
967        tokio::time::sleep(Duration::from_secs(1)).await;
968        // Now expect the committee to be updated
969        assert_eq!(
970            agg.load()
971                .committee
972                .member(&BridgeAuthorityPublicKeyBytes::from(&authorities[0].pubkey))
973                .unwrap()
974                .base_url,
975            new_url
976        );
977    }
978
979    #[tokio::test]
980    async fn test_update_bridge_authority_aggregation_with_blocklist_event() {
981        let (
982            sui_monitor_tx,
983            sui_monitor_rx,
984            _eth_monitor_tx,
985            eth_monitor_rx,
986            sui_client_mock,
987            sui_client,
988            bridge_pause_tx,
989            _bridge_pause_rx,
990            mut authorities,
991            bridge_metrics,
992        ) = setup();
993        let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
994        let agg = Arc::new(ArcSwap::new(Arc::new(
995            BridgeAuthorityAggregator::new_for_testing(Arc::new(old_committee)),
996        )));
997        let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
998        let _handle = tokio::task::spawn(
999            BridgeMonitor::new(
1000                sui_client.clone(),
1001                sui_monitor_rx,
1002                eth_monitor_rx,
1003                agg.clone(),
1004                bridge_pause_tx,
1005                sui_token_type_tags,
1006                bridge_metrics,
1007            )
1008            .run(),
1009        );
1010        authorities[0].is_blocklisted = true;
1011        let to_blocklist = &authorities[0];
1012        let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
1013        let new_committee_summary =
1014            bridge_committee_to_bridge_committee_summary(new_committee.clone());
1015        sui_client_mock.set_bridge_committee(new_committee_summary.clone());
1016        sui_monitor_tx
1017            .send(SuiBridgeEvent::BlocklistValidatorEvent(
1018                BlocklistValidatorEvent {
1019                    public_keys: vec![to_blocklist.pubkey.clone()],
1020                    blocklisted: true,
1021                },
1022            ))
1023            .await
1024            .unwrap();
1025        // Wait for the monitor to process the event
1026        tokio::time::sleep(Duration::from_secs(1)).await;
1027        assert!(
1028            agg.load()
1029                .committee
1030                .member(&BridgeAuthorityPublicKeyBytes::from(&to_blocklist.pubkey))
1031                .unwrap()
1032                .is_blocklisted,
1033        );
1034    }
1035
1036    #[tokio::test]
1037    async fn test_update_bridge_pause_status_with_emergency_event() {
1038        let (
1039            sui_monitor_tx,
1040            sui_monitor_rx,
1041            _eth_monitor_tx,
1042            eth_monitor_rx,
1043            sui_client_mock,
1044            sui_client,
1045            bridge_pause_tx,
1046            bridge_pause_rx,
1047            authorities,
1048            bridge_metrics,
1049        ) = setup();
1050        let event = EmergencyOpEvent {
1051            frozen: !*bridge_pause_tx.borrow(), // toggle the bridge pause status
1052        };
1053        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
1054        let agg = Arc::new(ArcSwap::new(Arc::new(
1055            BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1056        )));
1057        let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1058        let _handle = tokio::task::spawn(
1059            BridgeMonitor::new(
1060                sui_client.clone(),
1061                sui_monitor_rx,
1062                eth_monitor_rx,
1063                agg.clone(),
1064                bridge_pause_tx,
1065                sui_token_type_tags,
1066                bridge_metrics,
1067            )
1068            .run(),
1069        );
1070
1071        sui_client_mock.set_is_bridge_paused(event.frozen);
1072        sui_monitor_tx
1073            .send(SuiBridgeEvent::EmergencyOpEvent(event.clone()))
1074            .await
1075            .unwrap();
1076        // Wait for the monitor to process the event
1077        tokio::time::sleep(Duration::from_secs(1)).await;
1078        // Now expect the committee to be updated
1079        assert!(*bridge_pause_rx.borrow() == event.frozen);
1080    }
1081
1082    #[tokio::test]
1083    async fn test_update_sui_token_type_tags() {
1084        let (
1085            sui_monitor_tx,
1086            sui_monitor_rx,
1087            _eth_monitor_tx,
1088            eth_monitor_rx,
1089            _sui_client_mock,
1090            sui_client,
1091            bridge_pause_tx,
1092            _bridge_pause_rx,
1093            authorities,
1094            bridge_metrics,
1095        ) = setup();
1096        let event = NewTokenEvent {
1097            token_id: 255,
1098            type_name: TypeTag::from_str("0xbeef::beef::BEEF").unwrap(),
1099            native_token: false,
1100            decimal_multiplier: 1000000,
1101            notional_value: 100000000,
1102        };
1103        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
1104        let agg = Arc::new(ArcSwap::new(Arc::new(
1105            BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1106        )));
1107        let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1108        let sui_token_type_tags_clone = sui_token_type_tags.clone();
1109        let _handle = tokio::task::spawn(
1110            BridgeMonitor::new(
1111                sui_client.clone(),
1112                sui_monitor_rx,
1113                eth_monitor_rx,
1114                agg.clone(),
1115                bridge_pause_tx,
1116                sui_token_type_tags_clone,
1117                bridge_metrics,
1118            )
1119            .run(),
1120        );
1121
1122        sui_monitor_tx
1123            .send(SuiBridgeEvent::NewTokenEvent(event.clone()))
1124            .await
1125            .unwrap();
1126        // Wait for the monitor to process the event
1127        tokio::time::sleep(Duration::from_secs(1)).await;
1128        // Now expect new token type tags to appear in sui_token_type_tags
1129        sui_token_type_tags
1130            .load()
1131            .clone()
1132            .get(&event.token_id)
1133            .unwrap();
1134    }
1135
1136    #[allow(clippy::type_complexity)]
1137    fn setup() -> (
1138        mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
1139        mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
1140        mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
1141        mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
1142        SuiMockClient,
1143        Arc<SuiClient<SuiMockClient>>,
1144        tokio::sync::watch::Sender<IsBridgePaused>,
1145        tokio::sync::watch::Receiver<IsBridgePaused>,
1146        Vec<BridgeAuthority>,
1147        Arc<BridgeMetrics>,
1148    ) {
1149        telemetry_subscribers::init_for_testing();
1150        let registry = Registry::new();
1151        mysten_metrics::init_metrics(&registry);
1152        init_all_struct_tags();
1153        let bridge_metrics = Arc::new(BridgeMetrics::new_for_testing());
1154
1155        let sui_client_mock = SuiMockClient::default();
1156        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1157        let (sui_monitor_tx, sui_monitor_rx) = mysten_metrics::metered_channel::channel(
1158            10000,
1159            &mysten_metrics::get_metrics()
1160                .unwrap()
1161                .channel_inflight
1162                .with_label_values(&["sui_monitor_queue"]),
1163        );
1164        let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel(
1165            10000,
1166            &mysten_metrics::get_metrics()
1167                .unwrap()
1168                .channel_inflight
1169                .with_label_values(&["eth_monitor_queue"]),
1170        );
1171
1172        let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1173        let authorities = vec![
1174            get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
1175            get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
1176            get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
1177            get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
1178        ];
1179        (
1180            sui_monitor_tx,
1181            sui_monitor_rx,
1182            eth_monitor_tx,
1183            eth_monitor_rx,
1184            sui_client_mock,
1185            sui_client,
1186            bridge_pause_tx,
1187            bridge_pause_rx,
1188            authorities,
1189            bridge_metrics,
1190        )
1191    }
1192}